Asked by: Caelia  Asked: 7/11/2023  Last edited by: Caelia  Updated: 7/11/2023  Views: 71
EOF error when PySpark tries to write a CSV file
Q:
I merged 4 tables using PySpark and the merge completed. Then I tried to write the DataFrame out as a CSV file, but I got an EOF error.
I tried merging smaller tables and this error did not occur, so I'm not sure whether it is related to memory or something else.
Any ideas are much appreciated!
Spark settings:
# part of python file
cmd = f"""/spark/bin/spark-submit --master yarn --deploy-mode cluster \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.network.timeout=1200000s \
--conf spark.executor.heartbeatInterval=1000000s \
--conf spark.shuffle.io.retryWait=60s \
--conf spark.shuffle.io.maxRetries=10 \
--conf spark.driver.memory=4g \
--conf spark.driver.memoryOverhead=0 \
--conf spark.executor.instances=3 \
--conf spark.executor.memory=3g \
--conf spark.executor.memoryOverhead=0 \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.driver.maxResultSize=0 \
--conf spark.yarn.submit.file.replication=1 \
--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.skewJoin.enabled=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs:///spark/eventLog \
--conf spark.ui.enabled=false \
--conf spark.sql.pivotMaxValues=10000 \
/run/spark_join.py true {task_id}.json"""
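Presumably this command string is then launched from the same Python file with subprocess, along the lines of the sketch below (this is an assumption; the question only shows how the string is built):

# Hypothetical launch of the spark-submit command built above; the question
# does not show this step, so treat it as an assumption.
import subprocess

process = subprocess.Popen(cmd, shell=True,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
    print(stderr.decode())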
Error log:
2023-07-11 03:31:50,170 - [ERROR]
Converts spark dataframe to CSV file error:Traceback (most recent call last):
File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1689038859224_0002/container_1689038859224_0002_01_000001/spark_join.py", line 986, in <module>
df_joined.write.csv(tmp_target)
File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1689038859224_0002/container_1689038859224_0002_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 1240, in csv
self._jwrite.csv(path)
File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1689038859224_0002/container_1689038859224_0002_01_000001/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1689038859224_0002/container_1689038859224_0002_01_000001/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1689038859224_0002/container_1689038859224_0002_01_000001/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o117.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 16 in stage 6.0 failed 4 times, most recent failure: Lost task 16.3 in stage 6.0 (TID 41) (dcfs-worker1 executor 2): java.io.EOFException: Cannot seek after EOF
at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1598)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:71)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:121)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:97)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:149)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:134)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.EOFException: Cannot seek after EOF
at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1598)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:71)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:121)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:97)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:149)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:134)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1689038859224_0002/container_1689038859224_0002_01_000001/spark_join.py <module> 1027]
The relevant code in spark_join.py is as follows:
def pull_and_join(task_id, task_dict):
    for merge_idx, merge_info in enumerate(task_dict['mergeCondition']):
        set_merge_data(merge_idx, 0, merge_info[0])
        set_merge_data(merge_idx, 1, merge_info[1])
        col_l, col_r = globals()['df0'].columns, globals()['df1'].columns
        cond = create_condition(col_l, col_r)
        df_joined = globals()['df0'].join(globals()['df1'], cond, "left")
        for right_col_name in merge_info[1]['merge_col']:
            df_joined = df_joined.drop(f"{right_col_name}")
        globals()['df0'] = df_joined
    return df_joined

try:
    os.makedirs(tmp_dir, exist_ok=True)  # Create a join folder in HDFS for tmp file
    cmd = '/opt/hadoop-3.3.4/bin/hadoop fs -mkdir /join'
    process = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
    process.wait()
    df_joined = pull_and_join(task_id, task_dict)  # call join function
except Exception as e:
    logging.error("join error:" + traceback.format_exc())
    exit(1)

try:
    tmp_target = f"/join/joined_{task_id}_{ts}"
    df_joined.write.csv(tmp_target)  # write csv & where error occurs
except Exception as e:
    logging.error("Converts spark dataframe to CSV file error:" + traceback.format_exc())
    exit(1)
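For context, here is a minimal, self-contained sketch of the same read, left-join, and write-CSV flow that the script above implements. The paths, column names, and session setup are illustrative placeholders and are not taken from the question; the final write.csv call is the step at which the error above is raised, because the write action is what actually triggers reading the input files.

# Minimal sketch of the join-then-write flow (placeholder paths and columns,
# not the asker's data).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join_and_write_csv_example").getOrCreate()

# Read two source tables from HDFS as CSV.
df_left = spark.read.csv("/data/table_a", header=True, inferSchema=True)
df_right = spark.read.csv("/data/table_b", header=True, inferSchema=True)

# Left join on a shared key column, then drop the duplicated key from the right side.
cond = df_left["id"] == df_right["id"]
df_joined = df_left.join(df_right, cond, "left").drop(df_right["id"])

# Writing the result runs the whole job lazily, including re-reading the inputs,
# which is why read-side errors (such as "Cannot seek after EOF") surface here.
df_joined.write.csv("/join/joined_example", header=True)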
A: No answers yet