提问人:MrD 提问时间:7/7/2011 更新时间:2/23/2023 访问量:76211
SQLAlchemy 重复键更新时
SQLAlchemy ON DUPLICATE KEY UPDATE
问:
在SQLAlchemy中有一种优雅的方法可以做吗?我的意思是语法类似于 ?INSERT ... ON DUPLICATE KEY UPDATE
inserter.insert().execute(list_of_dictionaries)
答:
ON DUPLICATE KEY UPDATE
MySQL 的 post version-1.2
此功能现在仅内置于 SQLAlchemy for MySQL 中。SomadA141 下面的答案有最好的解决方案: https://stackoverflow.com/a/48373874/319066
ON DUPLICATE KEY UPDATE
在 SQL 语句中
如果希望生成的 SQL 实际包含 ,最简单的方法是使用装饰器。ON DUPLICATE KEY UPDATE
@compiles
示例代码(从 reddit 上关于该主题的良好线程链接)可以在 github 上找到:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if 'append_string' in insert.kwargs:
return s + " " + insert.kwargs['append_string']
return s
my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
但请注意,在这种方法中,您必须手动创建append_string。您可以更改 append_string 函数,以便它自动将插入字符串更改为带有“ON DUPLICATE KEY UPDATE”字符串的插入,但由于懒惰,我不会在这里这样做。
ON DUPLICATE KEY UPDATE
ORM 中的功能
SQLAlchemy 不在其 ORM 层中提供接口或任何其他类似功能。尽管如此,它具有 session.merge()
函数,仅当相关键是主键时,该函数才能复制该功能。ON DUPLICATE KEY UPDATE
MERGE
session.merge(ModelObject)
首先,通过发送查询(或在本地查找)来检查是否存在具有相同主键值的行。如果是这样,它将在某处设置一个标志,指示 ModelObject 已在数据库中,并且 SQLAlchemy 应使用查询。请注意,合并比这复杂得多,但它很好地复制了主键的功能。SELECT
UPDATE
但是,如果您想要使用非主键(例如,另一个唯一键)的功能,该怎么办?不幸的是,SQLAlchemy没有任何这样的功能。相反,你必须创建类似于 Django 的 .另一个 StackOverflow 答案涵盖了它,为了方便起见,我将在这里粘贴一个修改后的工作版本。ON DUPLICATE KEY UPDATE
get_or_create()
def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance
else:
params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
if defaults:
params.update(defaults)
instance = model(**params)
return instance
评论
append_string
ON CONFLICT [IGNORE|UPDATE]
RETURNING {primary key}
foo=foo
foo
append_string
不工作得到SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name)
得到了一个更简单的解决方案:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def replace_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
s = s.replace("INSERT INTO", "REPLACE INTO")
return s
my_connection.execute(my_table.insert(replace_string=""), my_values)
评论
REPLACE INTO
INSERT ... ON DUPLICATE KEY UPDATE
InnoDB
FOREIGN KEY
因为这些解决方案似乎都不是优雅的。一种暴力破解方法是查询该行是否存在。如果确实删除该行,然后插入,否则只需插入即可。显然涉及一些开销,但它不依赖于修改原始 sql,它适用于非 orm 的东西。
评论
我只是使用普通 sql 作为:
insert_stmt = "REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :columnn_2_bind) "
session.execute(insert_stmt, data)
这取决于你。如果要替换,请传入前缀OR REPLACE
def bulk_insert(self,objects,table):
#table: Your table class and objects are list of dictionary [{col1:val1, col2:vale}]
for counter,row in enumerate(objects):
inserter = table.__table__.insert(prefixes=['OR IGNORE'], values=row)
try:
self.db.execute(inserter)
except Exception as E:
print E
if counter % 100 == 0:
self.db.commit()
self.db.commit()
在这里,提交间隔可以更改为加速或减速
根据 phsource 的回答,对于使用 MySQL 并在不执行语句的情况下完全覆盖同一键的数据的特定用例,可以使用以下修饰的插入表达式:DELETE
@compiles
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if insert.kwargs.get('on_duplicate_key_update'):
fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
return s
评论
INSERT
INSERT FROM SELECT
值得一提的是,自 v1.2 版本以来,SQLAlchemy“核心”就内置了上述解决方案,可以在此处看到(下面复制的片段):
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
id='some_existing_id',
data='inserted value')
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)
conn.execute(on_duplicate_key_stmt)
评论
values
list
dict
我的方式
import typing
from datetime import datetime
from sqlalchemy.dialects import mysql
class MyRepository:
def model(self):
return MySqlAlchemyModel
def upsert(self, data: typing.List[typing.Dict]):
if not data:
return
model = self.model()
if hasattr(model, 'created_at'):
for item in data:
item['created_at'] = datetime.now()
stmt = mysql.insert(getattr(model, '__table__')).values(data)
for_update = []
for k, v in data[0].items():
for_update.append(k)
dup = {k: getattr(stmt.inserted, k) for k in for_update}
stmt = stmt.on_duplicate_key_update(**dup)
self.db.session.execute(stmt)
self.db.session.commit()
用法:
myrepo.upsert([
{
"field11": "value11",
"field21": "value21",
"field31": "value31",
},
{
"field12": "value12",
"field22": "value22",
"field32": "value32",
},
])
其他答案已经涵盖了这一点,但我想我会引用我在这个要点中找到的另一个很好的 mysql 示例。这还包括使用 ,这可能很有用,具体取决于您的 innodb 自动增量设置以及您的表是否具有唯一键。此处的代码为便于参考,但如果您觉得有用,请给作者一颗星。LAST_INSERT_ID
from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert
def upsert(model, insert_dict):
"""model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
inserted = insert(model).values(**insert_dict)
upserted = inserted.on_duplicate_key_update(
id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
for k, v in insert_dict.items()})
res = db.engine.execute(upserted)
return res.lastrowid
ORM 使用基于upset
on_duplicate_key_update
class Model():
__input_data__ = dict()
def __init__(self, **kwargs) -> None:
self.__input_data__ = kwargs
self.session = Session(engine)
def save(self):
self.session.add(self)
self.session.commit()
def upsert(self, *, ingore_keys = []):
column_keys = self.__table__.columns.keys()
udpate_data = dict()
for key in self.__input_data__.keys():
if key not in column_keys:
continue
else:
udpate_data[key] = self.__input_data__[key]
insert_stmt = insert(self.__table__).values(**udpate_data)
all_ignore_keys = ['id']
if isinstance(ingore_keys, list):
all_ignore_keys =[*all_ignore_keys, *ingore_keys]
else:
all_ignore_keys.append(ingore_keys)
udpate_columns = dict()
for key in self.__input_data__.keys():
if key not in column_keys or key in all_ignore_keys:
continue
else:
udpate_columns[key] = insert_stmt.inserted[key]
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
**udpate_columns
)
# self.session.add(self)
self.session.execute(on_duplicate_key_stmt)
self.session.commit()
class ManagerAssoc(ORM_Base, Model):
def __init__(self, **kwargs):
self.id = idWorker.get_id()
column_keys = self.__table__.columns.keys()
udpate_data = dict()
for key in kwargs.keys():
if key not in column_keys:
continue
else:
udpate_data[key] = kwargs[key]
ORM_Base.__init__(self, **udpate_data)
Model.__init__(self, **kwargs, id = self.id)
....
# you can call it as following:
manager_assoc.upsert()
manager.upsert(ingore_keys = ['manager_id'])
评论
udpate
update
2023 年 2 月更新:SQLAlchemy 版本 2 最近发布并支持 MySQL 方言。非常感谢 SQLAlchemy 项目的 Federico Caselli,他在 https://github.com/sqlalchemy/sqlalchemy/discussions/9328 上的讨论中帮助我开发了示例代码on_duplicate_key_update
请参阅 https://stackoverflow.com/a/75538576/1630244
如果可以发布两次相同的答案 (?),这是我的小型独立代码示例:
import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "foo"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()
# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)
# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')
# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
[
{"id": 1, "name": "x"},
{"id": 2, "name": "y"},
{"id": 3, "name": "w"},
{"id": 4, "name": "z"},
]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')
# demonstrate upsert
ups_stmt = mysql.insert(User).values(
[
{"id": 1, "name": "xx"},
{"id": 2, "name": "yy"},
{"id": 3, "name": "ww"},
{"id": 5, "name": "new"},
]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')
评论