Asked by: Rchee    Asked: 11/10/2023    Updated: 11/10/2023    Views: 58
Trying to pass a table from a container into a PySpark variable and use its columns in a select statement
Q:
I have multiple Delta tables in an ADLS container, and for each of them I want to generate a script that converts it to Parquet files and explicitly lists each table's columns. This is what I have so far, but it doesn't create separate select statements; it just dumps all the columns from every table into a single select statement:
df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]
for table_name in table_names:
df = spark.sql("SHOW COLUMNS IN gold." + table_name)
df_rows = df.collect()
column_names = [row["col_name"] for row in df_rows]
select_statement ="spark.sql('SELECT " + ",".join(column_names) + "FROM gold." +tablename + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + tablename +"'"
How can I get output that looks like this:
spark.sql('SELECT column1,column2,etc FROM gold.table1').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table1')
spark.sql('SELECT column1,column2,etc FROM gold.table2').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table2')
spark.sql('SELECT column1,column2,etc FROM gold.table3').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table3')
A:
1 upvote
eminemesis
11/10/2023
#1
You're overwriting the select_statement variable on every iteration. You need to use a list and append to it instead.
df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]
select_statements = []
for table_name in table_names:
df = spark.sql("SHOW COLUMNS IN gold." + table_name)
df_rows = df.collect()
column_names = [row["col_name"] for row in df_rows]
select_statements.append("spark.sql('SELECT " + ", ".join(column_names) + " FROM gold." + table_name + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + table_name +"')")
for statement in select_statements:
print(statement+"\n")
There were also some syntax errors in your select_statement value. This code should work.
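If you want to skip the intermediate strings entirely, here is a minimal sketch that runs the export for each table directly, using f-strings to build the query; it assumes the same gold database and /mnt/gold/toParquet/ mount path from the question:
# Minimal sketch: export each table directly instead of building command strings
# (assumes the gold database and /mnt/gold/toParquet/ mount from the question).
table_names = [row["tableName"] for row in spark.sql("SHOW TABLES IN gold").collect()]
for table_name in table_names:
    # list this table's columns so they appear explicitly in the SELECT
    column_names = [row["col_name"] for row in spark.sql(f"SHOW COLUMNS IN gold.{table_name}").collect()]
    query = f"SELECT {', '.join(column_names)} FROM gold.{table_name}"
    # write the result out as Parquet, one folder per table
    spark.sql(query).write.format("parquet").mode("overwrite").save(f"/mnt/gold/toParquet/{table_name}")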
Comments:
0 upvotes
Rchee
11/11/2023
Yes, it works, thank you! And I can see the syntax errors you mentioned now too, really appreciate it.