SQLAlchemy 重复键更新时

SQLAlchemy ON DUPLICATE KEY UPDATE

提问人:MrD 提问时间:7/7/2011 更新时间:2/23/2023 访问量:76211

问:

在SQLAlchemy中有一种优雅的方法可以做吗?我的意思是语法类似于 ?INSERT ... ON DUPLICATE KEY UPDATEinserter.insert().execute(list_of_dictionaries)

python mysql sql炼金术

评论


答:

57赞 phsource 5/12/2012 #1

ON DUPLICATE KEY UPDATEMySQL 的 post version-1.2

此功能现在仅内置于 SQLAlchemy for MySQL 中。SomadA141 下面的答案有最好的解决方案: https://stackoverflow.com/a/48373874/319066

ON DUPLICATE KEY UPDATE在 SQL 语句中

如果希望生成的 SQL 实际包含 ,最简单的方法是使用装饰器。ON DUPLICATE KEY UPDATE@compiles

示例代码(从 reddit 上关于该主题的良好线程链接)可以在 github 上找到:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s


my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)

但请注意,在这种方法中,您必须手动创建append_string。您可以更改 append_string 函数,以便它自动将插入字符串更改为带有“ON DUPLICATE KEY UPDATE”字符串的插入,但由于懒惰,我不会在这里这样做。

ON DUPLICATE KEY UPDATEORM 中的功能

SQLAlchemy 不在其 ORM 层中提供接口或任何其他类似功能。尽管如此,它具有 session.merge() 函数,仅当相关键是主键时,该函数才能复制该功能。ON DUPLICATE KEY UPDATEMERGE

session.merge(ModelObject)首先,通过发送查询(或在本地查找)来检查是否存在具有相同主键值的行。如果是这样,它将在某处设置一个标志,指示 ModelObject 已在数据库中,并且 SQLAlchemy 应使用查询。请注意,合并比这复杂得多,但它很好地复制了主键的功能。SELECTUPDATE

但是,如果您想要使用非主键(例如,另一个唯一键)的功能,该怎么办?不幸的是,SQLAlchemy没有任何这样的功能。相反,你必须创建类似于 Django 的 .另一个 StackOverflow 答案涵盖了它,为了方便起见,我将在这里粘贴一个修改后的工作版本。ON DUPLICATE KEY UPDATEget_or_create()

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
        if defaults:
            params.update(defaults)
        instance = model(**params)
        return instance

评论

1赞 Fake Name 8/10/2015
请注意,该代码在 postgres 上不起作用(它是 9.5 中的新功能,因为 ORM 会自动将 a 附加到 inserts,这会导致无效的 SQL。append_stringON CONFLICT [IGNORE|UPDATE]RETURNING {primary key}
0赞 nhinkle 1/7/2017
这里的零件在做什么,我会在我自己的表格中用什么替换?foo=foofoo
0赞 wyx 5/27/2019
append_string不工作得到SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name)
1赞 Korenz 2/15/2021
请注意,get_or_create示例暴露在并发系统上的争用条件中。相反,您应该尝试先插入,捕获密钥重复的异常并查询结果。
1赞 Frank He 8/1/2012 #2

得到了一个更简单的解决方案:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def replace_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    s = s.replace("INSERT INTO", "REPLACE INTO")
    return s

my_connection.execute(my_table.insert(replace_string=""), my_values)

评论

12赞 Dennis S Hennen 3/26/2014
当心。 并做不同的事情。REPLACE INTOINSERT ... ON DUPLICATE KEY UPDATE
3赞 Naltharial 3/31/2014
值得注意的是,它删除了该行,因此此解决方案在(或任何其他事务引擎)表上通常毫无用处,因为它会阻塞大多数约束InnoDBFOREIGN KEY
0赞 algarecu 10/15/2015
它与MySql配合得很好。话虽如此,我该表上没有任何外键。
-1赞 Peter Lonjers 6/29/2013 #3

因为这些解决方案似乎都不是优雅的。一种暴力破解方法是查询该行是否存在。如果确实删除该行,然后插入,否则只需插入即可。显然涉及一些开销,但它不依赖于修改原始 sql,它适用于非 orm 的东西。

评论

17赞 strangeqargo 9/17/2017
你知道,你可以为此去DBA-Hell。
0赞 Nick 11/1/2022
@strangeqargo 作为数据库管理的新手,我很想更多地了解为什么这是一个坏主意。
1赞 strangeqargo 3/27/2023
@Nick“检查行是否存在”意味着你可以 1) 开始交易,尝试找到一行,结束交易。这很慢。2)你可能会忘记包装一个容易出错的事务(你搜索的行可能会在你搜索时出现),最好的方法是使用数据库提供的机制 - 在重复的密钥更新时(如果你有正确的密钥结构)。
0赞 Shoeb Ahmed Mogal 8/1/2015 #4

我只是使用普通 sql 作为:

insert_stmt = "REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :columnn_2_bind) "
session.execute(insert_stmt, data)
1赞 Manoj Sahu 2/4/2016 #5

这取决于你。如果要替换,请传入前缀OR REPLACE

  def bulk_insert(self,objects,table):
    #table: Your table class and objects are list of dictionary [{col1:val1, col2:vale}] 
    for counter,row in enumerate(objects):
        inserter = table.__table__.insert(prefixes=['OR IGNORE'], values=row)
        try:
            self.db.execute(inserter)
        except Exception as E:
            print E
        if counter % 100 == 0:
            self.db.commit()                    
    self.db.commit()

在这里,提交间隔可以更改为加速或减速

2赞 sheba 12/13/2016 #6

