如何使用 pyspark 将数据写入 Azure Sql 数据库?

How to write data with pyspark to Azure Sql database?

提问人:Herwini 提问时间:11/17/2023 最后编辑:jarlhHerwini 更新时间:11/20/2023 访问量:81

问:

我已使用以下代码将数据写入 Azure SQL 数据库:

def write_to_sqldatabase(final_table, target_table):
    #Write table data into a spark dataframe
    final_table.write.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
        .option("dbtable", f'....{target_table}') \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .mode("append") \
        .save()

这不再起作用了。我现在收到以下错误:

Py4JJavaError:调用 o11913.save 时出错。: org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 63.0 中的任务 2 失败了 4 次,最近一次失败:丢失的任务 2.3 在阶段 63.0 (TID 322) (vm-32517978 执行程序 1) 中:java.sql.BatchUpdateException:违反 PRIMARY KEY 约束 'PK_tblSFDC_Account'。无法在对象中插入重复的键 '.....表'。重复的键值为 (0xxxxxx)。

以前我也遇到过这个问题,但通过截断sql数据库中的表来解决它。

现在我尝试做同样的事情,但我保留了违规的错误。

这是怎么回事?

pyspark azure-sql-database

评论

0赞 Thom A 11/17/2023
您正在尝试使用重复值。据推测,用户已经存在,因此再次尝试会导致失败。INSERTINSERT
0赞 Herwini 11/17/2023
是的,这就是错误所说的。但是 sql 数据库中的表是空的。并且我要写入 sql 数据库的数据没有重复值。
0赞 Thom A 11/17/2023
如果表为空,则表示您正在执行的数据在主键列中有 2+ 行具有相同的值。INSERT
0赞 Thom A 11/17/2023
举个简化的例子:db<>fiddle 您需要检查您正在 ing 的数据,并确保您的列没有重复项。INSERTPRIMARY KEY
0赞 Deepak 11/18/2023
由于加载前的截断,表可能为空,但如果表具有 PRIMARY KEY,请使用主键检查您的列,请按照以下步骤操作 SET IDENTITY_INSERT <YourTable> ON 插入数据 SET IDENTITY_INSERT <YourTable> OFF

答:

0赞 DileeprajnarayanThumula 11/18/2023 #1

违反 PRIMARY KEY 约束“PK_tblSFDC_Account”。无法在对象“.....”中插入重复的密钥表'。重复的键值为 (0xxxxxx)。

您看到的错误消息表明 SQL Server 数据库中的主键约束存在问题。此约束用于确保表中的每一行都是唯一的,并且可以通过唯一键进行标识。错误消息表明存在违反此约束的情况,这意味着表中存在重复的行或具有非唯一键的行。您可能需要查看数据,并确保每一行都具有唯一的键值。

我同意@ThomA当您在尝试将数据插入空表时遇到此错误时,这意味着您尝试添加的信息在指定为主键的列中具有多个具有相同值的行。 主键确保每一行都是不同的,并且可以通过唯一的键进行识别。 由于表最初是空的,因此该错误表示您尝试插入的数据与此唯一性要求相矛盾。 您的数据中似乎存在具有重复键值的行。

我正在插入SQL数据库中的空表:示例数据: enter image description here

data = [(1, "Movie 1"), (2, "Movie 2"), (1, "Duplicate Movie")]
columns = ["movie_id", "title"]

(1、“Duplicate Movie”)是具有相同PK的重复记录。

final_table = final_table.dropDuplicates(["movie_id"])
target_table = "movies"
final_table.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", target_table) \
    .option("user", connectionProperties["user"]) \
    .option("password", connectionProperties["password"]) \
    .mode("append") \
    .save()

final_table = final_table.dropDuplicates([“movie_id”])

如果数据帧中恰好存在任何重复项,此过程将确保将其删除。

结果: enter image description here

在 SQL 中,您还可以使用 (CTE) 来删除重复的行:

WITH CTE AS (
    SELECT 
        movie_id,
        title,
        ROW_NUMBER() OVER (PARTITION BY movie_id ORDER BY (SELECT NULL)) AS RowNum
    FROM [dbo].[movies]
)
DELETE FROM CTE WHERE RowNum > 1;