Scala Spark Column Parser with Seq and case class

Asked by: Mohit Rane Asked on: 8/11/2023 Last edited by: Mohit Rane Updated: 8/14/2023 Views: 62

Q:

I have an address dataframe with three columns: "addressId", "customerId", "address". The values in Address.csv look like this: A100,C100,"100,ABC Street,MyCity,MyCountry".

The case class for the raw address, AddressRawData, looks like this:

  // Define your case class for AddressRaw
  case class AddressRawData(
    addressId: String,
    customerId: String,
    address: String
  )

And there is another case class, shown below:

case class AddressData(
    addressId: String,
    customerId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
  )

I want to create a dataframe "AddressData" that contains all the columns defined in the AddressData case class. The address from the AddressRaw dataframe needs to be passed through a parser to obtain number, road, city and country. Below is the code I used; it gives me the error:

Can not resolve 'number'. This happens because number is not a column of the address dataframe.

Here is the sample code I tried:


// ... (other imports and definitions)

object CustomerAddress extends App {
 val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
 import spark.implicits._
 Logger.getRootLogger.setLevel(Level.WARN)

 // Define your case classes
 case class AddressRawData(
   addressId: String,
   customerId: String,
   address: String
 )

 case class AddressData(
   addressId: String,
   customerId: String,
   address: String,
   number: Option[Int],
   road: Option[String],
   city: Option[String],
   country: Option[String]
 )

 case class AccountData(
   customerId: String,
   accountId: String,
   balance: Long
 )

 case class CustomerAccountOutput(
   customerId: String,
   forename: String,
   surname: String,
   accounts: Seq[AccountData]
 )

 // ... (addressParser and other definitions)

 val addressDF: DataFrame = spark.read.option("header", "false").csv("src/main/resources/address_data.csv")
   .toDF("addressId", "customerId", "address")

 val customerAccountDS = spark.read.parquet("src/main/resources/customerAccountOutputDS.parquet").as[CustomerAccountOutput]

 // Define your addressParser function
 def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
   unparsedAddress.map(address => {
     val split = address.address.split(", ")
     address.copy(
       number = Some(split(0).toInt),
       road = Some(split(1)),
       city = Some(split(2)),
       country = Some(split(3))
     )
   })
 }

 // Apply the addressParser function to the address column
 val parsedAddress = addressDF.as[AddressData].groupByKey(_.customerId).mapGroups {
   case (customerId, addresses) => customerId -> addressParser(addresses.toSeq)
 }.toDF("customerId", "address")

 // Join the customerAccountDS and parsedAddress to create the final CustomerDocument
 val finalDF = customerAccountDS.join(parsedAddress, Seq("customerId"), "inner")
   .select(
     $"customerId",
     $"forename",
     $"surname",
     $"accounts",
     $"address"
   ).as[CustomerDocument]

 // Show the records in the final DataFrame
 finalDF.show(false)
}
DataFrame Scala Apache-Spark Seq

Comments

0 votes Srinivas 8/11/2023
Please attach the files you use in the code. That will help us test locally and provide a solution.
0 votes Mohit Rane 8/11/2023
@Srinivas Here is what the AddressRaw.csv file looks like: "addressId", "customerId", "address" A100,C100,"100,ABC Street,MyCity,MyCountry" A200,C200,"200,ABC StreetOne,MyCityone,MyCountryone"
0 votes Mikhail Ionkin 8/12/2023
Could you just read and print customerAccountOutputDS? Also, your split does not match the provided sample: for example, there is no space after the commas.
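
To make that last point concrete, a more forgiving variant of addressParser could split on a bare comma and trim each piece, so rows such as "100,ABC Street,MyCity,MyCountry" (no space after the commas) still parse. The sketch below is only an illustration reusing the AddressData case class from the question:

 // Sketch: split on "," and trim so both "a, b" and "a,b" parse
 def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] =
   unparsedAddress.map { address =>
     val parts = address.address.split(",").map(_.trim)
     address.copy(
       number  = scala.util.Try(parts(0).toInt).toOption, // avoids a NumberFormatException on bad input
       road    = parts.lift(1),
       city    = parts.lift(2),
       country = parts.lift(3)
     )
   }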

A:

0 votes Chris 8/14/2023 #1

The error message isn't very clear, but code-wise you cannot nest the case classes inside the object: the functions that spark.implicits uses to generate encoders will not find them correctly. Unfortunately this shows up as a runtime error rather than a compile-time one.

Rewrite it so that the case classes are top level, e.g.:

 // Define your case classes
 case class AddressRawData(
   addressId: String,
   customerId: String,
   address: String
 )

 case class AddressData(
   addressId: String,
   customerId: String,
   address: String,
   number: Option[Int],
   road: Option[String],
   city: Option[String],
   country: Option[String]
 )

 case class AccountData(
   customerId: String,
   accountId: String,
   balance: Long
 )
...

object CustomerAddress extends App {
 val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
 import spark.implicits._
 Logger.getRootLogger.setLevel(Level.WARN)

....
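
For completeness, once the case classes are top level, the body of the object could read the raw CSV as AddressRawData and map it into AddressData before any join, so that number, road, city and country actually exist as columns. The following is only a sketch under that assumption (the parquet input, CustomerDocument and the final join stay as in the question):

 import org.apache.spark.sql.Dataset

 // Read the raw CSV; the quoted address field keeps its embedded commas as a single value
 val addressRawDS: Dataset[AddressRawData] = spark.read
   .option("header", "false")
   .csv("src/main/resources/address_data.csv")
   .toDF("addressId", "customerId", "address")
   .as[AddressRawData]

 // Parse each raw row into AddressData so the extra columns exist before converting or joining
 val addressDS: Dataset[AddressData] = addressRawDS.map { raw =>
   val parts = raw.address.split(",").map(_.trim)
   AddressData(
     addressId  = raw.addressId,
     customerId = raw.customerId,
     address    = raw.address,
     number     = scala.util.Try(parts(0).toInt).toOption,
     road       = parts.lift(1),
     city       = parts.lift(2),
     country    = parts.lift(3)
   )
 }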