Asked by: PineNuts0  Asked: 7/28/2018  Last edited by: pault, PineNuts0  Modified: 5/23/2019  Viewed: 67906
PySpark: Create New Column And Fill In Based on Conditions of Two Other Columns
Question:
I have the following DataFrame:
+---+---+------+
| id| ts|days_r|
+---+---+------+
|123| T| 32|
|342| I| 3|
|349| L| 10|
+---+---+------+
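I want to create a new column and fill in its values based on whether the "ts" column and the "days_r" column meet certain conditions.
This is the DataFrame I want: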
+---+---+------+----------+
| id| ts|days_r|0to2_count|
+---+---+------+----------+
|123| T| 32| 1|
|342| I| 3| 0|
|349| L| 10| 0|
+---+---+------+----------+
I tried the following code in PySpark:
df = df.withColumn('0to2_count', F.when((F.col("ts") == 'I') & (F.col('days_r') >=0) & (F.col('days_r') <= 2), 1) \
.otherwise(F.when((F.col("ts") == 'T') & (F.col('days_r') >=0) & (F.col('days_r') <= 48), 1) \
.otherwise(F.when((F.col("ts") == 'L') & (F.col('days_r') >=0 & F.col('days_r') <= 7), 1) \
.otherwise(0))))
I get the following error:
Traceback (most recent call last):
File "perc_0to2", line 1, in <module>
File "perc_0to2", line 9, in perc_0to2
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/column.py", line 115, in _
njc = getattr(self._jc, name)(jc)
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling o826.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy94.execute(Unknown Source)
at com.palantir.arrow.module.compute.DelegatedComputeService.lambda$execute$0(DelegatedComputeService.java:63)
at com.palantir.foundry.spark.api.SparkAuthorization.runAsUserInternal(SparkAuthorization.java:164)
at com.palantir.foundry.spark.api.SparkAuthorization.runAsUser(SparkAuthorization.java:105)
at com.palantir.arrow.module.compute.DelegatedComputeService.execute(DelegatedComputeService.java:62)
at com.palantir.arrow.module.ArrowSparkModuleResource.lambda$executeAsync$0(ArrowSparkModuleResource.java:106)
at com.palantir.remoting3.tracing.DeferredTracer.withTrace(DeferredTracer.java:43)
at com.palantir.remoting3.tracing.Tracers$TracingAwareCallable.call(Tracers.java:219)
at com.codahale.metrics.InstrumentedExecutorService$InstrumentedCallable.call(InstrumentedExecutorService.java:197)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Answer:
38 votes
pault
7/28/2018
#1
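Your code has a bug: you are missing a set of parentheses on the third line. Because Python's `&` binds more tightly than the comparison operators `>=` and `<=`, the unparenthesized `F.col('days_r') >=0 & F.col('days_r') <= 7` is parsed as `F.col('days_r') >= (0 & F.col('days_r')) <= 7`. The `0 & F.col('days_r')` part ends up calling the JVM-side `and` method with the integer `0`, which is exactly the `Method and([class java.lang.Integer]) does not exist` error in your traceback. Here is one way to fix your code, using chained `when()` statements instead of multiple nested `otherwise()` statements: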
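The precedence problem can be reproduced with the standard library alone. This sketch parses the third condition from the question, with and without the extra parentheses, using Python's `ast` module. The column is replaced by a plain name `c` (an assumption so that nothing needs Spark; `ast.parse` only parses and evaluates nothing):

```python
import ast

# Third condition from the question, with F.col('days_r') replaced by a plain name c.
broken = "c >= 0 & c <= 7"
fixed = "(c >= 0) & (c <= 7)"

broken_tree = ast.parse(broken, mode="eval").body
fixed_tree = ast.parse(fixed, mode="eval").body

# Without parentheses the top-level node is the chained comparison
# c >= (0 & c) <= 7: the & is applied to the literal 0 and c first.
print(type(broken_tree).__name__)                    # Compare
print(type(broken_tree.comparators[0]).__name__)     # BinOp
print(type(broken_tree.comparators[0].op).__name__)  # BitAnd

# With parentheses the top-level node is the & of two separate comparisons.
print(type(fixed_tree).__name__)                     # BinOp
print(type(fixed_tree.left).__name__, type(fixed_tree.right).__name__)  # Compare Compare
```

Without the parentheses, `0 & c` is grouped first; in PySpark that is the `0 & F.col('days_r')` step that fails in the traceback.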
from pyspark.sql import functions as F

# Parenthesize every comparison: & binds more tightly than >= and <=.
df = df.withColumn(
    '0to2_count',
    F.when((F.col("ts") == 'I') & (F.col("days_r") >= 0) & (F.col("days_r") <= 2), 1)\
    .when((F.col("ts") == 'T') & (F.col('days_r') >= 0) & (F.col('days_r') <= 48), 1)\
    .when((F.col("ts") == 'L') & (F.col('days_r') >= 0) & (F.col('days_r') <= 7), 1)\
    .otherwise(0)
)
A better way to write this logic is to use `pyspark.sql.Column.between()`, which is inclusive on both bounds:
df = df.withColumn(
    '0to2_count',
    F.when((F.col("ts") == 'I') & F.col("days_r").between(0, 2), 1)\
    .when((F.col("ts") == 'T') & F.col('days_r').between(0, 48), 1)\
    .when((F.col("ts") == 'L') & F.col('days_r').between(0, 7), 1)\
    .otherwise(0)
)
df.show()
#+---+---+------+----------+
#| id| ts|days_r|0to2_count|
#+---+---+------+----------+
#|123| T| 32| 1|
#|342| I| 3| 0|
#|349| L| 10| 0|
#+---+---+------+----------+
Of course, since all three conditions return the same value, you could simplify this further into a single Boolean condition.