提问人:yuw444 提问时间:1/9/2023 最后编辑:Ken Whiteyuw444 更新时间:1/9/2023 访问量:54
多线程文件读取和处理
Multithread file reading and processing
问:
我刚开始接触多线程编程。下面是我尝试使用1个线程读取和逐行处理多个线程进行大型txt文件读取和处理的尝试。现在,我放了一个微不足道的过程函数;在我的实际应用程序中,实际的处理函数需要更长的时间,然后将处理后的结果输出到一个新文件中。
不知何故,代码间歇性地工作;某些竞争条件可能会出现在某个地方。我尝试使用并广泛记录进行调试,但无法弄清楚错误在哪里。所以,请帮忙。任何建议都是值得赞赏的。gdb
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
#include <string.h>
#include <zlib.h>
#define READER_SIZE 30
#define NUM_PROCESSORS 4
// shared data by the reader and processors
//------------------------------------------------
char reader_buffer[READER_SIZE][64];
int reader_in = 0; // next free position
int reader_out = 0; // first full position
sem_t reads_mutex; // semaphore to lock buffer
sem_t reads_full; // counts the number of filled slots
sem_t reads_empty; // counts the number of empty slots
gzFile fileIn;
//------------------------------------------------
// shared data by the processors and writer
//------------------------------------------------
pthread_mutex_t writes_mutex; // mutex lock for writting
gzFile fileOut;
//------------------------------------------------
// Function reader
void *producer()
{
while (!gzeof(fileIn))
{
printf("reader turn ");
sem_wait(&reads_empty);
sem_wait(&reads_mutex);
gzgets(fileIn, reader_buffer[reader_in], 64);
// printf("reader is working!\n");
// printf("producer read in: %d, %s \n", reader_in, reader_buffer[reader_in]);
int empty, full;
sem_getvalue(&reads_empty, &empty);
sem_getvalue(&reads_full, &full);
reader_in = (reader_in + 1) % READER_SIZE;
printf("reader_in: %d, empty is %d, full is %d\n", reader_in, empty, full + 1);
sem_post(&reads_mutex);
sem_post(&reads_full);
}
}
void *consumer(void *id)
{
int consumer_id = *((int *)id);
while (!gzeof(fileIn))
{
printf("consumer %d turn ", consumer_id);
sem_wait(&reads_full);
sem_wait(&reads_mutex);
// copy the data
char processor_buffer[64];
char cell_barcode[17];
strcpy(processor_buffer, reader_buffer[reader_out]);
reader_out = (reader_out + 1) % READER_SIZE;
int empty, full;
sem_getvalue(&reads_empty, &empty);
sem_getvalue(&reads_full, &full);
printf("reader_out: %d, empty is %d, full is %d \n", reader_out, empty + 1, full);
sem_post(&reads_mutex);
sem_post(&reads_empty);
// processing, here I made trivial to take substring
if(processor_buffer[0] == '@')
{
pthread_mutex_lock(&writes_mutex);
gzputs(fileOut, processor_buffer);
pthread_mutex_unlock(&writes_mutex);
}
}
}
int main()
{
fileIn = gzopen("temp.gz", "rb");
fileOut = gzopen("test.gz", "wb");
int *proc_id = malloc(NUM_PROCESSORS * sizeof(int));
pthread_t reader_thread;
pthread_t processor_thread[NUM_PROCESSORS];
// initialize semaphores
sem_init(&reads_mutex, 0, 1);
sem_init(&reads_full, 0, 0);
sem_init(&reads_empty, 0, READER_SIZE);
pthread_mutex_init(&writes_mutex, NULL);
pthread_create(&reader_thread, NULL, producer, NULL);
for (int i = 0; i < NUM_PROCESSORS; i++)
{
proc_id[i] = i;
pthread_create(&processor_thread[i], NULL, consumer, &proc_id[i]);
}
// wait for the reader and processor threads to finish
pthread_join(reader_thread, NULL);
for (int i = 0; i < NUM_PROCESSORS; i++)
{
pthread_join(processor_thread[i], NULL);
}
sem_destroy(&reads_mutex);
sem_destroy(&reads_full);
sem_destroy(&reads_empty);
gzclose(fileIn);
gzclose(fileOut);
return 0;
}
答: 暂无答案
评论
-g -fsanitize=thread
-g -fsanitize=address,undefined
thread
producer
gzgets
consumer
gz*
gzeof
c0, c1, c2, ..., cN
t0: c0, c3, c6, ...
t1: c1, c4, c7, ...
t2: c2, c5, c8, ...
struct qpair { int enq; int deq; };
stdatomic.h