提问人:John 提问时间:9/19/2008 最后编辑:John 更新时间:6/21/2022 访问量:68639
python 中数据库连接池的最佳解决方案是什么?
What is the best solution for database connection pooling in python?
问:
我开发了一些自定义的类似 DAO 的类来满足我的项目的一些非常专业的要求,这是一个不在任何类型的框架中运行的服务器端进程。
该解决方案效果很好,除了每次发出新请求时,我都会通过 MySQLdb.connect 打开一个新连接。
将其切换到在 python 中使用连接池的最佳“插入式”解决方案是什么?我正在想象类似于 Java 的 commons DBCP 解决方案。
该进程运行时间很长,并且有许多线程需要发出请求,但不是同时发出请求......具体来说,在短暂地写出大部分结果之前,他们做了相当多的工作。
编辑添加: 经过一番搜索,我找到了 anitpool.py 看起来不错的,但由于我对 python 相对较新,我想我只是想确保我没有错过一个更明显/更惯用/更好的解决方案。
答:
包装连接类。
设置建立的连接数限制。 返回未使用的连接。 Intercept close 以释放连接。
更新: 我在 dbpool.py 中放了这样的东西:
import sqlalchemy.pool as pool
import MySQLdb as mysql
mysql = pool.manage(mysql)
评论
IMO,“更明显/更惯用/更好的解决方案”是使用现有的 ORM,而不是发明类似 DAO 的类。
在我看来,ORM 比“原始”SQL 连接更受欢迎。为什么?因为 Python 是 OO,从 SQL 行到对象的映射是绝对必要的。处理未映射到 Python 对象的 SQL 行的用例并不多。
我认为SQLAlchemy或SQLObject(以及相关的连接池)是更惯用的Pythonic解决方案。
池化作为单独的功能并不常见,因为纯 SQL(没有对象映射)对于受益于连接池的复杂、长时间运行的进程来说不是很受欢迎。是的,使用纯 SQL,但它始终用于更简单或更受控的应用程序,在这些应用程序中,池化没有帮助。
我认为您可能有两种选择:
- 修改类以使用 SQLAlchemy 或 SQLObject。虽然这乍一看很痛苦(所有的工作都浪费了),但您应该能够利用所有的设计和思想。它只是采用广泛使用的 ORM 和池化解决方案的练习。
- 使用您概述的算法推出您自己的简单连接池 - 您循环浏览的简单连接集或列表。
评论
在MySQL中?
我想说的是不要为连接池而烦恼。它们通常是麻烦的根源,而对于MySQL,它们不会给你带来你所希望的性能优势。从政治上讲,这条路可能需要付出很多努力,因为在这个领域有太多的最佳实践挥手和教科书式的措辞,关于连接池的优势。
连接池只是无状态应用程序(例如 HTTP 协议)的后 Web 时代和有状态的长寿命批处理应用程序的前 Web 时代之间的桥梁。由于在Web之前的数据库中,连接非常昂贵(因为过去没有人太关心建立连接需要多长时间),因此后Web应用程序设计了这种连接池方案,以便每次点击都不会在RDBMS上产生如此巨大的处理开销。
由于MySQL更像是Web时代的RDBMS,因此连接非常轻量级和快速。我编写了许多大容量的 Web 应用程序,这些应用程序根本不对 MySQL 使用连接池。
这是一个复杂的问题,只要没有政治障碍需要克服,你可能会从中受益。
评论
我一直在寻找同样的东西。
如果您的应用程序决定开始使用多线程,那么创建自己的连接池是一个坏主意。为多线程应用程序创建连接池比为单线程应用程序创建连接池要复杂得多。在这种情况下,您可以使用类似 PySQLPool 的东西。
如果您正在寻找性能,使用 ORM 也是一个坏主意。
如果您要处理必须处理大量选择、插入、 同时更新和删除,那么您将需要性能,这意味着您需要编写自定义 SQL 来优化查找和锁定时间。使用 ORM,您通常没有这种灵活性。
所以基本上,是的,你可以创建自己的连接池并使用 ORM,但前提是你确定你不需要我刚才描述的任何内容。
旧线程,但对于通用池(连接或任何昂贵的对象),我使用类似的东西:
def pool(ctor, limit=None):
local_pool = multiprocessing.Queue()
n = multiprocesing.Value('i', 0)
@contextlib.contextmanager
def pooled(ctor=ctor, lpool=local_pool, n=n):
# block iff at limit
try: i = lpool.get(limit and n.value >= limit)
except multiprocessing.queues.Empty:
n.value += 1
i = ctor()
yield i
lpool.put(i)
return pooled
它构造懒惰,有一个可选的限制,并且应该泛化到我能想到的任何用例。当然,这是假设你真的需要任何资源的池化,而对于许多现代SQL来说,你可能不需要。用法:
# in main:
my_pool = pool(lambda: do_something())
# in thread:
with my_pool() as my_obj:
my_obj.do_something()
这确实假设 ctor 创建的任何对象在需要时都具有适当的析构函数(某些服务器不会终止连接对象,除非它们被显式关闭)。
评论
yield i
lpool.put(i)
回复一个旧线程,但上次我检查时,MySQL提供连接池作为其驱动程序的一部分。
您可以在以下位置查看它们:
https://dev.mysql.com/doc/connector-python/en/connector-python-connection-pooling.html
从 TFA 中,假设您想显式打开连接池(如 OP 所述):
dbconfig = { "database": "test", "user":"joe" }
cnxpool = mysql.connector.pooling.MySQLConnectionPool(pool_name = "mypool",pool_size = 3, **dbconfig)
然后,通过 get_connection() 函数从池中请求来访问此池。
cnx1 = cnxpool.get_connection()
cnx2 = cnxpool.get_connection()
使用方便,简单可靠。DBUtils
pip install DBUtils
我为 OpenSearch 做了它,所以你可以参考它。
from opensearchpy import OpenSearch
def get_connection():
connection = None
try:
connection = OpenSearch(
hosts=[{'host': settings.OPEN_SEARCH_HOST, 'port': settings.OPEN_SEARCH_PORT}],
http_compress=True,
http_auth=(settings.OPEN_SEARCH_USER, settings.OPEN_SEARCH_PASSWORD),
use_ssl=True,
verify_certs=True,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
except Exception as error:
print("Error: Connection not established {}".format(error))
else:
print("Connection established")
return connection
class OpenSearchClient(object):
connection_pool = []
connection_in_use = []
def __init__(self):
if OpenSearchClient.connection_pool:
pass
else:
OpenSearchClient.connection_pool = [get_connection() for i in range(0, settings.CONNECTION_POOL_SIZE)]
def search_data(self, query="", index_name=settings.OPEN_SEARCH_INDEX):
available_cursor = OpenSearchClient.connection_pool.pop(0)
OpenSearchClient.connection_in_use.append(available_cursor)
response = available_cursor.search(body=query, index=index_name)
available_cursor.close()
OpenSearchClient.connection_pool.append(available_cursor)
OpenSearchClient.connection_in_use.pop(-1)
return response
评论