具有 Seq 和 case 类的 Scala Spark 列解析器

Scala Spark Column Parser with Seq and case class

我有一个包含三列的地址数据帧,例如: “addressId”、“customerId”、“address”。 Address.csv中的值如下所示: A100,C100,“100,ABC街,MyCity,MyCountry”。

Address AddressRaw 的案例类如下所示:

  // Define your case class for AddressRaw
  case class AddressRawData(
    addressId: String,
    customerId: String,
    address: String


case class AddressData(
    addressId: String,
    customerId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]

我想创建一个数据帧“AddressData”,其中包含 AddressData 中提到的所有相应列。Addressraw Dataframe 中的地址需要传递到解析器中才能获取数字、道路、城市和国家/地区。 以下是我使用的代码,它给了我错误:

Can not resolve 'number'.它来了,因为数字不是地址数据帧的一部分。


// ... (other imports and definitions)

object CustomerAddress extends App {
 val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
 import spark.implicits._

 // Define your case classes
 // ... (addressParser and other definitions)

 val addressDF: DataFrame = spark.read.option("header", "false").csv("src/main/resources/address_data.csv")
   .toDF("addressId", "customerId", "address")

 val customerAccountDS = spark.read.parquet("src/main/resources/customerAccountOutputDS.parquet").as[CustomerAccountOutput]

    // Define your addressParser function
 def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
   unparsedAddress.map(address => {
     val split = address.address.split(", ")
       number = Some(split(0).toInt),
       road = Some(split(1)),
       city = Some(split(2)),
       country = Some(split(3))

 // Apply the addressParser function to the address column
 val parsedAddress = addressDF.as[AddressData].groupByKey(_.customerId).mapGroups {
   case (customerId, addresses) => customerId -> addressParser(addresses.toSeq)
 }.toDF("customerId", "address")

 // Join the customerAccountDS and parsedAddress to create the final CustomerDocument
 val finalDF = customerAccountDS.join(parsedAddress, Seq("customerId"), "inner")

 // Show the records in the final DataFrame
0赞 Chris 8/14/2023 #1

错误消息不是很清楚,但代码方面,您不能将案例类嵌套在对象中,spark.implicit 用于生成编码器的函数将无法正确找到它们。不幸的是,这会导致运行时错误,而不是编译时。


object CustomerAddress extends App {
 val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
 import spark.implicits._
