使用 TPL 在 C# 中与 I/O 绑定线程交互以生成 100 GB 随机数据的最佳方式 [已关闭]

Best way to interact with I/O bound threads to produce 100 GB random data in C# using TPL [closed]

提问人:Usman 提问时间:9/15/2023 更新时间:9/15/2023 访问量:59

问:


想改进这个问题吗?更新问题,以便可以通过编辑这篇文章来用事实和引文来回答。

2个月前关闭。

我需要生成一个包含随机数据的 100GB 文件。我知道除了几行只是生成随机数据之外,这都是 I/O 密集型操作。

我的目标是创建 100 GB 或 50 GB 的文件。我尝试了几种方法:

  • 创建 Environment.ProcessorCount 等效线程,并为每个线程分配部分数据(例如特定数据块)

  • I/O 绑定缓冲区,同时将数据保存在内存中,并在迭代期间将其刷新到磁盘。再次重新填充它们,直到相应的线程尚未完成。

  • Parallel.For 具有本地存储选项。(此选项被描述为不是 I/O 绑定操作的最佳选择,因为它对于内存中数据集更有效。

所有选项都无法生成包含随机数据的 100 GB 数据文件。所有这些都需要数小时(即整整半天才能产生 50 GB)......

现在测试以下技术:

现在,我正在为每个线程生成单独的文件,并要求每个线程在这些文件中生成数据。这至少可以避免在文件级别锁定。 然后,我计划在所有线程完成文件中的数据生成后合并这些文件。但我在这里也没有太大希望。读取一个超大的单个文件并将其内容附加到另一个目标文件可能不正确,最终可能会产生相同的效果。虽然在这里我可以在一个线程中做事,根本不需要锁定。

问题:如何有效地解决这个问题?

C# IO 任务并行库

评论

2赞 Matthew 9/15/2023
单个 CPU 内核生成数据的速度可能比 I/O 写入数据的速度更快,因此并行化在这里可能没有帮助。
1赞 Fildor 9/15/2023
你已经问过这个问题了,不是吗?
1赞 Fildor 9/15/2023
这回答了你的问题吗?如何在 C# 中使用 TPL 以最快的方式将数十亿条随机生成的数据记录写入文件
0赞 Usman 9/15/2023
@Fildor:是的,那里的谈话更长。我在这里从另一个角度要求寻求不同的观点。
0赞 Usman 9/15/2023
@Matthew :如果我有一个线程,它将按顺序进行,并且可能为 10000000000 行,它会去归档很多次。我们为什么不创建处理器数量的线程,并使所有线程至少处于繁忙状态。这样一来,由于单线程的原因,可以很晚处理的记录可能会被其他线程更早地处理。这种观点不对吗?

答:

0赞 JonasH 9/15/2023 #1

SSD 和 HDD 在按顺序写入数据时效果最佳。进行并发写入将弊大于利。

我能写的最微不足道的实现花了 6 分 38 秒来写入 82Gb。因此,如果你的代码需要几个小时,那可能是一个巨大的子优化。与我的预期相反,瓶颈实际上在于随机数生成,使用内置的 .但根据您的具体需求,可能会有更快的 PRNG。Random

为了避免这种情况,我们可以使用多个线程来生成数字,但一个线程来执行实际的写入:

var bufferSize = 1024 * 1024 * 64; // 64Mb buffers
var freeBuffers = new ConcurrentBag<byte[]>();
var writeQueue = new BlockingCollection<byte[]>();

void WriteThread()
{
    using var fs = File.OpenWrite(@"c:\temp\randomFile2.bin");
    foreach (var buffer in writeQueue.GetConsumingEnumerable())
    {
        fs.Write(buffer, 0, buffer.Length);
        freeBuffers.Add(buffer);
    }
}

var writeTask = Task.Run(WriteThread);

var sw = Stopwatch.StartNew();
Parallel.For(0,
    1000,
    new ParallelOptions() { MaxDegreeOfParallelism = 8 },
    () => new Random(), // local init
    (i, state, random) => // loop body
    {
        if (!freeBuffers.TryTake(out var buffer))
        {
            buffer = new byte[bufferSize];
        }
        random.NextBytes(buffer);
        writeQueue.Add(buffer);
        return random;
    },
    random => {} // local finally
    );
writeQueue.CompleteAdding();
writeTask.Wait();
sw.Stop();

Console.WriteLine($"Total Time: {sw.Elapsed}");

在我的计算机上,这花了 53 秒才能生成一个 78Gb 的文件。我认为这对于大多数目的来说已经足够快了。

评论

0赞 Usman 9/15/2023
事实上,我并不是真的依赖数字。随机数只是其中的一部分。它仅生成为一个数字,然后是句子,然后是下一行,再次以随机数开头,然后是句子。{randomNumber}。{句子}。
0赞 Usman 9/15/2023
因此,我正在准备一个大型文本文件,以便在下一步中使用 ExternalMergeSort 对其进行排序。因此,数字 si 根本不是问题。
0赞 Usman 9/15/2023
一个问题。所有线程 (8) 都在队列中准备数据。对于 82 GB,您不认为会在某个时间点立即或稍后获得它吗 OutOfMemoryException?因为所有并行线程都可以在有限的时间内完成其工作,然后它们不会立即刷新,并且当整个内存已满时,您将在某个时间点上。因为 82 GB 在 RAM 中。
0赞 Usman 9/15/2023
此外:您似乎只生成了 1000 个数字。因为我可以看到你在并行 foreach 循环中提到的范围
0赞 JonasH 9/18/2023
@Usman,它会生成 1000 x 64Mb 缓冲区,但会根据需要调整数字。如果您担心内存问题,可以在 writeQueue 上设置一个限制,这样,如果写入线程跟不上,生产者线程就会阻塞。但最主要的一点是,并发写入会适得其反。如果生成数字的速度足够快,只需编写一个简单的循环即可。
1赞 user22191764 9/15/2023 #2

创建一个 MemoryMappedFile 并分配 100GB。 之后,您可以对文件进行分区并写入块。

在您的情况下,由于磁盘和底层连接的 I/O 边界,任何内存或 CPU 优化都几乎无关紧要。PCIe nVME或M2。SSD 将表现最佳,但我不建议将 100GB 的垃圾写入任何固态驱动器。HDD 会很慢,如果您写入通过 USB 连接的外部磁盘,则速度会更慢。

总之:传输速度和写入速度规则,对于大文件,使用内存映射文件进行分配并防止加入多个较小的文件。

private static void WriteRandom100Gb()
{
    // 100GB file size.
    long size = 100 * (long)Math.Pow(1024, 3);
    string path = "F:\\somebigfile.dat";
    using (var f = File.Create(path))
    {
        using (var mmf = MemoryMappedFile.CreateFromFile(f, "bigmmf", size, MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, true))
        {

            // 16kB write size.
            long partitionSize = 64 * 1024 * 1024;
            long partitions = (size / partitionSize);

            Random rnd = new Random();

            for (int n = 0; n < partitions; n++)
            {
                using (var mms = mmf.CreateViewStream(n * partitionSize, partitionSize, MemoryMappedFileAccess.Write))
                {
                    byte[] buf = new byte[partitionSize];
                    rnd.NextBytes(buf);
                    mms.Write(buf, 0, buf.Length);
                }
            }
        }
    }
}