Trying to pass a table from a container into a pyspark variable and use its columns in a select statement

Asked by Rchee on 11/10/2023 · Updated 11/10/2023 · Views: 58

Q:

I have multiple Delta tables in an ADLS container, and for each table I want to generate a script that converts it to a parquet file, explicitly listing every column in the table. This is what I have so far, but it doesn't create separate select statements; it just dumps all the columns from every table into a single select statement:

df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]
 
for table_name in table_names:
    df = spark.sql("SHOW COLUMNS IN gold." + table_name)
    df_rows = df.collect()
    column_names = [row["col_name"] for row in df_rows]
    select_statement ="spark.sql('SELECT " + ",".join(column_names) + "FROM gold." +tablename + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + tablename +"'" 

How can I get output like the following:

spark.sql('SELECT column1,column2,etc FROM gold.table1').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table1')

spark.sql('SELECT column1,column2,etc FROM gold.table2').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table2')

spark.sql('SELECT column1,column2,etc FROM gold.table3').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/Table3')

loops apache-spark pyspark azure-databricks

Comments

0 votes DileeprajnarayanThumula 11/10/2023
df = spark.sql("SHOW TABLES IN gold") df_rows = df.collect() table_names = [row["tableName"] for row in df_rows] for table_name in table_names: df = spark.sql("SHOW COLUMNS IN gold." + table_name) df_rows = df.collect() column_names = [row["col_name"] for row in df_rows] select_statement = "spark.sql('SELECT " + ", ".join(column_names) + " FROM gold." + table_name + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + table_name + "')" print(select_statement) Can you try this?
0 votes Rchee 11/10/2023
This produces the same result: all the columns end up in a single select statement
0 votes JayashankarGS 11/10/2023
Please provide the output.

A:

1 vote eminemesis 11/10/2023 #1

Each iteration overwrites the select_statement variable. You need to use a list and append to it instead.

# Collect the names of all tables in the gold schema
df = spark.sql("SHOW TABLES IN gold")
df_rows = df.collect()
table_names = [row["tableName"] for row in df_rows]

# Build one statement per table and append it to the list
select_statements = []
for table_name in table_names:
    df = spark.sql("SHOW COLUMNS IN gold." + table_name)
    df_rows = df.collect()
    column_names = [row["col_name"] for row in df_rows]
    select_statements.append("spark.sql('SELECT " + ", ".join(column_names) + " FROM gold." + table_name + "').write.format('parquet').mode('overwrite').save('/mnt/gold/toParquet/" + table_name + "')")

# print() already appends a newline, so this leaves a blank line between statements
for statement in select_statements:
    print(statement + "\n")

There were also some syntax errors in your select_statement value. This code should work.
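
For reference, the syntax errors in the original loop were: a missing space before FROM in the concatenation, the undefined name tablename (the loop variable is table_name), and a missing closing parenthesis on the generated save(...) call.

If the goal is to run the conversion directly rather than print out statements, a minimal sketch of the same loop that performs the writes itself (using f-strings, and assuming the same gold schema and /mnt/gold/toParquet mount path as above):

# Sketch: run the parquet writes directly instead of building statement strings
tables = [row["tableName"] for row in spark.sql("SHOW TABLES IN gold").collect()]

for table_name in tables:
    columns = [row["col_name"] for row in spark.sql(f"SHOW COLUMNS IN gold.{table_name}").collect()]
    (spark.sql(f"SELECT {', '.join(columns)} FROM gold.{table_name}")
        .write.format("parquet")
        .mode("overwrite")
        .save(f"/mnt/gold/toParquet/{table_name}"))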

Comments

0 votes Rchee 11/11/2023
Yes, it works, thanks! I see the syntax errors you mentioned now too, really appreciate it.