Asked by: Mohit Rane  Asked: 8/11/2023  Last edited by: Mohit Rane  Updated: 8/14/2023  Views: 62
Scala Spark Column Parser with Seq and case class
Q:
I have an address DataFrame with three columns: "addressId", "customerId", "address". The values in Address.csv look like this: A100,C100,"100, ABC Street, MyCity, MyCountry".
The case class for the raw address, AddressRawData, looks like this:
// Define your case class for AddressRaw
case class AddressRawData(
  addressId: String,
  customerId: String,
  address: String
)
There is also another case class, shown below:
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)
I want to create a DataFrame "AddressData" that contains all of the columns listed in the AddressData case class. The address in the AddressRaw DataFrame needs to be passed through a parser to obtain number, road, city, and country. Below is the code I used, which gives me this error:
Can not resolve 'number'
It comes up because number is not a column of the address DataFrame.
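For reference, the shape of the transformation being asked for can be written as a read-then-map: type the three raw columns as AddressRawData first, then build AddressData by splitting the address string, so the extra columns never need to exist in the underlying DataFrame. A minimal standalone sketch under those assumptions (the ParseAddressSketch object, file path, and the direct map are illustrative, not the original code; the case classes are kept at the top level, as the answer below recommends):

import org.apache.spark.sql.SparkSession

// Top-level case classes so spark.implicits can derive encoders for them
case class AddressRawData(addressId: String, customerId: String, address: String)

case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

object ParseAddressSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("ParseAddressSketch").getOrCreate()
  import spark.implicits._

  // Read the raw three-column CSV and type it as AddressRawData
  val rawDS = spark.read
    .option("header", "false")
    .csv("src/main/resources/address_data.csv")
    .toDF("addressId", "customerId", "address")
    .as[AddressRawData]

  // Build AddressData by splitting the address string
  val parsedDS = rawDS.map { raw =>
    val parts = raw.address.split(", ").map(_.trim)
    def part(i: Int): Option[String] = if (i < parts.length) Some(parts(i)) else None
    AddressData(
      addressId  = raw.addressId,
      customerId = raw.customerId,
      address    = raw.address,
      number     = part(0).flatMap(p => scala.util.Try(p.toInt).toOption),
      road       = part(1),
      city       = part(2),
      country    = part(3)
    )
  }

  parsedDS.show(false)
}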
Here is the sample code I have tried:
// ... (other imports and definitions)

object CustomerAddress extends App {
  val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
  import spark.implicits._

  Logger.getRootLogger.setLevel(Level.WARN)

  // Define your case classes
  case class AddressRawData(
    addressId: String,
    customerId: String,
    address: String
  )

  case class AddressData(
    addressId: String,
    customerId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
  )

  case class AccountData(
    customerId: String,
    accountId: String,
    balance: Long
  )

  case class CustomerAccountOutput(
    customerId: String,
    forename: String,
    surname: String,
    accounts: Seq[AccountData]
  )

  // ... (addressParser and other definitions)

  val addressDF: DataFrame = spark.read.option("header", "false").csv("src/main/resources/address_data.csv")
    .toDF("addressId", "customerId", "address")

  val customerAccountDS = spark.read.parquet("src/main/resources/customerAccountOutputDS.parquet").as[CustomerAccountOutput]

  // Define your addressParser function
  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    })
  }

  // Apply the addressParser function to the address column
  val parsedAddress = addressDF.as[AddressData].groupByKey(_.customerId).mapGroups {
    case (customerId, addresses) => customerId -> addressParser(addresses.toSeq)
  }.toDF("customerId", "address")

  // Join the customerAccountDS and parsedAddress to create the final CustomerDocument
  val finalDF = customerAccountDS.join(parsedAddress, Seq("customerId"), "inner")
    .select(
      $"customerId",
      $"forename",
      $"surname",
      $"accounts",
      $"address"
    ).as[CustomerDocument]

  // Show the records in the final DataFrame
  finalDF.show(false)
}
A:
0 votes
Chris
8/14/2023
#1
The error message isn't very clear, but code-wise you cannot nest the case classes inside the object; the functions spark.implicits uses to generate encoders will not be able to find them correctly. Unfortunately this shows up as a runtime error rather than a compile-time one.
Rewrite it so that the case classes are at the top level, e.g.:
// Define your case classes
case class AddressRawData(
  addressId: String,
  customerId: String,
  address: String
)

case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

case class AccountData(
  customerId: String,
  accountId: String,
  balance: Long
)

...

object CustomerAddress extends App {
  val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
  import spark.implicits._

  Logger.getRootLogger.setLevel(Level.WARN)
  ....
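A quick way to see the behaviour described above (a standalone sketch; the EncoderCheck object and the sample row are illustrative, not from the thread): with the case class defined at the top level, toDS() finds an encoder and the snippet runs, whereas nesting the case class inside the object can, as noted above, make the same call fail at runtime.

import org.apache.spark.sql.SparkSession

// Top-level: spark.implicits can derive an encoder for this class
case class AddressRawData(addressId: String, customerId: String, address: String)

object EncoderCheck extends App {
  val spark = SparkSession.builder().master("local[*]").appName("EncoderCheck").getOrCreate()
  import spark.implicits._

  // Works because AddressRawData is defined at the top level;
  // moving it inside this object can break encoder derivation at runtime
  val sample = Seq(AddressRawData("A100", "C100", "100, ABC Street, MyCity, MyCountry")).toDS()
  sample.show(false)
}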