Spark 动态帧显示方法不产生任何结果

Spark dynamic frame show method yields nothing

提问人:PyRaider 提问时间:5/7/2019 最后编辑:Matt AndruffPyRaider 更新时间:8/15/2023 访问量:23088

问:

因此,我使用 AWS Glue 自动生成的代码从 S3 读取 csv 文件,并通过 JDBC 连接将其写入表。看起来很简单,Job 成功运行,没有错误,但它什么也没写。当我检查 Glue Spark 动态帧时,它会包含所有行的内容(使用 .count())。但是当它执行 .show() 时,它什么也没产生。

.printSchema() 工作正常。尝试在使用 .show() 时记录错误,但没有错误或未打印任何内容。使用 .toDF 和 show 方法将 DynamicFrame 转换为数据帧。 我认为文件存在一些问题,试图缩小到某些列。但即使文件中只有 2 列,也是一样的。用双引号清楚地标记字符串,仍然没有成功。

我们有像JDBC连接这样的东西,需要从Glue配置中挑选出来。我想常规的 spark 数据帧无法做到。因此需要动态帧工作。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

datasource0 = glueContext.create_dynamic_frame.from_options('s3', {'paths': ['s3://bucket/file.csv']}, 'csv', format_options={'withHeader': True,'skipFirst': True,'quoteChar':'"','escaper':'\\'})

datasource0.printSchema()
datasource0.show(5)

输出

root
|-- ORDERID: string
|-- EVENTTIMEUTC: string

以下是转换为常规数据框的结果。

datasource0.toDF().show()

输出

+-------+-----------------+
|ORDERID|     EVENTTIMEUTC|
+-------+-----------------+
|      2| "1/13/2018 7:50"|
|      3| "1/13/2018 7:50"|
|      4| "1/13/2018 7:50"|
|      5| "1/13/2018 7:50"|
|      6| "1/13/2018 8:52"|
|      7| "1/13/2018 8:52"|
|      8| "1/13/2018 8:53"|
|      9| "1/13/2018 8:53"|
|     10| "1/16/2018 1:33"|
|     11| "1/16/2018 2:28"|
|     12| "1/16/2018 2:37"|
|     13| "1/17/2018 1:17"|
|     14| "1/17/2018 2:23"|
|     15| "1/17/2018 4:33"|
|     16| "1/17/2018 6:28"|
|     17| "1/17/2018 6:28"|
|     18| "1/17/2018 6:36"|
|     19| "1/17/2018 6:38"|
|     20| "1/17/2018 7:26"|
|     21| "1/17/2018 7:28"|
+-------+-----------------+
only showing top 20 rows

以下是一些数据。

ORDERID, EVENTTIMEUTC
1, "1/13/2018 7:10"
2, "1/13/2018 7:50"
3, "1/13/2018 7:50"
4, "1/13/2018 7:50"
5, "1/13/2018 7:50"
6, "1/13/2018 8:52"
7, "1/13/2018 8:52"
8, "1/13/2018 8:53"
9, "1/13/2018 8:53"
10, "1/16/2018 1:33"
11, "1/16/2018 2:28"
12, "1/16/2018 2:37"
13, "1/17/2018 1:17"
14, "1/17/2018 2:23"
15, "1/17/2018 4:33"
16, "1/17/2018 6:28"
17, "1/17/2018 6:28"
18, "1/17/2018 6:36"
19, "1/17/2018 6:38"
20, "1/17/2018 7:26"
21, "1/17/2018 7:28"
22, "1/17/2018 7:29"
23, "1/17/2018 7:46"
24, "1/17/2018 7:51"
25, "1/18/2018 2:22"
26, "1/18/2018 5:48"
27, "1/18/2018 5:50"
28, "1/18/2018 5:50"
29, "1/18/2018 5:51"
30, "1/18/2018 5:53"
100, "1/18/2018 10:32"
101, "1/18/2018 10:33"
102, "1/18/2018 10:33"
103, "1/18/2018 10:42"
104, "1/18/2018 10:59"
105, "1/18/2018 11:16"
蟒蛇 pyspark apache-spark-sql aws-glue

评论

2赞 Arran Duff 8/5/2019
我遇到了完全相同的问题。dynamicFrame.show() 没有打印任何东西。PrintSchema() 有效,如果我转换为 spark 数据帧,show() 有效。我不知道为什么会这样。在我的特殊情况下,当我读取使用 gzip 压缩的 json 文件时,就会发生这种情况。未压缩的 json 文件不会发生这种情况。当我尝试使用 write_dynamic_frame.from_options 将 dynamicframe 写入 S3 时,出现以下错误 org.apache.spark.SparkException:由于阶段故障,作业中止:阶段 24.0 中的任务 0 失败 4 次,最近一次失败:阶段 24.0 中的任务 0.3 丢失
0赞 sandeep rawat 6/29/2022
在编写 .它的数据类型在写入时对数据进行验证。这个问题看起来数据有问题
0赞 abolotnov 12/28/2022
您的作业书签是否已启用?docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

答:

1赞 Alex W 5/19/2023 #1

阅读文档,它们对此不是很明确,但在某些情况下,在您调用之前没有处理底层,因此您实际上是在调用可能为空白的东西:DynamicFrameDataFrametoDF().show()

为了解决这些限制,AWS Glue 引入了 DynamicFrame。一个 DynamicFrame 类似于 DataFrame,只不过每条记录都是 自描述,因此最初不需要架构。取而代之的是 AWS Glue 在需要时动态计算架构,并显式编码 使用选择(或联合)类型的架构不一致。您可以解决 这些不一致使数据集与数据兼容 需要固定架构的存储。

.toDF()文件:

通过将 DynamicFrame 转换为 Apache Spark DataFrame,方法是将 DynamicRecords 添加到 DataFrame 字段中。返回新的 DataFrame。

检查此处的代码,似乎存在当您尝试打印底层 Java 数据框时为空的情况: https://github.com/awslabs/aws-glue-libs/blob/f973095b9f2aa784cbcc87681a00da3127125337/awsglue/dynamicframe.py#L78

def show(self, num_rows=20):
    print(self._jdf.showString(num_rows))

其中依赖于传入的参数(),该参数可能尚未被收集:__init__jdf

def __init__(self, jdf, glue_ctx, name=""):
    self._jdf = jdf
    self.glue_ctx = glue_ctx
    self._ssql_ctx = glue_ctx._ssql_ctx
    self._sc = glue_ctx and glue_ctx._sc
    self._schema = None
    self._lazy_rdd = None
    self.name = name

调用基础数据帧时,将处理:toDF()

... SNIP ...
scala_options.append(self.glue_ctx.convert_resolve_option(option.path, option.action, option.target))

    return DataFrame(self._jdf.toDF(self.glue_ctx._jvm.PythonUtils.toSeq(scala_options)), self.glue_ctx)

Java 文档中提到此方法从 RDD 转换(即它正在从工作线程收集结果):.toDF()

这在从元组的 RDD 转换为具有有意义的名称时非常方便。DataFrame