Asked by: PracticingPython Asked: 10/2/2021 Last edited by: PracticingPython Updated: 10/8/2021 Views: 546
Issues with files in pyspark data frame
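Q:
I have been working on a tool in pyspark that filters based on a search and then sorts the results. The data frame is a compilation of more than 1,400 csv files. When I try to run the code, I get a very long error message. It seems to boil down to a Java error about an unexpected EOF: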
Py4JJavaError: An error occurred while calling o1331.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 202 in stage 56.0 failed 4 times, most recent failure: Lost task 202.3 in stage 56.0 (TID 7632, emr-master-f35-eels.sss.local, executor 31): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
Auto configuration enabled=true
Autodetect column delimiter=false
Autodetect quotes=false
Column reordering enabled=true
Delimiters for detection=null
Empty value=
Escape unquoted values=false
Header extraction enabled=null
Headers=null
Ignore leading whitespaces=false
Ignore leading whitespaces in quotes=false
Ignore trailing whitespaces=false
Ignore trailing whitespaces in quotes=false
Input buffer size=128
Input reading on separate thread=false
Keep escape sequences=false
Keep quotes=false
Length of content displayed on error=-1
Line separator detection enabled=false
Maximum number of characters per column=-1
Maximum number of columns=20480
Normalize escaped line separators=true
Null value=
Number of records to read=all
Processor=none
Restricting data in exceptions=false
RowProcessor error handler=null
Selected fields=field selection: [2]
Skip bits as whitespace=true
Skip empty lines=true
Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
CsvFormat:
Comment character=\0
Field delimiter=,
Line separator (normalized)=\n
Line separator sequence=\n
Quote character="
Quote escape character="
Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
... 23 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1080)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1062)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1484)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1471)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3267)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3264)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3264)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
Auto configuration enabled=true
Autodetect column delimiter=false
Autodetect quotes=false
Column reordering enabled=true
Delimiters for detection=null
Empty value=
Escape unquoted values=false
Header extraction enabled=null
Headers=null
Ignore leading whitespaces=false
Ignore leading whitespaces in quotes=false
Ignore trailing whitespaces=false
Ignore trailing whitespaces in quotes=false
Input buffer size=128
Input reading on separate thread=false
Keep escape sequences=false
Keep quotes=false
Length of content displayed on error=-1
Line separator detection enabled=false
Maximum number of characters per column=-1
Maximum number of columns=20480
Normalize escaped line separators=true
Null value=
Number of records to read=all
Processor=none
Restricting data in exceptions=false
RowProcessor error handler=null
Selected fields=field selection: [2]
Skip bits as whitespace=true
Skip empty lines=true
Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
CsvFormat:
Comment character=\0
Field delimiter=,
Line separator (normalized)=\n
Line separator sequence=\n
Quote character="
Quote escape character="
Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
... 23 more
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o1331.collectToPython.\n', JavaObject id=o1338), <traceback object at 0x7ff3f2ff9230>)
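I went through the process of running this code on each csv individually and narrowed the problem down to 6 files that cause this error. Obviously I can remove those 6 files from the list and the code then runs fine, but if there is a way to use code to diagnose and possibly fix these files, I would like to try that route first. Any suggestions or ideas on how I could do that?
EDIT
Per the suggestion below, I tried opening the file with the following code and then printing the contents: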
with open('file.csv.bz2', 'r', encoding='ISO-8859-1') as f:
    lines = f.readlines()
    print(lines)
This runs without any problems. However, I then tried opening the file in pandas and got an EOFError.
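One likely reason the two checks disagree: the built-in `open` reads the compressed bytes as they are and never decompresses them, and ISO-8859-1 can decode any byte, so that call cannot detect a truncated archive. A reader that actually decompresses the data hits the end of the stream early, which would match the `EOFException` raised in `DecompressorStream` in the trace. Below is a minimal sketch of a check that does decompress, assuming the files are bzip2 or gzip archives as the `.bz2` name suggests. The helper name `check_compressed_file` and the chunk size are illustrative, not from the original post.

```python
import bz2
import gzip
import zlib

# Map file extensions to the stdlib opener that decompresses them.
_OPENERS = {".bz2": bz2.open, ".gz": gzip.open}


def check_compressed_file(path, chunk_size=1 << 20):
    """Decompress the whole file in chunks and report whether it is intact.

    Returns a tuple (ok, bytes_decompressed, error_message).
    """
    opener = next((o for ext, o in _OPENERS.items() if path.endswith(ext)), None)
    if opener is None:
        raise ValueError(f"unsupported extension: {path}")

    total = 0
    try:
        with opener(path, "rb") as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                total += len(chunk)
    except (EOFError, OSError, zlib.error) as exc:
        # EOFError: the stream ended before its end-of-stream marker (truncated file).
        # OSError / zlib.error: the compressed data itself is invalid.
        return False, total, f"{type(exc).__name__}: {exc}"
    return True, total, None
```

Calling `check_compressed_file('file.csv.bz2')` on each of the 6 suspect files should show which ones fail to decompress fully. If a file turns out to be truncated, it usually has to be downloaded or generated again, because the missing data is not in the file.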
A:
0 votes
Yosi Dahari
10/8/2021
#1
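You can load the files while keeping all of the records:
1. Load all the records except those from the 6 problem files into a single data frame.
2. Load the 6 files using the schema from step 1, with the mode set to PERMISSIVE (see example) and the malformed-record column kept.
3. (Optional) Rename the corrupted-record column. See "Not able to retain the corrupted rows in pyspark using PERMISSIVE mode".
4. Now you can see which records are malformed and decide what to do with them: drop them, write them to a dead letter queue, log them, or whatever else you decide.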
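The steps above could be sketched roughly as follows. This is an illustration under stated assumptions, not the answerer's own code: the function names are hypothetical, `spark` is assumed to be an existing `SparkSession`, the files are assumed to have a header row, and `_corrupt_record` is Spark's default name for the corrupt-record column. Note also that the trace in the question ends in an `EOFException` raised while decompressing, so PERMISSIVE mode alone may not get past a file whose archive is truncated; it helps with rows that fail to parse.

```python
import copy

# Spark's default name for the corrupt-record column in PERMISSIVE mode.
CORRUPT_COL = "_corrupt_record"


def load_good_files(spark, good_paths):
    """Step 1: read the files that load cleanly; their schema is reused later."""
    return spark.read.csv(good_paths, header=True)


def load_suspect_files(spark, suspect_paths, schema):
    """Step 2: read the suspect files with the step-1 schema, keeping bad rows."""
    # Copy before extending so the schema object passed in is not mutated.
    schema_with_corrupt = copy.deepcopy(schema).add(CORRUPT_COL, "string", True)
    df = (
        spark.read
        .schema(schema_with_corrupt)
        .option("header", "true")
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", CORRUPT_COL)
        .csv(suspect_paths)
    )
    # Since Spark 2.3, queries on raw CSV that reference only the
    # corrupt-record column are rejected; caching the parsed result avoids that.
    return df.cache()


def split_malformed(df):
    """Step 4: separate the rows Spark could not parse from the rest."""
    bad = df.filter(df[CORRUPT_COL].isNotNull())
    good = df.filter(df[CORRUPT_COL].isNull()).drop(CORRUPT_COL)
    return good, bad
```

A typical call sequence would be `good_df = load_good_files(spark, good_paths)`, then `suspect_df = load_suspect_files(spark, suspect_paths, good_df.schema)`, then `clean, malformed = split_malformed(suspect_df)`. The `malformed` frame can then be inspected, logged, or written to a dead letter location.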