Azure 数据工厂、Batch Service、Python:无法使用 csv 块追加 blob

Azure Data Factory, Batch Service, Python: Unable to append blob with csv chunks

提问人:Eve 提问时间:11/16/2023 最后编辑:Eve 更新时间:11/20/2023 访问量:75

问:

Azure Data Facory 管道使用Batch 服务作为链接服务运行自定义活动,其中包含 Python 代码。该转换是为在本地运行而构建的,我想让它在 Azure 上运行,将 Blob(csv 文件)保存到 Azure 存储 (Az Data Lake) 中。

转换在块上运行,并在 for 循环中执行。

for i, in_chunk in enumerate(pd.read_csv(source, chunksize=6000, sep=";")
    # transformation happens and spits out out_chunk parameter 
    # holding the 6000 rows out of the entire file
    kwargs = {'mode':'a', 'header': False} if i>0 else {}
    out_chunk.to_csv(cdl_file, sep="|", index=False, **kwargs)

在此之后,我尝试了不同的方法,因为它写在这个问题和答案中,例如: 我原来的问题

上面的问题和答案中写的解决方案没有抛出错误,但它没有存储整个文件,只存储指定为块的 6000 行。

我做错了什么?我不明白应该如何处理这个问题。

编辑:根据JayashankarGS的要求,我添加了我尝试的代码和有关所发生情况的屏幕截图。

def transform_data(v, outputType, eventType, fileName, config, source, conn_string, dl, encoding, in_dtypes = None,  chunkSize=10000, in_sep = ";"):
folderName = 'temp'
containerName = 'input'
outputBlobName = folderName + "/" + fileName
inputBlobPath = containerName + "/" + outputBlobName
blob = BlobClient.from_connection_string(conn_str=conn_string, container_name=containerName, blob_name=outputBlobName)

credential = {'connection_string': conn_string}
accountName = conn_string.split("AccountName=")[1].split(";")[0] 

adls_path = 'abfs://' + containerName + '@' + accountName + '.dfs.core.windows.net/' + outputBlobName

template = pd.DataFrame(columns = v.headers[outputType])
transformationSchema = config[outputType + "_out_transformations"]

logging.info('Reading data chunk...')
for i, in_chunk in enumerate(pd.read_csv(source,
            chunksize = chunkSize,
            sep=in_sep,
            encoding = 'unicode_escape',
            dtype=in_dtypes)):
    logging.info('Handle duplicates and missing fields...')

    in_chunk.fillna('', inplace=True) 
    out_chunk = template.copy()

    out_chunk.fillna('', inplace=True)

    out_chunk.drop_duplicates(subset=[config["composite_key"]["key_partA"], config["composite_key"]["key_partB"], config["composite_key"]["key_partC"]],inplace=True)

    logging.info('Start data transformation for schema: ' + outputType)

    for name, spec in transformationSchema.items():
        out_chunk[name] = transform_column(in_chunk, spec)

    kwargs = {'mode': 'a', 'header': False}

    logging.info('Transformation was successful for ' + outputType)
    dateTime = time.strftime("%Y%m%d%H%M%S")

    if not os.path.exists(containerName + "/" + folderName + "/"):
            os.mkdir(containerName)
            os.mkdir(containerName + "/" + folderName + "/")

    print(f"Uploading chunk: {len(out_chunk)}")
    logging.info('Trying to store transformed file in Azure Storage...')

    out_chunk.to_csv(adls_path, storage_options=credential, sep=dl, index=False, **kwargs)

这样做的结果是生成并存储在 Azure 存储中的两个文件。正如在 Azure 数据工厂上使用 Batch 服务运行此操作的结果所示,它会根据给定的批处理大小处理 10000 行,然后尝试对第二个文件执行相同的操作。“找不到文件”错误来自转换后的步骤,该步骤是验证器(忽略该警告!

enter image description here

python azure azure-data-factory azure-storage azure-batch

评论

0赞 JayashankarGS 11/17/2023
你为什么要使用块大小。正如我所看到的,针对您之前的问题给出的解决方案是将整个 csv 直接上传到 azure datalake。
0赞 Eve 11/17/2023
分块是为文件的转换而实现的,因为否则需要在一轮中处理 4GiB 的数据。这是一种不同的文件处理类型,如果可能的话,想尝试保留分块 - 但不确定如何解决问题,然后:)
0赞 JayashankarGS 11/17/2023
此代码在本地工作正常吗?
0赞 Eve 11/17/2023
在本地,它的工作方式是,它在处理后将块存储在本地 csv 文件中,并最终对整个文件执行此操作。因此,我尝试以类似的方式将其存储到 blob 存储中:)我不确定这是否真的有可能......

答:

1赞 JayashankarGS 11/17/2023 #1

原因是具有不同的 blob 类型。

在代码中:

adls_path = 'abfs://[email protected]/outfromVS.csv'
credentials = {'connection_string': blob_conn}

for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500, sep=";")):
    kwargs = {'mode': 'a', 'header': False} if i > 0 else {}
    print(in_chunk.count())
    in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)

对于第一次迭代,您没有传递 ;默认情况下,这将使文件类型为 Block blobmode

enter image description here

写入存储时,BlobType 应相同。

因此,只需从一开始就给出模式:kwargs = {'mode': 'a', 'header': False}

以下是我的数据计数:

enter image description here

现在以 500 的块大小上传:

for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500)):
    kwargs = {'mode': 'a', 'header': False}
    print(f"Uploading chunk: {len(in_chunk)}")
    in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)

输出:enter image description here

enter image description here

再次,获取文件:

enter image description here

注意:首次运行输出文件时,请确保存储帐户中不存在输出文件,如果存在,则应为“追加 blob”类型。

评论

0赞 Eve 11/17/2023
好的,它变成了一个追加 blob,但它没有向它添加超过块大小 - 它在上传第一个块后停止运行。每次需要将新块追加到文件时,是否应该重新打开文件?
0赞 JayashankarGS 11/17/2023
在问题中添加输出屏幕截图。以及您尝试的代码。
0赞 Eve 11/20/2023
我根据您的要求更新了问题,如果您需要更多信息,请告诉我:)
0赞 JayashankarGS 11/21/2023
您能否在目标中看到输出文件?检查日志以查看收到此错误的确切位置。上传 10000 条记录后,它无法进一步上传,在第二次迭代中似乎出了点问题。
0赞 Eve 11/21/2023
它发生在第一次迭代之后!因此,它读取块,转换并保存到存储中,然后它不会运行下一次迭代......