多线程文件读取和处理

Multithread file reading and processing

提问人:yuw444 提问时间:1/9/2023 最后编辑:Ken Whiteyuw444 更新时间:1/9/2023 访问量:54

问:

我刚开始接触多线程编程。下面是我尝试使用1个线程读取和逐行处理多个线程进行大型txt文件读取和处理的尝试。现在,我放了一个微不足道的过程函数;在我的实际应用程序中,实际的处理函数需要更长的时间,然后将处理后的结果输出到一个新文件中。

不知何故,代码间歇性地工作;某些竞争条件可能会出现在某个地方。我尝试使用并广泛记录进行调试,但无法弄清楚错误在哪里。所以,请帮忙。任何建议都是值得赞赏的。gdb

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
#include <string.h>
#include <zlib.h>

#define READER_SIZE 30

#define NUM_PROCESSORS 4

// shared data by the reader and processors
//------------------------------------------------
char reader_buffer[READER_SIZE][64]; 
int reader_in = 0;                      // next free position
int reader_out = 0;                     // first full position

sem_t reads_mutex; // semaphore to lock buffer
sem_t reads_full;  // counts the number of filled slots
sem_t reads_empty; // counts the number of empty slots
gzFile fileIn;
//------------------------------------------------

// shared data by the processors and writer
//------------------------------------------------
pthread_mutex_t writes_mutex; // mutex lock for writting
gzFile fileOut;
//------------------------------------------------

// Function reader
void *producer()
{
    while (!gzeof(fileIn))
    {
        printf("reader turn ");
        sem_wait(&reads_empty);
        sem_wait(&reads_mutex);

        gzgets(fileIn, reader_buffer[reader_in], 64);

        // printf("reader is working!\n");
        // printf("producer read in: %d, %s \n", reader_in, reader_buffer[reader_in]);
        int empty, full;
        sem_getvalue(&reads_empty, &empty);
        sem_getvalue(&reads_full, &full);

        reader_in = (reader_in + 1) % READER_SIZE;
        printf("reader_in: %d, empty is %d, full is %d\n", reader_in, empty, full + 1);

        sem_post(&reads_mutex);
        sem_post(&reads_full);
    }
}

void *consumer(void *id)
{
    int consumer_id = *((int *)id);
    while (!gzeof(fileIn))
    {
        printf("consumer %d turn ", consumer_id);
        sem_wait(&reads_full);
        sem_wait(&reads_mutex);

        // copy the data
        char processor_buffer[64];
        char cell_barcode[17];

        strcpy(processor_buffer, reader_buffer[reader_out]);
        reader_out = (reader_out + 1) % READER_SIZE;

        int empty, full;
        sem_getvalue(&reads_empty, &empty);
        sem_getvalue(&reads_full, &full);

        printf("reader_out: %d, empty is %d, full is %d \n", reader_out, empty + 1, full);

        sem_post(&reads_mutex);
        sem_post(&reads_empty);

        // processing, here I made trivial to take substring
        if(processor_buffer[0] == '@')
        {

        pthread_mutex_lock(&writes_mutex);

        gzputs(fileOut, processor_buffer);

        pthread_mutex_unlock(&writes_mutex);

        }

    }

}

int main()
{
    fileIn = gzopen("temp.gz", "rb");
    fileOut = gzopen("test.gz", "wb");

    int *proc_id = malloc(NUM_PROCESSORS * sizeof(int));

    pthread_t reader_thread;
    pthread_t processor_thread[NUM_PROCESSORS];

    // initialize semaphores
    sem_init(&reads_mutex, 0, 1);
    sem_init(&reads_full, 0, 0);
    sem_init(&reads_empty, 0, READER_SIZE);

    pthread_mutex_init(&writes_mutex, NULL);

    pthread_create(&reader_thread, NULL, producer, NULL);

    for (int i = 0; i < NUM_PROCESSORS; i++)
    {
        proc_id[i] = i;
        pthread_create(&processor_thread[i], NULL, consumer, &proc_id[i]);
    }

    // wait for the reader and processor threads to finish
    pthread_join(reader_thread, NULL);

    for (int i = 0; i < NUM_PROCESSORS; i++)
    {

        pthread_join(processor_thread[i], NULL);
    }

    sem_destroy(&reads_mutex);
    sem_destroy(&reads_full);
    sem_destroy(&reads_empty);

    gzclose(fileIn);

    gzclose(fileOut);

    return 0;
}
C 多线程 IO

评论

1赞 Ted Lyngmo 1/9/2023
尝试遵守,看看这是否能在运行程序时为您提供更多线索。您也可以尝试 - 但您不能同时使用这两个 .-g -fsanitize=thread-g -fsanitize=address,undefinedthread
1赞 Craig Estey 1/9/2023
由于读取压缩文件(通过)并将解压缩的输出发布到共享缓冲区,我不确定为什么要对输入流使用任何函数(例如)。这似乎是错误的,并且存在可能会弄乱输出尾部的竞争条件。producergzgetsconsumergz*gzeof
1赞 Craig Estey 1/9/2023
另外,让我们假装没有压缩。如果文件的“块”为 64: ,则生产者以轮询方式将这些块排入队列(假设有 3 个输出线程):、 和 但是,即使锁定输出,输出顺序也可能因为输出线程之间的争用而打乱。c0, c1, c2, ..., cNt0: c0, c3, c6, ...t1: c1, c4, c7, ...t2: c2, c5, c8, ...
1赞 Craig Estey 1/9/2023
好。顺序不重要。块处理时间是否比读取/解压缩时间和重新压缩/写入时间长得多?有几种方法可以做到这一点,具体取决于相对的处理时间。如果输出顺序重要,我会让生产者将数据发送到单个共享队列。每个使用者线程都可以从单个队列的顶部取消排队。这允许所有使用者线程以最大速度运行。否则,如果处理时间因块数据而异,则轮询可能会导致瓶颈,因为一个线程被赋予了几个“硬”块,而其他线程处于空闲状态。
1赞 Craig Estey 1/9/2023
别客气!看我的回答: 需要测试以确定异步 I/O 是否真的发生在 C 代码中 它有一些基本的队列代码。但是,它没有线程间锁定(例如互斥锁)。因此,您可以添加这些。或者,如果将 enqueue/dequeue 索引放在 struct: 中,则可以使用基元以原子方式更改索引对,而不是使用互斥锁struct qpair { int enq; int deq; };stdatomic.h

答: 暂无答案