我正在尝试将数据帧写入 s3 中的单个文件,并在 pyspark 中使用所需的文件名。我能够在 scala 中执行此操作,但在 py 中给出以下错误

I am trying to write a dataframe to a single file in s3 with a desired file name in pyspark. I am able to do this in scala but gives below error in py

提问人:Ronnie 提问时间:11/16/2023 更新时间:11/16/2023 访问量:49

问:

我的 pyspark 代码尝试创建一个 DataFrame 并将 DataFrame 写入 s3 位置。完成此操作后,我将有一个名称为 part-*** 的文件,我正在尝试使用 hadoop 文件实用库重命名此文件,但总是出现以下错误。在pyspark中可以进行此操作吗? 注意:我不能在这里使用 boto3,因为我将在 EMR 上运行它。

我在pyspark中使用的代码如下:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test_rename").getOrCreate()
sc = spark.sparkContext

l = [['Column1', 'Column2', 'Column3'], ['Col1Value1', 'Col2Value1', 'Col3Value1'], ['Col1Value2', 'Col2Value2', 'Col3Value2']]
#Read the string data into a DataFrame
df = spark.createDataFrame(l[1:],l[0])

df.coalesce(1) \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("s3://<bucket>/<prefix>")

from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('s3://<bucket>/<prefix>/' + file), sc._jvm.Path('mydata.csv'))
fs.delete(sc._jvm.Path('s3://<bucket>/<prefix>'), True)

错误信息:

File "/mnt/tmp/spark-471166fb-d7c7-4839-a308-2e3f5c01c185/test_rename.py", line 20, in <module>
    file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS: s3://<bucket>/<prefix>, expected: hdfs://<emr-ip>:8020```
apache-spark amazon-s3 pyspark

评论

0赞 Lionia Vasilev 11/16/2023
这回答了你的问题吗?IllegalArgumentException,指定来自 s3 而不是 hdfs 的输入/输出时 FS 错误
0赞 Ronnie 11/16/2023
该链接仅引用 scala spark 代码。我在 scala 中这样做没有问题

答:

1赞 Lionia Vasilev 11/16/2023 #1

Hadoop 的文件系统需要一个 URI 来确定客户端实现。您可以在对象初始化期间使用 constructor 参数或配置参数提供它。FileSystemurifs.defaultFS

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jvm.java.net.URI("s3://bucket/"), spark._jsc.hadoopConfiguration())
fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path("s3://bucket/prefix/"))

// Or
spark._jsc.hadoopConfiguration().set("fs.defaultFS", "s3://bucket/")
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

Python 和 Scala 都是一样的;你真的在使用 Py4J 访问相同的 Java 对象。如果相同的代码在 Scala 中有效,那么您在 Hadoop 配置中可能具有不同的值。fs.defaultFS

请参阅 的源代码说明FileSystemfs.defaultFS

默认文件系统的名称。一个 URI,其方案和权限决定了 FileSystem 实现。uri 的方案确定 config 属性 (fs.SCHEME.impl) 命名 FileSystem 实现类。uri 的权限用于确定文件系统的主机、端口等。

评论

0赞 Ronnie 11/16/2023
这奏效了。多谢。一整天都在努力让它工作
0赞 Lionia Vasilev 11/16/2023
别客气。请随时将其标记为可接受的答案。