提问人:Slite 提问时间:11/2/2023 更新时间:11/3/2023 访问量:34
具有不一致列的 pyspark 数据帧的字典
Dictionary to pyspark dataframe with inconsistent columns
问:
我目前正在 pyspark 中构建一个转换脚本,这是工作流程:
- 获取原始 xml 文件
- 将 XSLT 应用于原始文件,这将生成另一个包含不同表的数据的 XML
- 将转换后的 xml 加载到多个数据帧
- 在湖中写入数据帧。
目前,我正在为每一行构建字典,并尝试将它们加载到一行中,然后加载到数据帧中。我遇到了麻烦,因为每个 null 列都没有出现在 xml 行中。因此,有些行将有 20 列,有些有 35 列。
这导致
IllegalStateException:输入行没有架构所需的预期值数。35 个字段是必需的,而提供了 24 个值。
以下是我目前如何解析我的 XML 以口述到 Dataframe 行:
xmlTransformedRoot = xml_transformed.getroot() #XML after xslt being applied
list_of_tables = {child.tag for child in xmlTransformedRoot} #Set to get unique table names
Database = []
for table in list_of_tables:
listData = []
for child in xmlTransformedRoot.findall(table):
data = dict()
for subelem in child:
column = subelem.tag.replace("{urn:schemas-microsoft-com:sql:SqlRowSet}","")
text = subelem.text
data[column] = [text]
listData.append(data)
Database.append((table,spark.createDataFrame(Row(**x) for x in listData)))
是否可以使用允许模式更新的函数加载这些行,或者我应该事先创建具有最大数量的列的数据帧结构?
非常感谢您的帮助。
答:
0赞
Slite
11/3/2023
#1
我通过修改开箱线解决了我的问题
Database.append((table,spark.createDataFrame(Row(**x) for x in listData)))
自
Database.append((table,spark.createDataFrame(listData)))
有效地使我在一个大字典中插入每个 dataFrame。
显然,即使缺少某些字段,psypark 也可以很好地处理从字典创建 DataFrames。
上一个:呈现 xml 接口
下一个:XML 数据类型的派生
评论