提问人:Peewee 733 提问时间:8/3/2020 更新时间:8/5/2020 访问量:1356
将 Mongo 集合加载为 Spark 数据集时,如何在架构中指定 BigDecimal 小数位数和精度
How to specify BigDecimal scale and precision in schema when loading a Mongo collection as a Spark Dataset
问:
我正在尝试使用 Scala Mongo 连接器将大型 Mongo 集合加载到 Apache Spark 中。
我正在使用以下版本:
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14
该集合包含大于 的大型整数十进制值。我想获取的数据集是一个集合,其中包含一个名为 的相应案例类,定义:1e13
Output
case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)
如果我在不指定案例类的情况下使用 MongoSpark.load:
val ds = MongoSpark.load(sc, rc).toDS[Output]
然后 Mongo 通过随机抽样集合来推断架构。这会导致该属性的随机比例,并且任何溢出随机获取的比例的文档在生成的数据集中都缺少属性。这显然是不可取的。value
value
value
或者,根据 Mongo Spark 连接器的文档,我可以通过指定 case 类作为 的类型参数化来显式设置架构,例如:load
val ds = MongoSpark.load[Output](sc, rc).toDS[Output]
但是,在案例类定义中,我只能指定 as 的类型,这不允许我明确说明所需的比例和精度。生成的架构使用默认精度和小数位数 (38,18),这并不总是需要的:value
BigDecimal
root
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(38,18) (nullable = true)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)
这与 Spark SQL API 形成鲜明对比,后者允许使用 DecimalType
显式指定小数位数和精度,例如:
val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)
在将 Mongo 集合加载到 Apache Spark 中时,如何为架构中的大十进制类型请求特定的小数位数和精度,类似于上面的代码?
答:
据我所知,Decimal128 中的尾数和指数是固定大小的。因此,除非您能找到相反的证据,否则MongoDB允许为其小数点指定小数位数和精度是没有意义的。
我的理解是关系数据库会根据规模和精度使用不同的浮点类型(例如 32 位与 64 位浮点数),但在 MongoDB 中,数据库保留了它给出的类型,所以如果你想要一个更短的浮点数,你需要让你的应用程序发送它而不是十进制类型。
评论
我能够通过绕过帮助程序方法并直接调用实例来做到这一点:load
toDF(schema)
MongoSpark
val schema = StructType(
List(StructField("time", LongType, false),
StructField("pubKeyId", LongType, false),
StructField("value", DecimalType(30, 0), false),
StructField("outIndex", LongType, false),
StructField("outTxId", LongType, false)
))
val outputs =
builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]
这会产生正确的架构,并且数据会正确读入 Spark,而不会出现任何缺失值:
outputs.printSchema()
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(30,0) (nullable = false)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)
评论
load[Output]
Decimal(38,18)
评论