Spark SQL correlated subquery group by

Asked by: Andrea Campolonghi · Asked: 11/17/2023 · Updated: 11/17/2023 · Views: 26

Q:

Spark SQL version 3.3.

The following query:


select d.from_id,
       d.to_id,
       d.hts_code,
       min(d.transaction_date)                                            as earliest_transaction_date,
       max(d.transaction_date)                                            as latest_transaction_date,
       cast(months_between( current_date, max(d.transaction_date)) AS INT) AS months_since_last_transaction,
       (select count(*)
            from quarters q
            WHERE q.from_id = d.from_id
            AND q.to_id = d.to_id
            AND q.hts_code = d.hts_code
            group by q.from_id, q.to_id, q.hts_code
        ) as quarters
from data d
group by d.from_id,
         d.to_id,
         d.hts_code;

fails with the following error:

AnalysisException: Correlated scalar subquery 'scalarsubquery(d.from_id, d.to_id, d.hts_code)' is neither present in the group by, nor in an aggregate function. Add it to group by using ordinal position or wrap it in first() (or first_value) if you don't care which value you get.

I can't add the outer references in any position other than the subquery's WHERE clause. I really don't understand what the analyzer is telling me here.

apache-spark pyspark apache-spark-sql

Answers:

0 votes · Chris · 11/17/2023 · #1

Forgive the Scala:

import sparkSession.implicits._
val data = Seq(("a", 1),("b",2),("a", 2)).toDF("letter","number")
data.createOrReplaceTempView("data")
sparkSession.sql(s"""
select letter, number, (
  select count(*)
  from data a
  where o.letter = a.letter
)
from data o
""").show

yields, as you'd expect:

+------+------+----------------------+
|letter|number|scalarsubquery(letter)|
+------+------+----------------------+
|     a|     1|                     2|
|     b|     2|                     1|
|     a|     2|                     2|
+------+------+----------------------+

So this is a simple correlated value using count; it can only ever have one result.
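
In the question's subquery, the WHERE clause already pins q.from_id, q.to_id and q.hts_code to a single outer group, so the inner GROUP BY adds nothing. A sketch of that subquery reduced to a plain correlated count (one behavioural difference worth noting: with no matching rows this returns 0, whereas the grouped version returns no row at all, i.e. NULL):

select count(*)
from quarters q
WHERE q.from_id = d.from_id
AND q.to_id = d.to_id
AND q.hts_code = d.hts_code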

Using this query after the code above to simulate a group by:

sparkSession.sql(s"""
select letter, number, (
  select count(*)
  from data a
  where o.letter = a.letter
  group by a.letter
)
from data o
""").show

will result in:

A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns: letter#7

which is similar to the SQL you have:

select count(*)
...
group by q.from_id, q.to_id, q.hts_code

This is not valid SQL. You'd need those values in the select (the first part of the problem), and, as you noted, using outer references anywhere other than WHERE/HAVING gets the error:

Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses
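
For completeness, a minimal sketch that should trigger this error against the same data view, by referencing the outer o.letter in the subquery's select list rather than its WHERE clause:

sparkSession.sql(s"""
select letter, number, (
  select o.letter
  from data a
  where o.letter = a.letter
)
from data o
""").show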

So next you try:

sparkSession.sql(s"""
select letter, number, (
  select count(*) c, a.letter
  from data a
  where o.letter = a.letter
  group by a.letter
) thecount
from data o
""").show

which leaves the error:

Scalar subquery must return only one column, but got 2

You could rewrite it as a join:

sparkSession.sql(s"""
select o.letter, o.number, a.c
from data o join (
  select count(*) c, a.letter
  from data a
  group by a.letter
) a on a.letter = o.letter
""").show

yielding:

+------+------+---+
|letter|number|  c|
+------+------+---+
|     a|     2|  2|
|     a|     1|  2|
|     b|     2|  1|
+------+------+---+
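
Note one difference from the scalar subquery: an inner join drops outer rows that have no matching group, whereas a correlated count subquery returns 0 for them. If that matters for your data, a sketch using a left join with coalesce to keep unmatched rows:

sparkSession.sql(s"""
select o.letter, o.number, coalesce(a.c, 0) as c
from data o left join (
  select count(*) c, letter
  from data
  group by letter
) a on a.letter = o.letter
""").show

With this toy data every letter has a match, so the output is unchanged.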

If you must have a query shaped like the subquery (e.g. your fields are defined elsewhere; Quality takes this approach), you could try a rewrite like the following:

sparkSession.sql(s"""
select letter, number,
  (
    select max(c)
    from (
      select count(*) c, letter
      from data
      group by letter
    ) a where o.letter = a.letter
  ) thecount
from data o
""").show

which also produces:

+------+------+--------+
|letter|number|thecount|
+------+------+--------+
|     a|     1|       2|
|     b|     2|       1|
|     a|     2|       2|
+------+------+--------+

In that case you must still have a single aggregated return (max, first, etc.).

Note: as of 3.4/3.5, correlated subqueries can be more powerful; you can use exists and use outer references more freely, per SPARK-36114.
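
Putting this together for the question's query, an untested sketch: drop the redundant inner GROUP BY (the WHERE clause already pins the keys to one outer group) and, per the analyzer's own suggestion in the original error message, wrap the subquery in first() so it is acceptable in the select list of the outer aggregate:

select d.from_id,
       d.to_id,
       d.hts_code,
       min(d.transaction_date)                                             as earliest_transaction_date,
       max(d.transaction_date)                                             as latest_transaction_date,
       cast(months_between(current_date, max(d.transaction_date)) AS INT)  AS months_since_last_transaction,
       first((select count(*)
              from quarters q
              WHERE q.from_id = d.from_id
              AND q.to_id = d.to_id
              AND q.hts_code = d.hts_code)) as quarters
from data d
group by d.from_id,
         d.to_id,
         d.hts_code;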