提问人:anonymus1205 提问时间:11/8/2023 更新时间:11/8/2023 访问量:40
PySpark:无法将 MutableLong 转换为 MutableInt
PySpark: MutableLong cannot be cast to MutableInt
问:
我正在尝试使用 PySpark 和 Glue API 从 AWS Glue 作业中读取多个 parquet 文件,如下所示:
snapshot_as_dynamic_frame = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={"paths": files_to_compact_s3a, "recurse": False},
transformation_ctx="snapshot_as_dynamic_frame",
)
files_*to_*compact_s3a 是 S3 中要读取的文件名列表。其中一个文件是由 DuckDB API 生成的,用于创建 parquet 文件(从其他 AWS lambda 函数运行,其目的是压缩 parquet 文件以进行代码优化):
使用的 DuckDB 查询是:
" COPY (" f"SELECT * FROM read_parquet({files_to_compact})) "
f"TO '{compact_file_name}' (FORMAT 'parquet');"
当 AWS Glue 任务尝试读取此 parquet 文件(和其他 parquet 文件)时,它会引发以下异常:
org.apache.spark.sql.catalyst.expressions.MutableLong cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableIn
此文件可由 AWS Athena 查询,似乎它没有损坏或其他东西。
该文件在 AWS Glue PySpark 作业中读取该文件存在一个问题。
我使用另一个 DuckDB 查询检查了文件架构:
┌──────────────────────┬──────────────────────┬────────────┬───┬───────┬───────────┬──────────┬──────────────────────┐
│ file_name │ name │ type │ … │ scale │ precision │ field_id │ logical_type │
│ varchar │ varchar │ varchar │ │ int64 │ int64 │ int64 │ varchar │
├──────────────────────┼──────────────────────┼────────────┼───┼───────┼───────────┼──────────┼──────────────────────┤
│ s3://bmc-saas-dp-r… │ duckdb_schema │ NULL │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ id │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ server │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ run_id │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ run_count │ INT32 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ text │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ text_length │ INT64 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ text_file_path │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ task_type │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ collection_time │ INT64 │ … │ NULL │ NULL │ NULL │ TimestampType(isAd… │
│ s3://bmc-saas-dp-r… │ task_handle_duration │ INT64 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ data_collector_host │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ status_code │ INT32 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ failure_code │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ failure_description │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ version │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ rule │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ dynamic_partition_… │ INT64 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ day │ INT64 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ month │ INT64 │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ rule_name │ BYTE_ARRAY │ … │ NULL │ NULL │ NULL │ NULL │
│ s3://bmc-saas-dp-r… │ year │ INT64 │ … │ NULL │ NULL │ NULL │ NULL │
├──────────────────────┴──────────────────────┴────────────┴───┴───────┴───────────┴──────────┴──────────────────────┤
│ 22 rows 11 columns (7 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
我怀疑列“collection_time”的时间戳类型,但我不确定哪一列产生了问题。
我尝试在胶水作业中设置火花会话属性,例如:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
但没有任何帮助。
如果能提供解决方案,我将不胜感激。
谢谢。
答: 暂无答案
评论