How to convert NONEs to an empty string in a pyspark dataframe when it has nested columns?

Asked by: Metadata · 11/10/2023 · Last edited by: Kashyap · Updated: 11/14/2023 · Views: 128

Q:

I have a dataframe with nested columns, like below:

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType, ArrayType)

df_schema = StructType([
    StructField("response", StringType(), True),
    StructField("id", StringType(), True),
    StructField("data", StructType([
      StructField("type", StringType(), True),
      StructField("record", StringType(), True),
      StructField("enteredCycle", StringType(), True),
      StructField("timestamp", StringType(), True),
      StructField("modifiedById", StringType(), True),
      StructField("years", IntegerType(), True),
      StructField("attributes", StructType([
        StructField("mass", ArrayType(DoubleType()), True),
        StructField("pace", ArrayType(IntegerType()), True),
        StructField("reflex", ArrayType(StringType()), True)
      ]))
    ]))
])

I get this dataframe as the result of an API call, like below.

import json
from pyspark.sql.functions import udf, col

def api_call(parameter: str):
    # session and header_data are defined elsewhere in the job
    response = session.get(f"https:url={parameter}", headers=header_data)
    return json.dumps(json.loads(response.text))

udf_call = udf(lambda z: api_call(z), StringType())

I add this UDF call as an extra column to one of my dataframes, like below:

df = inputDf.withColumn("api_response", udf_call(col("employee_id")))

# Creating an empty df.
# I have multiple API calls, so I am appending all of them into
# one single dataframe and then writing them all at once
# rather than one record at a time (I have 500,000 records).

empty_rdd = spark.sparkContext.emptyRDD()
empty_df = spark.createDataFrame(empty_rdd, df_schema)

Applying the schema:

json_df = (
    df
    .withColumn("main", from_json(col('api_response'), df_schema))
    .select('response', 'id', 'data.type', 'data.record', ..., 'data.attributes.reflex')
)
empty_df = empty_df.unionAll(json_df)

The problem occurs when I try to save the dataframe as a table:

empty_df.write.mode('overwrite').format('parquet').saveAsTable('dbname.tablename')

I see this error:

Job aborted due to stage failure: Task 24 in stage 161.0 failed 4 times, most recent failure: Lost task 24.3 in stage 161.0 (TID 1119) (100.66.2.91 executor 0): org.apache.spark.api.python.PythonException: 'TypeError: can only concatenate str (not "NoneType") to str', from year_loads.py, line 21. Full traceback below:
Traceback (most recent call last):
  File "<year_loads.py>", line 21, in <lambda>
  File "<year_loads.py>", line 44, in api_call
TypeError: can only concatenate str (not "NoneType") to str

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:595)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)

My assumption is that there are NONEs in the output that cannot be appended to a StringType(). So I implemented logic that converts all the NONEs to empty strings, shown below. I took this implementation from another stackoverflow post and changed it to fit my requirement.

# Get only String columns
def replace_none_with_empty_str(df: DataFrame):
    string_fields = []
    for i, f in enumerate(df.schema.fields):
        if isinstance(f.dataType, StringType):
            string_fields.append(f.name)
    exprs = [none_as_blank(x).alias(x) if x in string_fields else x for x in df.columns]
    df.select(*exprs)
    return df

# NULL/NONE to blank logic
def none_as_blank(x):
    return when(col(x) != None, col(x)).otherwise('')


non_nulls_df = replace_none_with_empty_str(empty_df)
non_nulls_df.write.mode('overwrite').format('parquet').saveAsTable('dbname.tablename')

But even after applying the above method, I still see the same error. Are my assumption and workaround correct? Is my logic correctly applied to all String columns, especially the nested string columns? If not, can anyone tell me what mistake I am making here and how I can correct it? Any help is greatly appreciated.

python apache-spark pyspark apache-spark-sql

Comments:

2 · werner · 11/14/2023
You haven't accepted any answers in the past year. Strange...
2 · Kashyap · 11/14/2023
@werner I was about to answer this one, but then I noticed your comment. I see the OP is a prolific asker, but the last time they accepted an answer was April 22, 2022.
1 · Metadata · 11/27/2023
@werner & kashyap, I do get back to people in the comments after I try their suggestions; if you check my questions, you'll see I have done that on almost all of my old posts. For most of them over the past year I couldn't get an answer I could accept, though I upvote whenever I at least get hints, even if not a full answer. But lately, I agree with you: my role has changed and I spend more time on design and less on code, which made me forget to come back to the questions I asked. I will change that on my side and be more engaged. Thanks for bringing it up.

Answers:

0 · Koedlt · 11/13/2023 · #1

If you look at the error stack trace, and more specifically at the last entries on the stack:

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)

you'll see that the error seems to have occurred while executing the UDF, before any post-processing. With the information we have here, I'm guessing your api_call function is the problem. More specifically, your return value is probably None. What about trying something like the following:

def api_call(parameter: str):
    response = session.get(f"https:url={parameter}", headers=header_data)
    return json.dumps(json.loads(response.text)) or ""

The or "" makes sure that when you would return a NoneType, you return an empty string instead. That means you will have a completely empty record, but that was going to be the case anyway, since your value was a NoneType.
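
As for the nested-columns part of the question: the posted replace_none_with_empty_str only walks the top-level df.schema.fields, so string fields inside data and data.attributes are never rewritten (and the result of df.select(*exprs) is never assigned, so the returned df is unchanged either way). A minimal sketch of a recursive variant, assuming the df_schema shown above and leaving array columns untouched, could look like this:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

def blank_nulls(column, dtype):
    # Rebuild a column recursively, replacing null strings with ''.
    if isinstance(dtype, StructType):
        # Recreate the struct field by field so nested strings are reached.
        return F.struct(*[
            blank_nulls(column.getField(f.name), f.dataType).alias(f.name)
            for f in dtype.fields
        ])
    if isinstance(dtype, StringType):
        return F.coalesce(column, F.lit(''))
    # Arrays, integers, doubles, etc. are passed through unchanged.
    return column

exprs = [blank_nulls(F.col(f.name), f.dataType).alias(f.name)
         for f in json_df.schema.fields]
non_nulls_df = json_df.select(*exprs)

Two caveats: a row where the data struct itself is NULL becomes a struct of empty/NULL fields rather than staying NULL, and this column logic cannot fix the posted error anyway, since the traceback shows the TypeError is raised inside api_call on the executors, before any of these expressions run.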