提问人:Ronnie 提问时间:11/16/2023 更新时间:11/16/2023 访问量:49
我正在尝试将数据帧写入 s3 中的单个文件,并在 pyspark 中使用所需的文件名。我能够在 scala 中执行此操作,但在 py 中给出以下错误
I am trying to write a dataframe to a single file in s3 with a desired file name in pyspark. I am able to do this in scala but gives below error in py
问:
我的 pyspark 代码尝试创建一个 DataFrame 并将 DataFrame 写入 s3 位置。完成此操作后,我将有一个名称为 part-*** 的文件,我正在尝试使用 hadoop 文件实用库重命名此文件,但总是出现以下错误。在pyspark中可以进行此操作吗? 注意:我不能在这里使用 boto3,因为我将在 EMR 上运行它。
我在pyspark中使用的代码如下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test_rename").getOrCreate()
sc = spark.sparkContext
l = [['Column1', 'Column2', 'Column3'], ['Col1Value1', 'Col2Value1', 'Col3Value1'], ['Col1Value2', 'Col2Value2', 'Col3Value2']]
#Read the string data into a DataFrame
df = spark.createDataFrame(l[1:],l[0])
df.coalesce(1) \
.write.format("com.databricks.spark.csv") \
.option("header", "true") \
.mode("overwrite") \
.save("s3://<bucket>/<prefix>")
from py4j.java_gateway import java_import
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('s3://<bucket>/<prefix>/' + file), sc._jvm.Path('mydata.csv'))
fs.delete(sc._jvm.Path('s3://<bucket>/<prefix>'), True)
错误信息:
File "/mnt/tmp/spark-471166fb-d7c7-4839-a308-2e3f5c01c185/test_rename.py", line 20, in <module>
file = fs.globStatus(sc._jvm.Path('s3://<bucket>/<prefix>/part*'))[0].getPath().getName()
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.IllegalArgumentException: Wrong FS: s3://<bucket>/<prefix>, expected: hdfs://<emr-ip>:8020```
答:
1赞
Lionia Vasilev
11/16/2023
#1
Hadoop 的文件系统需要一个 URI 来确定客户端实现。您可以在对象初始化期间使用 constructor 参数或配置参数提供它。FileSystem
uri
fs.defaultFS
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jvm.java.net.URI("s3://bucket/"), spark._jsc.hadoopConfiguration())
fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path("s3://bucket/prefix/"))
// Or
spark._jsc.hadoopConfiguration().set("fs.defaultFS", "s3://bucket/")
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
Python 和 Scala 都是一样的;你真的在使用 Py4J 访问相同的 Java 对象。如果相同的代码在 Scala 中有效,那么您在 Hadoop 配置中可能具有不同的值。fs.defaultFS
请参阅 的源代码和说明。FileSystem
fs.defaultFS
默认文件系统的名称。一个 URI,其方案和权限决定了 FileSystem 实现。uri 的方案确定 config 属性 (fs.SCHEME.impl) 命名 FileSystem 实现类。uri 的权限用于确定文件系统的主机、端口等。
评论
0赞
Ronnie
11/16/2023
这奏效了。多谢。一整天都在努力让它工作
0赞
Lionia Vasilev
11/16/2023
别客气。请随时将其标记为可接受的答案。
评论