根据 phsource 的回答,对于使用 MySQL 并在不执行语句的情况下完全覆盖同一键的数据的特定用例,可以使用以下修饰的插入表达式:DELETE@compiles

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if insert.kwargs.get('on_duplicate_key_update'):
        fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
        generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
        return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
    return s

评论

0赞 phsource 12/26/2018
此示例不能很好地转义字段值。您可能应该使用内置的转义方法: stackoverflow.com/a/25107658/319066
1赞 sheba 12/28/2018
@phsource请注意,在此示例中,我们使用原始字段的值覆盖字段(引用字段名称而不是值),因此不需要转义。显然,使用 now-part-of-the-ORM 功能会更好(除非使用它不能按预期工作)INSERTINSERT FROM SELECT
37赞 somada141 1/22/2018 #7

值得一提的是,自 v1.2 版本以来,SQLAlchemy“核心”就内置了上述解决方案,可以在此处看到(下面复制的片段):

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

conn.execute(on_duplicate_key_stmt)

评论

0赞 bl79 5/21/2018
似乎它仅适用于 MySQL,请查看文档链接。
1赞 somada141 6/5/2018
是的,我应该澄清一下。以上仅适用于 MySQL,例如 Postgres 已经有一段时间了,docs.sqlalchemy.org/en/latest/dialects/... 和 docs.sqlalchemy.org/en/latest/dialects/...
5赞 sheba 11/28/2018
这也适用于值数组,如果有人需要该功能。这意味着它也接受 s 的对象。valueslistdict
2赞 Kailegh 2/11/2020
这也适用于批量更新吗?因为我还没有设法让它工作
1赞 M.Abulsoud 4/22/2020
我有一个唯一的索引和一个自动递增的 id 主键。?就我而言,id是不断焚烧,如何解决这个问题?
2赞 Nick 8/26/2020 #8

我的方式

import typing
from datetime import datetime
from sqlalchemy.dialects import mysql

class MyRepository:

    def model(self):
        return MySqlAlchemyModel

    def upsert(self, data: typing.List[typing.Dict]):
        if not data:
            return
        model = self.model()
        if hasattr(model, 'created_at'):
            for item in data:
                item['created_at'] = datetime.now()

        stmt = mysql.insert(getattr(model, '__table__')).values(data)
        for_update = []
        for k, v in data[0].items():
            for_update.append(k)

        dup = {k: getattr(stmt.inserted, k) for k in for_update}
        stmt = stmt.on_duplicate_key_update(**dup)
        self.db.session.execute(stmt)
        self.db.session.commit()

用法:

myrepo.upsert([
    {
        "field11": "value11",
        "field21": "value21",
        "field31": "value31",
    },
    {
        "field12": "value12",
        "field22": "value22",
        "field32": "value32",
    },
])
1赞 totalhack 11/25/2020 #9

其他答案已经涵盖了这一点,但我想我会引用我在这个要点中找到的另一个很好的 mysql 示例。这还包括使用 ,这可能很有用,具体取决于您的 innodb 自动增量设置以及您的表是否具有唯一键。此处的代码为便于参考,但如果您觉得有用,请给作者一颗星。LAST_INSERT_ID

from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert

def upsert(model, insert_dict):
    """model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
    inserted = insert(model).values(**insert_dict)
    upserted = inserted.on_duplicate_key_update(
        id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
                               for k, v in insert_dict.items()})
    res = db.engine.execute(upserted)
    return res.lastrowid
1赞 Xuemin LU 10/30/2022 #10

ORM 使用基于upseton_duplicate_key_update

class Model():
    __input_data__ = dict()

    def __init__(self, **kwargs) -> None:
        self.__input_data__ = kwargs
        self.session = Session(engine)

    def save(self):
        self.session.add(self)
        self.session.commit()
    
    def upsert(self, *, ingore_keys = []):
        column_keys = self.__table__.columns.keys()

        udpate_data = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys:
                continue
            else:
                udpate_data[key] = self.__input_data__[key]

        insert_stmt = insert(self.__table__).values(**udpate_data)

        all_ignore_keys = ['id']
        if isinstance(ingore_keys, list):
            all_ignore_keys =[*all_ignore_keys, *ingore_keys]
        else:
            all_ignore_keys.append(ingore_keys)

        udpate_columns = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys or key in all_ignore_keys:
                continue
            else:
                udpate_columns[key] = insert_stmt.inserted[key]
        
        on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
            **udpate_columns
        )
        # self.session.add(self)
        self.session.execute(on_duplicate_key_stmt)
        self.session.commit()

class ManagerAssoc(ORM_Base, Model):
    def __init__(self, **kwargs):
        self.id = idWorker.get_id()
        column_keys = self.__table__.columns.keys()
        udpate_data = dict()
        for key in kwargs.keys():
            if key not in column_keys:
                continue
            else:
                udpate_data[key] = kwargs[key]
        ORM_Base.__init__(self, **udpate_data)
        Model.__init__(self, **kwargs, id = self.id)

   ....
# you can call it as following:
manager_assoc.upsert()
manager.upsert(ingore_keys = ['manager_id'])

评论

0赞 chrisinmtown 2/18/2023
如果这太挑剔了,很抱歉,也许你想纠正一下?udpateupdate
1赞 chrisinmtown 2/23/2023 #11

2023 年 2 月更新:SQLAlchemy 版本 2 最近发布并支持 MySQL 方言。非常感谢 SQLAlchemy 项目的 Federico Caselli,他在 https://github.com/sqlalchemy/sqlalchemy/discussions/9328 上的讨论中帮助我开发了示例代码on_duplicate_key_update

请参阅 https://stackoverflow.com/a/75538576/1630244

如果可以发布两次相同的答案 (?),这是我的小型独立代码示例:

import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)

# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')

# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')

# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()

users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')