Writing file to MinIO from Spark Java code gives an error

Asked by VGH on 11/11/2023 · Updated 11/11/2023 · Viewed 25 times

Q:

I am connecting to MinIO from Spark Java code:

static void minIOReadWriteTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
            .config("fs.s3a.access.key", "minioadmin")
            .config("fs.s3a.secret.key", "minioadmin")
            .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
            .config("fs.s3a.connection.ssl.enabled", "true")
            .config("fs.s3a.path.style.access", "true")
            .config("fs.s3a.attempts.maximum", "1")
            .config("fs.s3a.connection.establish.timeout", "5000")
            .config("fs.s3a.connection.timeout", "10000")
            .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .getOrCreate();

//  Dataset<Row> dftest = spark.read().load("s3a://testbucket/SampleFiles/output/parquet");
//  dftest.show();

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream4")
            .option("nats.stream.subjects", "mysub4")
            .option("nats.durable.name", "my_consumer")
            // wait 90 seconds for an ack before resending a message
            .option("nats.msg.ack.wait.secs", 90)
            .load();
    System.out.println("Successfully read nats stream !");
    df.createOrReplaceTempView("natsmessages");
    Dataset<Row> filteredDf = spark.sql("select * from natsmessages");

    StreamingQuery query;
    try {
        query = filteredDf
                .writeStream()
                .outputMode("append")
                .format("parquet")
                .option("truncate", false)
                .option("path", "s3a://testbucket/SampleFiles/output/parquet")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
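One thing worth ruling out first: the reliably documented ways to hand fs.s3a.* options to Hadoop are either the "spark.hadoop." prefix on the Spark config key, or setting them directly on the SparkContext's Hadoop configuration. Unprefixed keys set on the session builder reach some code paths but are not guaranteed to reach all of them. A minimal sketch of both forms, not a confirmed fix, reusing the question's endpoint and credentials:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;

public class S3AConfigSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("s3a-config-sketch")
                .master("local")
                // builder equivalent: .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
                .getOrCreate();

        // Set s3a options directly on the Hadoop configuration so every
        // filesystem code path sees them.
        Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
        hadoopConf.set("fs.s3a.access.key", "minioadmin");
        hadoopConf.set("fs.s3a.secret.key", "minioadmin");
        hadoopConf.set("fs.s3a.endpoint", "http://127.0.0.1:9000");
        // Note: the question enables SSL against an http:// endpoint; for a
        // plain-http MinIO endpoint this would normally be "false".
        hadoopConf.set("fs.s3a.connection.ssl.enabled", "false");
        hadoopConf.set("fs.s3a.path.style.access", "true");

        spark.stop();
    }
}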

This code worked yesterday. Without any code change, it started throwing a java.nio.file.AccessDeniedException!

java.nio.file.AccessDeniedException: s3a://testbucket/SampleFiles/output/parquet/_spark_metadata: getFileStatus on s3a://testbucket/SampleFiles/output/parquet/_spark_metadata: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 1796779F1EDE98D3; S3 Extended Request ID: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8; Proxy: null), S3 Extended Request ID: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128)
at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1247)
at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1243)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1249)
at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1747)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:341)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:86)
at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:139)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:436)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:403)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
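The trace shows the 403 surfacing from the bundled AWS SDK (com.amazonaws) while Spark checks the _spark_metadata path. A quick way to isolate whether the rejection comes from MinIO itself or from Spark's s3a wiring is to hit the bucket with a bare SDK client. This is only a diagnostic sketch, assuming the same endpoint and credentials as in the question and that the AWS SDK v1 classes (which hadoop-aws bundles) are on the classpath:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class MinioCredentialCheck {
    public static void main(String[] args) {
        // Bare S3 client against the same MinIO endpoint the Spark job uses.
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:9000", "us-east-1"))
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("minioadmin", "minioadmin")))
                .withPathStyleAccessEnabled(true)
                .build();

        // If this listing also returns 403, the problem is outside Spark
        // (credentials, bucket policy, or clock skew); if it succeeds, the
        // problem is in how the s3a settings reach Hadoop.
        s3.listObjectsV2("testbucket").getObjectSummaries()
                .forEach(o -> System.out.println(o.getKey()));
    }
}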

Interestingly, if I uncomment the code that reads the file from MinIO, the code above gives no error and works as expected (both reading from and writing to MinIO work)!

Dataset<Row> dftest = spark.read().load("s3a://testbucket/SampleFiles/output/parquet");

I have no idea how code that worked yesterday started throwing this error without any code change! Moreover, if I keep the file-read code in place and then write the file to MinIO, it works! I cannot understand this strange behavior.

I am running my Spark Java code locally on a Windows machine, while MinIO runs on my local Kubernetes (minikube). This is not specific to Spark Structured Streaming: I also tried saving a plain CSV file to MinIO, and that fails the same way. Any idea how to fix this error?
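For reference, a minimal non-streaming repro along the lines the question describes, assuming the same bucket and s3a settings as above; the output path "csv-test" is made up for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvWriteRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("csv-write-repro")
                .master("local")
                .getOrCreate();

        // Three rows with a single "id" column, just enough to trigger a write.
        Dataset<Row> df = spark.range(3).toDF("id");

        // If this plain batch write also fails with 403, streaming and the
        // _spark_metadata log are not the culprit.
        df.write().mode("overwrite").csv("s3a://testbucket/SampleFiles/output/csv-test");

        spark.stop();
    }
}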

java · apache-spark · minio

Comments

0 · stevel · 11/13/2023
Check whether the clocks are in sync. Stores often reject calls whose signature timestamp is too old, as a defense against replay attacks.
0 · VGH · 11/16/2023
@stevel Thanks for the reply. However, I was trying different things in minikube and had to delete the MinIO pod running on my local system. With the new pod, writing to MinIO works fine. If I hit the same error again, I will check whether the clocks are in sync.
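Should the error recur, here is a hedged sketch of the clock check stevel suggests, assuming the same http://127.0.0.1:9000 endpoint: compare the local clock against the Date header that comes back with any response from MinIO. AWS-style request signatures are typically rejected once clocks drift beyond roughly 15 minutes.

import java.net.HttpURLConnection;
import java.net.URL;

public class ClockSkewCheck {
    public static void main(String[] args) throws Exception {
        // Any response from the endpoint carries a Date header, even a 403.
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://127.0.0.1:9000").openConnection();
        conn.setRequestMethod("HEAD");

        long serverMillis = conn.getDate();   // parsed Date header, 0 if absent
        long localMillis = System.currentTimeMillis();
        conn.disconnect();

        if (serverMillis == 0) {
            System.out.println("No Date header in response; cannot measure skew.");
        } else {
            System.out.printf("Clock skew (local - server): %d ms%n",
                    localMillis - serverMillis);
        }
    }
}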

A: No answers yet