是否可以有效地从大型固定宽度的 CSV 文件中获取行的子集?

Is it possible to efficiently get a subset of rows from a large fixed-width CSV file?

提问人:Many Questions 提问时间:4/26/2022 最后编辑:Ken WhiteMany Questions 更新时间:4/27/2022 访问量:157

问:

我有一个非常大的固定宽度 CSV 文件(130 万行和 80K 列)。它的大小约为 230 GB。我需要能够获取这些行的子集。我有一个我需要的行索引向量。但是,我现在需要弄清楚如何遍历如此庞大的文件来获取它们。

按照我的理解,C++ 将逐行遍历文件,直到它到达换行符(或给定的分隔符),此时,它将清除缓冲区,然后移动到下一行。我还听说过一个函数可以转到流中的给定位置。那么,是否可以以某种方式使用此函数来快速获取指向正确行号的指针?seek()

我认为,由于程序基本上不必运行数十亿个 if 语句来检查换行符,因此如果我简单地告诉程序在固定宽度文件中的位置,它可能会提高速度。但我不知道该怎么做。

假设我的文件有字符宽度,我的行号是 (where )。在这种情况下,我可以简单地告诉文件指针去 ,对吧?但是对于下一行,我是从行的末尾还是从下一行的开头计算下一个跳跃?在计算跳跃时,我应该包括换行符吗?n{l_1, l_2, l_3, ... l_m}l_1 < l_2 < l_3, ... < l_m(l_1 - 1) * nl_1

这是否有助于提高速度,或者我只是误解了这里的东西?

感谢您抽出宝贵时间提供帮助

编辑:文件将如下所示:

id0000001,AB,AB,AA,--,BB
id0000002,AA,--,AB,--,BB
id0000003,AA,AA,--,--,BB
id0000004,AB,AB,AA,AB,BB
C++ CSV IO 固定宽度

评论

3赞 WhozCraig 4/26/2022
seek除非您处理的是固定的记录长度,否则您没有好处。如果您有固定的记录长度,则只需简单的数学计算即可。如果不这样做,那么你别无选择,只能以一种或另一种方式枚举换行符,直到到达你感兴趣的行。你如何做到这一点取决于你。
2赞 Ted Lyngmo 4/26/2022
...但要继续 @WhozCraig 的评论,如果你经常处理这个文件,你可以从中创建一个二进制文件,以便更快地读取和定位,或者创建一个包含行索引(二进制形式)的第二个文件。
0赞 Many Questions 4/26/2022
感谢您的回复。有没有办法确定这一点?所有行的字符数相同,我认为每个字符在内存中占用的空间相同。所以我认为所有行也应该具有相同的记录长度,对吧?
3赞 WhozCraig 4/26/2022
如果每行的字符数完全相同,包括换行符,则基本数学将起作用。否则,可以选择像@TedLyngmo所述的备用索引版本。你仍然会付钱给吹笛人,但它只会发生一次(假设你没有做一些疯狂的事情,比如试图修改这个“东西”)。
0赞 Ryan M 4/26/2022
评论不用于扩展讨论;此对话已移至 Chat

答:

1赞 Vlad Feinstein 4/26/2022 #1

正如我在评论中建议的那样,您可以将数据字段压缩为两位:

-- 00
AA 01
AB 10
BB 11

这会将您的文件大小减少 12 倍,因此它将是 ~20GB。考虑到您的处理可能是 IO 绑定的,您可以将处理速度提高 12 倍。

生成的文件将具有 20,000 字节的记录长度,因此很容易计算到任何给定记录的偏移量。没有需要考虑的新行符号:)

以下是我构建该二进制文件的方法:

#include <fstream>
#include <iostream>
#include <string>
#include <chrono>

int main()
{
    auto t1 = std::chrono::high_resolution_clock::now();
    std::ifstream src("data.txt", std::ios::binary);
    std::ofstream bin("data.bin", std::ios::binary);
    size_t length = 80'000 * 3 + 9 + 2; // the `2` is a length of CR/LF on my Windows; use `1` for other systems
    std::string str(length, '\0');
    while (src.read(&str[0], length))
    {
        size_t pos = str.find(',') + 1;
        for (int group = 0; group < 2500; ++group) {
            uint64_t compressed(0), field(0);
            for (int i = 0; i < 32; ++i, pos += 3) {
                if (str[pos] == '-')
                    field = 0;
                else if (str[pos] == 'B')
                    field = 3;
                else if (str[pos + 1] == 'B')
                    field = 2;
                else
                    field = 1;

                compressed <<= 2;
                compressed |= field;
            }
            bin.write(reinterpret_cast<char*>(&compressed), sizeof compressed);
        }
    }
    auto t2 = std::chrono::high_resolution_clock::now();
    std::cout << std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count() << std::endl;

    // clear `bad` bit set by trying to read past EOF
    src.clear();
    // rewind to the first record
    src.seekg(0);
    src.read(&str[0], length);
    // read next (second) record
    src.read(&str[0], length);
    // read forty second record from start (skip 41)
    src.seekg(41 * length, std::ios_base::beg);
    src.read(&str[0], length);
    // read next (forty third) record
    src.read(&str[0], length);
    // read fifties record (skip 6 from current position)
    src.seekg(6 * length, std::ios_base::cur);
    src.read(&str[0], length);

    return 0;
}

这可以在一秒钟内编码大约 1,600 条记录,因此整个文件将需要 ~15 分钟。您现在需要多长时间来处理它?

更新:

添加了如何从 中读取单个记录的示例。src

我只设法在二进制模式下工作。seekg()

评论

0赞 Many Questions 4/26/2022
谢谢你@VladFeinstein。我一定会尝试实现文件编码。但这很可能是未来的实现。目前,我想了解如何使用 seek() 函数在固定长度的文件之间移动。src.read() 函数(似乎复制到缓冲区,固定宽度的字符数)会和 seek() 做同样的事情吗?还是seek()更好?如果 seek 仍然是这里的首选方法,你能不能麻烦你举个例子?非常感谢您的帮助:)
0赞 Vlad Feinstein 4/27/2022
@ManyQuestions请看我更新的答案
1赞 Red.Wave 4/26/2022 #2

类中的函数系列通常是面向字节的。您可以使用它们,前提是您绝对确信您的记录(在本例中为行)具有固定的字节数;在这种情况下,您可以以二进制形式打开文件,而不是 ,并使用它可以将指定数量的字节读取到具有足够容量的字节数组中。但是 - 因为文件毕竟存储的是文本 - 如果即使一条记录的大小不同,您也会错位;如果 ID 字段保证等于行号 - 或者至少是它的不断增加的映射 - 有根据的猜测和后续的试错会有所帮助。您需要快速切换到更好的数据库管理;即使是 10GB 的单个二进制文件也太大,容易快速损坏。您可以考虑将其切成更小的切片(可能是 100MB 的数量级),以尽量减少损坏传播的机会。另外,您必须需要一些冗余机制来恢复/纠正。seek<iostream>getline.read