Asked by: PracticingPython  Asked: 9/10/2021  Last edited by: PracticingPython  Updated: 9/14/2021  Views: 1127
PySpark: how to speed up the toPandas() method
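Q:
I have a fairly large PySpark dataframe that I need to convert to pandas (using the toPandas() method) so that I can more easily create a CSV in my S3 bucket. However, when I try to run it, the script just sits there for a long time before I get an error, usually "killed by external signal". toPandas() works fine on other, smaller PySpark dataframes, so I assume the trouble comes from the size of this dataframe. I have also applied various transformations, including limit(100), which should leave me with a dataframe of only 100 rows. That should mean toPandas() runs very quickly, but it is still slow.
I found this article: https://bryancutler.github.io/toPandas/ which gave me the following snippet: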
spark.conf.set("spark.sql.execution.arrow.enabled","true")
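But that doesn't seem to help either. So, given that my dataframe is so large, is there another way to get this PySpark dataframe into pandas, or to bypass pandas altogether and save directly to the S3 bucket?
Any help or ideas would be great!
Thanks
Edit:
Code: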
from pyspark.sql.functions import *
import pandas as pd
words = {"blah", "blarg", "python", "thisshouldn'texist"}
maxInter = 3 # maximum intermediate words between the target words
wordSpan = len(words)+maxInter*(len(words)-1)
anyWord = "|".join(words)
allWords = "".join(r"(?=(\w+\W*){0,SPAN}WORD\b)".replace("WORD",w)
                   for w in words)
allWords = allWords.replace("SPAN",str(wordSpan-1))
pattern = r"\bALL(\b(ANY)(\W+\w+\W*){0,INTER}){COUNT,COUNT}"
pattern = pattern.replace("COUNT",str(len(words)))
pattern = pattern.replace("INTER",str(maxInter))
pattern = pattern.replace("ALL",allWords)
pattern = pattern.replace("ANY",anyWord)
filtered = df.withColumn("attachment_text", regexp_replace('attachment_text', pattern, 'ꙮ')).withColumn("attachment_text",regexp_replace('attachment_text', '[^ꙮ]+', ''))
ranked = filtered.withColumn('Rank', length('attachment_text'))
sort = ranked.orderBy(ranked.Rank.desc()).limit(100)
sort_pd = sort.toPandas()
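Currently, my choice of "words" is guaranteed not to produce any matches. However, whether I keep the "words" like this or change them to something that gives relatively few results, toPandas() still either gets the script "killed" or seems to hang forever while processing. If I comment out the last line, which converts to pandas, and run the rest of the script, it finishes almost instantly.
I also tried generating a much smaller mock dataset and running the same script on it. That run went all the way through to producing the CSV in my S3 bucket, with minimal delay.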
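The pattern-building code above can also be exercised without Spark, which is one way to check what the regular expression matches before running it on the full dataset. The following is only a minimal sketch: it assumes Python's re module treats this pattern the same way as the Java regex engine behind Spark's regexp_replace (the two engines can differ, especially in performance), and build_pattern and the two sample strings are hypothetical names and data that are not part of the original script.

```python
import re


def build_pattern(words, max_inter=3):
    """Rebuild the regex from the script above for a list of target words."""
    word_span = len(words) + max_inter * (len(words) - 1)
    any_word = "|".join(words)
    # One lookahead per target word: the word must start within the span.
    all_words = "".join(
        r"(?=(\w+\W*){0,SPAN}WORD\b)".replace("WORD", w) for w in words
    ).replace("SPAN", str(word_span - 1))
    pattern = r"\bALL(\b(ANY)(\W+\w+\W*){0,INTER}){COUNT,COUNT}"
    pattern = pattern.replace("COUNT", str(len(words)))
    pattern = pattern.replace("INTER", str(max_inter))
    pattern = pattern.replace("ALL", all_words)
    pattern = pattern.replace("ANY", any_word)
    return pattern


# A list keeps the generated pattern deterministic; the script above uses a set.
pattern = build_pattern(["blah", "blarg"])

near = "foo blah one two blarg bar"  # target words separated by two other words
far = "blah a b c d e f blarg"       # target words separated by six other words

print(pattern)
print(bool(re.search(pattern, near)))
print(bool(re.search(pattern, far)))
```

Running the same check on a long sample of real attachment_text would show how the pattern behaves on realistic input, although timings from Python's engine would not necessarily carry over to Spark.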
Edit:
== Parsed Logical Plan ==
GlobalLimit 100
+- LocalLimit 100
   +- Sort [Rank#3865 DESC NULLS LAST], true
      +- Project [attachment_md5_checksum#3659, attachment_filename#3660, attachment_text#3813, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 28 more fields]
         +- Project [attachment_md5_checksum#3659, attachment_filename#3660, regexp_replace(attachment_text#3761, [^ꙮ]+, ) AS attachment_text#3813, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 27 more fields]
            +- Project [attachment_md5_checksum#3659, attachment_filename#3660, regexp_replace(attachment_text#3661, \b(?=(\w+\W*){0,0}airfield\b)(\b(airfield)(\W+\w+\W*){0,3}){1,1}, ꙮ) AS attachment_text#3761, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 27 more fields]
               +- Relation[attachment_md5_checksum#3659,attachment_filename#3660,attachment_text#3661,attachment_urlsafe_base64_bytes#3662,notice_id#3663,title#3664,solicitation_number#3665,department_ind_agency#3666,cgac#3667,sub_tier#3668,fpds_code#3669,office#3670,aac_code#3671,posted_date#3672,type#3673,base_type#3674,archive_type#3675,archive_date#3676,set_aside_code#3677,set_aside#3678,response_deadline#3679,naice_code#3680,classification_code#3681,pop_street_address#3682,... 27 more fields] csv
== Analyzed Logical Plan ==
attachment_md5_checksum: string, attachment_filename: string, attachment_text: string, attachment_urlsafe_base64_bytes: string, notice_id: string, title: string, solicitation_number: string, department_ind_agency: string, cgac: string, sub_tier: string, fpds_code: string, office: string, aac_code: string, posted_date: string, type: string, base_type: string, archive_type: string, archive_date: string, set_aside_code: string, set_aside: string, response_deadline: string, naice_code: string, classification_code: string, pop_street_address: string, ... 28 more fields
GlobalLimit 100
+- LocalLimit 100
   +- Sort [Rank#3865 DESC NULLS LAST], true
      +- Project [attachment_md5_checksum#3659, attachment_filename#3660, attachment_text#3813, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 28 more fields]
         +- Project [attachment_md5_checksum#3659, attachment_filename#3660, regexp_replace(attachment_text#3761, [^ꙮ]+, ) AS attachment_text#3813, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 27 more fields]
            +- Project [attachment_md5_checksum#3659, attachment_filename#3660, regexp_replace(attachment_text#3661, \b(?=(\w+\W*){0,0}airfield\b)(\b(airfield)(\W+\w+\W*){0,3}){1,1}, ꙮ) AS attachment_text#3761, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 27 more fields]
               +- Relation[attachment_md5_checksum#3659,attachment_filename#3660,attachment_text#3661,attachment_urlsafe_base64_bytes#3662,notice_id#3663,title#3664,solicitation_number#3665,department_ind_agency#3666,cgac#3667,sub_tier#3668,fpds_code#3669,office#3670,aac_code#3671,posted_date#3672,type#3673,base_type#3674,archive_type#3675,archive_date#3676,set_aside_code#3677,set_aside#3678,response_deadline#3679,naice_code#3680,classification_code#3681,pop_street_address#3682,... 27 more fields] csv
== Optimized Logical Plan ==
GlobalLimit 100
+- LocalLimit 100
   +- Sort [Rank#3865 DESC NULLS LAST], true
      +- Project [attachment_md5_checksum#3659, attachment_filename#3660, regexp_replace(regexp_replace(attachment_text#3661, \b(?=(\w+\W*){0,0}airfield\b)(\b(airfield)(\W+\w+\W*){0,3}){1,1}, ꙮ), [^ꙮ]+, ) AS attachment_text#3813, attachment_urlsafe_base64_bytes#3662, notice_id#3663, title#3664, solicitation_number#3665, department_ind_agency#3666, cgac#3667, sub_tier#3668, fpds_code#3669, office#3670, aac_code#3671, posted_date#3672, type#3673, base_type#3674, archive_type#3675, archive_date#3676, set_aside_code#3677, set_aside#3678, response_deadline#3679, naice_code#3680, classification_code#3681, pop_street_address#3682, ... 28 more fields]
         +- Relation[attachment_md5_checksum#3659,attachment_filename#3660,attachment_text#3661,attachment_urlsafe_base64_bytes#3662,notice_id#3663,title#3664,solicitation_number#3665,department_ind_agency#3666,cgac#3667,sub_tier#3668,fpds_code#3669,office#3670,aac_code#3671,posted_date#3672,type#3673,base_type#3674,archive_type#3675,archive_date#3676,set_aside_code#3677,set_aside#3678,response_deadline#3679,naice_code#3680,classification_code#3681,pop_street_address#3682,... 27 more fields] csv
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[Rank#3865 DESC NULLS LAST], output=[field names,... 28 more fields])
+- *(1) Project [attachment_md5_checksum#3659, attachment_filename#3660, regexp_replace(regexp_replace(attachment_text#3661, \b(?=(\w+\W*){0,0}airfield\b)(\b(airfield)(\W+\w+\W*){0,3}){1,1}, ꙮ), [^ꙮ]+, ) AS field names ... 28 more fields]
   +- *(1) FileScan csv [field names,... 27 more fields] Batched: false, Format: CSV, Location: InMemoryFileIndex[s3a://..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<attachment_md5_checksum:string,attachment_filename:string,attachment_text:string,attachmen...
A: No answers yet
Comments
dataframe.write.csv
toPandas()
sort.explain(True)
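The dataframe.write.csv suggestion from the comments might look roughly like the sketch below, which writes the result from Spark directly and skips toPandas() altogether. It is an illustration under assumptions rather than a tested fix: write_csv_without_pandas is a hypothetical helper name, the output path in the usage comment is a placeholder, and the cluster is assumed to already have write access to the bucket through s3a. Spark writes a directory of part files rather than a single CSV file, and the write still has to execute the full plan (scan, regex replacement, sort), so it would not by itself make that work any faster.

```python
def write_csv_without_pandas(df, output_path, num_files=1):
    """Write a Spark DataFrame as CSV straight from Spark, without pandas.

    df          -- a pyspark.sql.DataFrame, e.g. the 100-row `sort` result
    output_path -- target directory; Spark creates part-*.csv files inside it
    num_files   -- number of part files to produce (1 is fine for 100 rows)
    """
    (
        df.coalesce(num_files)    # reduce to the requested number of partitions
        .write.mode("overwrite")  # replace any previous output at this path
        .option("header", True)   # include column names in each part file
        .csv(output_path)
    )


# Hypothetical usage; the bucket and prefix below are placeholders:
# write_csv_without_pandas(sort, "s3a://<your-bucket>/<your-prefix>/top100")
```

Because the helper only calls methods on the dataframe it is given, it needs no imports of its own and can be dropped into the existing script after the line that defines sort.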