Asked by: VGH · Asked: 11/11/2023 · Modified: 11/11/2023 · Views: 25
Writing file to MinIO from Spark Java code gives an error
Q:
I am connecting to MinIO from my Spark Java code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

static void minIOReadWriteTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar,libs/jnats-2.17.1.jar")
            .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
            .config("fs.s3a.access.key", "minioadmin")
            .config("fs.s3a.secret.key", "minioadmin")
            .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
            .config("fs.s3a.connection.ssl.enabled", "true")
            .config("fs.s3a.path.style.access", "true")
            .config("fs.s3a.attempts.maximum", "1")
            .config("fs.s3a.connection.establish.timeout", "5000")
            .config("fs.s3a.connection.timeout", "10000")
            .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .getOrCreate();

    // Dataset<Row> dftest = spark.read().load("s3a://testbucket/SampleFiles/output/parquet");
    // dftest.show();

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream4")
            .option("nats.stream.subjects", "mysub4")
            .option("nats.durable.name", "my_consumer")
            // wait 90 seconds for an ack before resending a message
            .option("nats.msg.ack.wait.secs", 90)
            .load();
    System.out.println("Successfully read nats stream !");

    df.createOrReplaceTempView("natsmessages");
    Dataset<Row> filteredDf = spark.sql("select * from natsmessages");

    StreamingQuery query;
    try {
        query = filteredDf
                .writeStream()
                .outputMode("append")
                .format("parquet")
                .option("truncate", false)
                .option("path", "s3a://testbucket/SampleFiles/output/parquet")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
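For reference, the same fs.s3a.* options can also be applied directly to the Hadoop configuration of an existing session instead of through the session builder. The snippet below is only a sketch of that alternative wiring (same keys and values as above), not code from the failing run:

import org.apache.hadoop.conf.Configuration;

// Alternative wiring (sketch, not from the original run): set the s3a options
// on the underlying Hadoop configuration of an already-created SparkSession.
Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
hadoopConf.set("fs.s3a.access.key", "minioadmin");
hadoopConf.set("fs.s3a.secret.key", "minioadmin");
hadoopConf.set("fs.s3a.endpoint", "http://127.0.0.1:9000");
hadoopConf.set("fs.s3a.path.style.access", "true");
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");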
This code worked yesterday. Without any code change, it started throwing java.nio.file.AccessDeniedException:
java.nio.file.AccessDeniedException: s3a://testbucket/SampleFiles/output/parquet/_spark_metadata: getFileStatus on s3a://testbucket/SampleFiles/output/parquet/_spark_metadata: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 1796779F1EDE98D3; S3 Extended Request ID: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8; Proxy: null), S3 Extended Request ID: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128)
at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1247)
at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1243)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1249)
at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1747)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:341)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:86)
at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:139)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:436)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:403)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
Interestingly, if I uncomment the lines that read a file from MinIO, the code above no longer gives any error and works as expected (both reading from and writing to MinIO work):
Dataset<Row> dftest = spark.read().load("s3a://testbucket/SampleFiles/output/parquet");
I have no idea how code that worked yesterday started giving this error without any code change. Also, if I keep the file-read code in place and then write the file to MinIO, it works. I can't make sense of this strange behavior.

I am running my Spark Java code locally on a Windows machine, while MinIO runs on my local Kubernetes (minikube). This is not limited to Spark streaming: even saving a plain CSV file to MinIO fails the same way. Any idea how to fix this error?
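For reference, the plain non-streaming CSV test mentioned above would look roughly like the sketch below, reusing the same SparkSession and s3a settings. The sample rows, schema, and exact output path are illustrative assumptions; only the bucket matches the one used above:

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Minimal non-streaming write test (sketch): builds a tiny DataFrame in memory
// and writes it to the same MinIO bucket as a CSV file.
StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("value", DataTypes.StringType);
Dataset<Row> testDf = spark.createDataFrame(
        Arrays.asList(RowFactory.create(1, "a"), RowFactory.create(2, "b")),
        schema);
testDf.write()
        .mode("overwrite")
        .csv("s3a://testbucket/SampleFiles/output/csv");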
A: No answers yet.