提问人:Shekar Tippur 提问时间:8/7/2018 最后编辑:ChrisFShekar Tippur 更新时间:8/7/2018 访问量:3082
带有 where 子句的 pyspark sql 抛出列不存在错误
pyspark sql with where clause throws column does not exist error
问:
我正在使用 pyspark 将 csv 加载到 redshift。我想查询一下 manny 行是如何添加的。
我使用以下函数创建一个新列:withcolumn
csvdata=df.withColumn("file_uploaded", lit("test"))
我看到这个列被创建,我可以使用 psql 进行查询。但是,当我尝试使用 pyspark sql 上下文进行查询时,出现错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o77.showString.
: java.sql.SQLException: [Amazon](500310) Invalid operation: column "test" does not exist in billingreports;
有趣的是,我可以查询其他列,而不仅仅是我添加的新列。
感谢有关如何解决此问题的任何指示。
完整代码:
df=spark.read.option("header","true").csv('/mnt/spark/redshift/umcompress/' +
filename)
csvdata=df.withColumn("fileuploaded", lit("test"))
countorig=csvdata.count()
## This executes without error
csvdata.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbc_url) \
.option("dbtable", dbname) \
.option("tempformat", "CSV") \
.option("tempdir", "s3://" + s3_bucket + "/temp") \
.mode("append") \
.option("aws_iam_role", iam_role).save()
select="select count(*) from " + dbname + " where fileuploaded='test'"
## Error occurs
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", jdbc_url) \
.option("query", select) \
.option("tempdir", "s3://" + s3_bucket + "/test") \
.option("aws_iam_role", iam_role) \
.load()
newcounnt=df.count()
感谢您的回复。 Dataframe 确实有名为 file_uploaded 的新列 查询如下: select=“select count(*) from billingreports,其中 file_uploaded='test'”
我已经打印了架构
|-- file_uploaded: 字符串 (nullable = true)
df.show() 显示添加了新列。 我只想将一个预先确定的字符串作为值添加到此列。
答:
-2赞
Ravi Mishra
8/7/2018
#1
您的 Dataframe csvdata 将有一个名为 file_uploaded 的新列,在 df 的所有行中都具有默认值“test”。此错误表明它正在尝试访问名为 test 的列,该列在 dataframe billingreports 中不存在,因此出现错误。在使用 billingreports.dtypes 查询列之前打印架构,或者更好地尝试使用 billingreports.show() 获取 DataFrame 的样本,并查看该列的名称和值是否正确。
如果共享导致此异常的查询会更好,因为 DataFrame Billingreports 会引发异常。
评论
0赞
Shekar Tippur
8/7/2018
感谢您的回复。Dataframe 确实有名为 file_uploaded 的新列 这是查询: select=“select count(*) from billingreports where file_uploaded='test'” 我已经打印了架构 |-- file_uploaded: string (nullable = true) df.show() 显示添加了新列。我只想将一个预先确定的字符串作为值添加到此列。
0赞
Ravi Mishra
8/7/2018
withColumn 要求第二个参数为 Column。您分配默认列的方式对我来说看起来不错,一个包裹在 lit 函数中的字符串。你能尝试使用Spark SQL看看它是否有任何区别吗?像这样的东西:df.createOrReplaceTempView(“df”);csvdata=spark.sql(“SELECT a.*, '{0}' AS 文件上传自 df a”).format(“test”))
评论