将 Mongo 集合加载为 Spark 数据集时,如何在架构中指定 BigDecimal 小数位数和精度

How to specify BigDecimal scale and precision in schema when loading a Mongo collection as a Spark Dataset

提问人:Peewee 733 提问时间:8/3/2020 更新时间:8/5/2020 访问量:1356

问:

我正在尝试使用 Scala Mongo 连接器将大型 Mongo 集合加载到 Apache Spark 中。

我正在使用以下版本:

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0" 
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" 
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14

该集合包含大于 的大型整数十进制值。我想获取的数据集是一个集合,其中包含一个名为 的相应案例类,定义:1e13Output

case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)

如果我在不指定案例类的情况下使用 MongoSpark.load:

val ds = MongoSpark.load(sc, rc).toDS[Output]

然后 Mongo 通过随机抽样集合来推断架构。这会导致该属性的随机比例,并且任何溢出随机获取的比例的文档在生成的数据集中都缺少属性。这显然是不可取的。valuevaluevalue

或者,根据 Mongo Spark 连接器的文档,我可以通过指定 case 类作为 的类型参数化来显式设置架构,例如:load

val ds = MongoSpark.load[Output](sc, rc).toDS[Output]

但是,在案例类定义中,我只能指定 as 的类型,这不允许我明确说明所需的比例和精度。生成的架构使用默认精度和小数位数 (38,18),这并不总是需要的:valueBigDecimal

root
 |-- time: long (nullable = false)
 |-- pubKeyId: long (nullable = false)
 |-- value: decimal(38,18) (nullable = true)
 |-- outIndex: long (nullable = false)
 |-- outTxId: long (nullable = false)

这与 Spark SQL API 形成鲜明对比,后者允许使用 DecimalType 显式指定小数位数和精度,例如:

val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)

在将 Mongo 集合加载到 Apache Spark 中时,如何为架构中的大十进制类型请求特定的小数位数和精度,类似于上面的代码?

mongodb apache-spark-sql 架构 精度 bigdecimal

评论


答:

0赞 D. SM 8/3/2020 #1

据我所知,Decimal128 中的尾数和指数是固定大小的。因此,除非您能找到相反的证据,否则MongoDB允许为其小数点指定小数位数和精度是没有意义的。

我的理解是关系数据库会根据规模和精度使用不同的浮点类型(例如 32 位与 64 位浮点数),但在 MongoDB 中,数据库保留了它给出的类型,所以如果你想要一个更短的浮点数,你需要让你的应用程序发送它而不是十进制类型。

评论

0赞 Peewee 733 8/3/2020
问题不在于如何在 Mongo 中内部存储值,而在于它在 Spark 应用程序中的表示方式。Spark SQL 架构指定 Spark 如何表示数据,而不是数据在 Mongo 中的存储方式。在 Scala Spark 应用程序中,从 Mongo 加载数据时,BSON Decimal128 类型将转换为具有特定比例和精度的 BigDecimal。能够指定规模和精度非常重要,因为这将对群集在 RAM 和磁盘空间方面的大小产生影响。
0赞 Peewee 733 8/3/2020
请参阅以下第 309-313 行,了解 Mongo Spark 连接器在使用架构推理时如何推断 BigDecimal 小数位数和精度。github.com/mongodb/mongo-spark/blob/master/src/main/scala/com/......
0赞 Peewee 733 8/3/2020
为了澄清上述内容,我应该说“将数据从 Mongo 加载到 Spark 时,Spark SQL Schema 指定 Spark 如何表示数据,而不是数据在 Mongo 中的存储方式。
0赞 D. SM 8/3/2020
对不起,我不知道。
0赞 Peewee 733 8/5/2020 #2

我能够通过绕过帮助程序方法并直接调用实例来做到这一点:loadtoDF(schema)MongoSpark

 val schema = StructType(
                             List(StructField("time", LongType, false),
                                  StructField("pubKeyId", LongType, false),
                                  StructField("value", DecimalType(30, 0), false),
                                  StructField("outIndex", LongType, false),
                                  StructField("outTxId", LongType, false)
                             ))
    val outputs =    
      builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]

这会产生正确的架构,并且数据会正确读入 Spark,而不会出现任何缺失值:

    outputs.printSchema()
 |-- time: long (nullable = false)
 |-- pubKeyId: long (nullable = false)
 |-- value: decimal(30,0) (nullable = false)
 |-- outIndex: long (nullable = false)
 |-- outTxId: long (nullable = false)

评论

0赞 Peewee 733 8/5/2020
除了能够指定小数位数和范围之外,这种方法还有一个优点,那就是我们可以指定字段是不可为空的。这与将始终使用类型为 null 的字段相反。load[Output]Decimal(38,18)