提问人:Joseph Hwang 提问时间:7/24/2023 最后编辑:Joseph Hwang 更新时间:7/27/2023 访问量:109
如何将架构设置为 spark.sql.function.from_csv?
How to set schema into spark.sql.function.from_csv?
问:
我在 Windows 3.4.1 上使用 spark-3.4.1-hadoop3-hadoop1。我尝试生成要传递到函数参数from_csv架构。 以下是我的代码。
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;
import java.util.HashMap;
import java.util.Map;
SparkSession spark = SparkSession.builder().appName("FromCsvStructExample").getOrCreate();
Dataset<Row> df = spark.read().format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/path/to/csv/file");
Map<String, String> options = new HashMap<String, String>();
String schemaString = "name string, age int, job string";
Column schema = from_csv(col("csv"), col(schemaString), options);
Dataset<Row> parsed = df.select(schema.as("data"));
parsed.printSchema();
spark.close();
但是这些代码会引发以下异常。
Exception in thread "main" org.apache.spark.sql.AnalysisException: [INVALID_SCHEMA.NON_STRING_LITERAL] The input schema "name string, age int, job string" is not a valid schema string. The input expression must be string literal and not null.
at org.apache.spark.sql.errors.QueryCompilationErrors$.unexpectedSchemaTypeError(QueryCompilationErrors.scala:1055)
at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:42)
at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalSchemaExpr(ExprUtils.scala:47)
at org.apache.spark.sql.catalyst.expressions.CsvToStructs.<init>(csvExpressions.scala:72)
at org.apache.spark.sql.functions$.from_csv(functions.scala:4955)
at org.apache.spark.sql.functions.from_csv(functions.scala)
at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:43)
恐怕schemaString对于函数不正确。请告诉我如何使用函数生成架构。我知道架构参数类型为 StructType from_csv函数重载。但是在使用这个函数时,我必须制作 scala 函数,我什至没有 scala 的基本知识。org.apache.spark.sql.functions.col
org.apache.spark.sql.functions.col
== 更新的零件
我尝试使用特定于 Java 的from_csv方法。
from_csv(Column e, Column schema, java.util.Map<String,String> options)
如您所知,架构的类型不是 StructType,而是 Column。我被困在这部分。我不知道如何在 java 中生成列类型模式。 如果您有任何生成java列类型模式的参考,请告诉我如何。
答:
1赞
abiratsis
7/26/2023
#1
你是对的,你不能直接生成给定的 DDL 字符串。一种方法是使用 lit 或 StructType.fromDDL 函数。正如您已经提到的from_csv函数的一个签名接受架构的 StructType。然后 Scala 代码将如下所示:Column
import org.apache.spark.sql.types.StructType
var schema: StructType = StructType.fromDDL("name string, age int, job string")
// StructType(
// StructField(name,StringType,true),
// StructField(age,IntegerType,true),
// StructField(job,StringType,true)
// )
val targetCol = from_csv(col("csv"), schema, options)
Java 的代码应该非常相似。
根据 from_csv 的另一个签名,它接受 Column 而不是 ,它与相应的单元测试中所示的函数结合使用。这适用于您希望将架构作为字符串传递的情况。StructType
lit
对于您的情况,这将是:
val schema = "name string, age int, job string"
val targetCol = from_csv(col("csv"), lit(schema), options)
评论
0赞
Joseph Hwang
7/27/2023
谢谢你的回复,@abiratsis。我还有另一个问题,请检查我更新的部分。
0赞
Bernhard Stadler
7/27/2023
通常,如果您的原始问题已得到回答,您应该将其标记为已回答,而不是添加另一个子问题,而是为此打开另一个问题。
0赞
abiratsis
7/27/2023
@JosephHwang请检查我更新的答案,我为接受列作为架构的 from_csv 版本添加了一些其他详细信息
1赞
Joseph Hwang
7/28/2023
它有效!非常感谢。
上一个:重定向到其他网站
评论