如何使用Calcite优化SQL并将其从一个数据库引擎重写到另一个数据库引擎

How to use Calcite to optimise and rewrite SQL from one DB engine to another

提问人:Chitral Verma 提问时间:11/14/2023 更新时间:11/14/2023 访问量:13

问:

我正在尝试开发一个通用系统,该系统公开和 API,如下所示,

val outputSql: String = SQLRewriter
  .inputSql(query = " <query> ", engine = Engine.SparkSQL, connectionHook = < connection >)
  .rewriteTo(engine = Engine.MySQL)

这个系统应该,

  1. 能够读取一些受支持引擎的输入 SQL 查询字符串
  2. 优化目标引擎的查询(项目下推)
  3. 将目标引擎的优化查询重写为字符串

我认为 Apache Calcite 非常适合这一点,或者它已经具备了这些功能。 为此,我尝试在代码中浏览文档、博客和文档字符串,但我觉得我在兜圈子。

我想知道,如果,

  1. 方解石已经具备了这些能力
  2. 如果它非常适合此用例
  3. 您可以向我指出的任何代码示例

一些专家可以帮助我吗?谢谢。

java sql apache-calcite

评论


答:

0赞 Chitral Verma 11/14/2023 #1

这个想法是,

  • 通用规划结构中的代码
  • 优化(基于成本)此结构,几乎不需要配置
  • 改写为任何方言

今天我又在胡闹,并设法使它成功地用于示例查询 -

  • 在Calcite DSL中编写了一个随机的Join+Filter查询,
  • 使用基于成本的 VolcanoPlanner 对其进行优化
  • 将此优化计划转换为 SparkSql、Teradata、Snowflake 等方言。

我设法得到了一个这样的工作 POC,但不确定这是否是最好的方法。任何方解石专家都想审查这种方法吗?

import org.apache.calcite.adapter.enumerable.EnumerableConvention
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.externalize.RelWriterImpl
import org.apache.calcite.rel.rel2sql.RelToSqlConverter
import org.apache.calcite.sql.dialect.*
import org.apache.calcite.test.CalciteAssert
import org.apache.calcite.tools.{Frameworks, RelBuilder}

import java.io.PrintWriter

   
val rootSchema = CalciteSchema.createRootSchema(true).plus()
val config = Frameworks
  .newConfigBuilder()
  .defaultSchema(CalciteAssert.addSchema(rootSchema, CalciteAssert.SchemaSpec.HR))
  .build()
val builder = RelBuilder.create(config)

// Create a example plan using calcite, this should be replaced with real business logic
val opTree: RelNode = builder
  .scan("emps") // scan table 1
  .scan("depts") // scan table 2
  .join(JoinRelType.INNER, "deptno") // inner join between the 2 tables on deptno
  .filter(builder.equals(builder.field("empid"), builder.literal(100))) // filter on empid
  .build

val rw = new RelWriterImpl(new PrintWriter(System.out, true))

// Print basic Logical Plan
opTree.explain(rw)

val cluster = opTree.getCluster
val planner = cluster.getPlanner.asInstanceOf[VolcanoPlanner]

val desiredTraits = cluster.traitSet.replace(EnumerableConvention.INSTANCE)
val newRoot = planner.changeTraits(opTree, desiredTraits)
planner.setRoot(newRoot)

val optimized: RelNode = planner.findBestExp

// Print optimized Logical Plan
// filter happens before join to reduce the amount of data joined. Rules can be configured.
optimized.explain(rw)

// Rewrite Logical Plan as SQL queries based on different dialects
// Each of these dialects can be configured, with UDFs/ procedures etc.
val sqlDialects = Seq(
  SparkSqlDialect.DEFAULT,
  MysqlSqlDialect.DEFAULT,
  PostgresqlSqlDialect.DEFAULT,
  SnowflakeSqlDialect.DEFAULT,
  TeradataSqlDialect.DEFAULT,
  RedshiftSqlDialect.DEFAULT,
  HiveSqlDialect.DEFAULT)

sqlDialects.foreach(dialect => {
  // print name of dialect
  println(dialect)

  // print SQL as per the dialect. dialect parser can be heavily configured.
  val conv = RelToSqlConverter(dialect)
  println(conv.visitRoot(optimized).asQueryOrValues().toString)
})