提问人:Eve 提问时间:11/16/2023 最后编辑:Eve 更新时间:11/20/2023 访问量:75
Azure 数据工厂、Batch Service、Python:无法使用 csv 块追加 blob
Azure Data Factory, Batch Service, Python: Unable to append blob with csv chunks
问:
Azure Data Facory 管道使用Batch 服务作为链接服务运行自定义活动,其中包含 Python 代码。该转换是为在本地运行而构建的,我想让它在 Azure 上运行,将 Blob(csv 文件)保存到 Azure 存储 (Az Data Lake) 中。
转换在块上运行,并在 for 循环中执行。
for i, in_chunk in enumerate(pd.read_csv(source, chunksize=6000, sep=";")
# transformation happens and spits out out_chunk parameter
# holding the 6000 rows out of the entire file
kwargs = {'mode':'a', 'header': False} if i>0 else {}
out_chunk.to_csv(cdl_file, sep="|", index=False, **kwargs)
在此之后,我尝试了不同的方法,因为它写在这个问题和答案中,例如: 我原来的问题
上面的问题和答案中写的解决方案没有抛出错误,但它没有存储整个文件,只存储指定为块的 6000 行。
我做错了什么?我不明白应该如何处理这个问题。
编辑:根据JayashankarGS的要求,我添加了我尝试的代码和有关所发生情况的屏幕截图。
def transform_data(v, outputType, eventType, fileName, config, source, conn_string, dl, encoding, in_dtypes = None, chunkSize=10000, in_sep = ";"):
folderName = 'temp'
containerName = 'input'
outputBlobName = folderName + "/" + fileName
inputBlobPath = containerName + "/" + outputBlobName
blob = BlobClient.from_connection_string(conn_str=conn_string, container_name=containerName, blob_name=outputBlobName)
credential = {'connection_string': conn_string}
accountName = conn_string.split("AccountName=")[1].split(";")[0]
adls_path = 'abfs://' + containerName + '@' + accountName + '.dfs.core.windows.net/' + outputBlobName
template = pd.DataFrame(columns = v.headers[outputType])
transformationSchema = config[outputType + "_out_transformations"]
logging.info('Reading data chunk...')
for i, in_chunk in enumerate(pd.read_csv(source,
chunksize = chunkSize,
sep=in_sep,
encoding = 'unicode_escape',
dtype=in_dtypes)):
logging.info('Handle duplicates and missing fields...')
in_chunk.fillna('', inplace=True)
out_chunk = template.copy()
out_chunk.fillna('', inplace=True)
out_chunk.drop_duplicates(subset=[config["composite_key"]["key_partA"], config["composite_key"]["key_partB"], config["composite_key"]["key_partC"]],inplace=True)
logging.info('Start data transformation for schema: ' + outputType)
for name, spec in transformationSchema.items():
out_chunk[name] = transform_column(in_chunk, spec)
kwargs = {'mode': 'a', 'header': False}
logging.info('Transformation was successful for ' + outputType)
dateTime = time.strftime("%Y%m%d%H%M%S")
if not os.path.exists(containerName + "/" + folderName + "/"):
os.mkdir(containerName)
os.mkdir(containerName + "/" + folderName + "/")
print(f"Uploading chunk: {len(out_chunk)}")
logging.info('Trying to store transformed file in Azure Storage...')
out_chunk.to_csv(adls_path, storage_options=credential, sep=dl, index=False, **kwargs)
这样做的结果是生成并存储在 Azure 存储中的两个文件。正如在 Azure 数据工厂上使用 Batch 服务运行此操作的结果所示,它会根据给定的批处理大小处理 10000 行,然后尝试对第二个文件执行相同的操作。“找不到文件”错误来自转换后的步骤,该步骤是验证器(忽略该警告!
答:
原因是具有不同的 blob 类型。
在代码中:
adls_path = 'abfs://[email protected]/outfromVS.csv'
credentials = {'connection_string': blob_conn}
for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500, sep=";")):
kwargs = {'mode': 'a', 'header': False} if i > 0 else {}
print(in_chunk.count())
in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)
对于第一次迭代,您没有传递 ;默认情况下,这将使文件类型为 Block blob。mode
写入存储时,BlobType 应相同。
因此,只需从一开始就给出模式:kwargs = {'mode': 'a', 'header': False}
以下是我的数据计数:
现在以 500 的块大小上传:
for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500)):
kwargs = {'mode': 'a', 'header': False}
print(f"Uploading chunk: {len(in_chunk)}")
in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)
输出:
和
再次,获取文件:
注意:首次运行输出文件时,请确保存储帐户中不存在输出文件,如果存在,则应为“追加 blob”类型。
评论
上一个:创建没有数据流的管道
下一个:如何将ADF中的时区转换为CST
评论