具有不一致列的 pyspark 数据帧的字典

Dictionary to pyspark dataframe with inconsistent columns

提问人:Slite 提问时间:11/2/2023 更新时间:11/3/2023 访问量:34

问:

我目前正在 pyspark 中构建一个转换脚本,这是工作流程:

  • 获取原始 xml 文件
  • 将 XSLT 应用于原始文件,这将生成另一个包含不同表的数据的 XML
  • 将转换后的 xml 加载到多个数据帧
  • 在湖中写入数据帧。

目前,我正在为每一行构建字典,并尝试将它们加载到一行中,然后加载到数据帧中。我遇到了麻烦,因为每个 null 列都没有出现在 xml 行中。因此,有些行将有 20 列,有些有 35 列。

这导致

IllegalStateException:输入行没有架构所需的预期值数。35 个字段是必需的,而提供了 24 个值。

以下是我目前如何解析我的 XML 以口述到 Dataframe 行:

xmlTransformedRoot = xml_transformed.getroot() #XML after xslt being applied
list_of_tables = {child.tag for child in xmlTransformedRoot} #Set to get unique table names
Database = []
for table in list_of_tables:
    listData = []
    for child in xmlTransformedRoot.findall(table):
        data = dict()
        for subelem in child:
            column = subelem.tag.replace("{urn:schemas-microsoft-com:sql:SqlRowSet}","")
            text = subelem.text
            data[column] = [text]
        listData.append(data)
    Database.append((table,spark.createDataFrame(Row(**x) for x in listData)))

是否可以使用允许模式更新的函数加载这些行,或者我应该事先创建具有最大数量的列的数据帧结构?

非常感谢您的帮助。

python xml 数据帧 pyspark

评论

0赞 jdweng 11/2/2023
您的数据无效。创建架构的人员需要某些数据片段,而您的 xml 缺少重要数据。创建 XML 数据时,应由创建 xml 的人员对其进行验证,以便使用它的人不会收到像您得到的错误。请与创建 xml 的人员联系并修复它。

答:

0赞 Slite 11/3/2023 #1

我通过修改开箱线解决了我的问题

Database.append((table,spark.createDataFrame(Row(**x) for x in listData)))

Database.append((table,spark.createDataFrame(listData)))

有效地使我在一个大字典中插入每个 dataFrame。

显然,即使缺少某些字段,psypark 也可以很好地处理从字典创建 DataFrames。