带有 where 子句的 pyspark sql 抛出列不存在错误

pyspark sql with where clause throws column does not exist error

提问人:Shekar Tippur 提问时间:8/7/2018 最后编辑:ChrisFShekar Tippur 更新时间:8/7/2018 访问量:3082

问:

我正在使用 pyspark 将 csv 加载到 redshift。我想查询一下 manny 行是如何添加的。 我使用以下函数创建一个新列:withcolumn

csvdata=df.withColumn("file_uploaded", lit("test"))

我看到这个列被创建,我可以使用 psql 进行查询。但是,当我尝试使用 pyspark sql 上下文进行查询时,出现错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o77.showString.
: java.sql.SQLException: [Amazon](500310) Invalid operation: column "test" does not exist in billingreports;

有趣的是,我可以查询其他列,而不仅仅是我添加的新列。

感谢有关如何解决此问题的任何指示。

完整代码:

df=spark.read.option("header","true").csv('/mnt/spark/redshift/umcompress/' + 
filename)
csvdata=df.withColumn("fileuploaded", lit("test"))

countorig=csvdata.count()

## This executes without error
csvdata.write \
    .format("com.databricks.spark.redshift") \
    .option("url", jdbc_url) \
    .option("dbtable", dbname) \
    .option("tempformat", "CSV") \
    .option("tempdir", "s3://" + s3_bucket + "/temp") \
    .mode("append") \
    .option("aws_iam_role", iam_role).save()

select="select count(*) from " + dbname + " where fileuploaded='test'"

## Error occurs
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", jdbc_url) \
.option("query", select) \
.option("tempdir", "s3://" + s3_bucket + "/test") \
.option("aws_iam_role", iam_role) \
.load()
newcounnt=df.count()

感谢您的回复。 Dataframe 确实有名为 file_uploaded 的新列 查询如下: select=“select count(*) from billingreports,其中 file_uploaded='test'”

我已经打印了架构

|-- file_uploaded: 字符串 (nullable = true)

df.show() 显示添加了新列。 我只想将一个预先确定的字符串作为值添加到此列。

apache-Spark 数据帧 pyspark databricks

评论

0赞 Shaido 8/7/2018
这里一定没有显示一些其他转换。例外情况发生在将列添加到 csvdata 时名为 billingreports 的数据帧上。
0赞 Shekar Tippur 8/7/2018
我有一个名为 csvdata 的数据帧,我用它来写入 redshift。您拥有该 DataFrame 的计数。我想用写入 redshift 的内容进行验证。我尝试从 sql 查询 select=“select count(*) from billingreports where file_uploaded='test'” 中构造的数据帧。这是显示错误的地方。
1赞 Shaido 8/7/2018
请将所有代码添加到问题中。如何创建结算报告?给出错误的命令是什么?

答:

-2赞 Ravi Mishra 8/7/2018 #1

您的 Dataframe csvdata 将有一个名为 file_uploaded 的新列,在 df 的所有行中都具有默认值“test”。此错误表明它正在尝试访问名为 test 的列,该列在 dataframe billingreports 中不存在,因此出现错误。在使用 billingreports.dtypes 查询列之前打印架构,或者更好地尝试使用 billingreports.show() 获取 DataFrame 的样本,并查看该列的名称和值是否正确。

如果共享导致此异常的查询会更好,因为 DataFrame Billingreports 会引发异常。

评论

0赞 Shekar Tippur 8/7/2018
感谢您的回复。Dataframe 确实有名为 file_uploaded 的新列 这是查询: select=“select count(*) from billingreports where file_uploaded='test'” 我已经打印了架构 |-- file_uploaded: string (nullable = true) df.show() 显示添加了新列。我只想将一个预先确定的字符串作为值添加到此列。
0赞 Ravi Mishra 8/7/2018
withColumn 要求第二个参数为 Column。您分配默认列的方式对我来说看起来不错,一个包裹在 lit 函数中的字符串。你能尝试使用Spark SQL看看它是否有任何区别吗?像这样的东西:df.createOrReplaceTempView(“df”);csvdata=spark.sql(“SELECT a.*, '{0}' AS 文件上传自 df a”).format(“test”))