提问人:Balaji 提问时间:11/16/2023 最后编辑:Balaji 更新时间:11/23/2023 访问量:80
如何使用 python 以更快的方式获取 postgres 表中新添加的行
How to fetch newly added rows in postgres tables in faster way using python
问:
我们有一个 postgres 数据库表,每秒 50 行将从实时应用程序连续添加到该表中。现在我们已经使用 python 脚本读取了每个新添加的行(一次一行)。是否可以读取所有行而不会遗漏任何行?
如果可能的话,请帮助我使用 python 脚本从 postgres 数据库中读取实时更新的行(50 行/秒)。
尝试了下面的脚本。
print('\nConnecting to the PostgreSQL database...\n')
conn = psycopg2.connect(**params)
# create a cursor
cur = conn.cursor()
while TRUE:
cur.execute('SELECT * from postgrestable ORDER BY TIMESTAMP DESC LIMIT 1')
result = cur.fetchone();
print(result)
#Commit your changes in the database
conn.commit()
响应:数据库访问正在工作,但我能够每到几秒钟获得一行。每 2 秒,我的数据库 ID 就会获得 100 行。但我只能从 100 行中获取 1 行。
这是因为 cur.execute() 函数吗?此函数需要时间才能执行吗?
答:
试试这个更新的代码,在这段代码中,我将 OrderBy: [asc(document.columns.latestUpdatedAt)] 替换为 orderBy: [desc(document.columns.latestUpdatedAt)] 。您将首先获得最新的聊天室,因为这将根据 latestUpdatedAt 列按降序对结果集进行排序。
import psycopg2
import time
# Connection parameters
params = {
'dbname': 'your_database_name',
'user': 'your_username',
'password': 'your_password',
'host': 'your_host',
'port': 'your_port'
}
# Connect to the PostgreSQL database
print('\nConnecting to the PostgreSQL database...\n')
conn = psycopg2.connect(**params)
# Create a cursor
cur = conn.cursor()
# Initial value for the last processed primary key or timestamp
last_processed_id = 0 # Assuming the primary key is an integer
while True:
# Fetch new rows based on the primary key or timestamp
cur.execute('SELECT * FROM postgrestable WHERE id > %s ORDER BY id', (last_processed_id,))
# Fetch all new rows
new_rows = cur.fetchall()
# Process the new rows
for row in new_rows:
print(row)
# Update the last processed primary key or timestamp
last_processed_id = row[0] # Assuming the primary key is the first column
# Sleep for a short duration before checking for new rows again
time.sleep(1/50)
# Close the cursor and connection (not reached in this example due to the infinite loop)
cur.close()
conn.close()
希望它能:)
评论
您可以将 python 客户端设置为侦听
从触发器广播的 pg_notify()
...在表格上插入或更新后
,收集您的信号。
create table table_all_signals (
source_id int not null,
signal_id uuid default gen_random_uuid() primary key,
signal_timestamp timestamp not null,
value numeric not null );
create function f_signal_collector()returns trigger as $f$
begin perform pg_notify('incoming_signals_feed',to_jsonb(new)::text);
return new;
end $f$ language plpgsql;
create trigger t_signal_collector after insert or update on table_all_signals
for each row execute function f_signal_collector();
插入示例将触发一条消息,以中继到您的侦听客户端:
异步通知 “incoming_signals_feed” with payload “{”value“: 830.912026524988, ”signal_id“: ”ce2d72e0-a707-4ab5-8162-b37c0063d471“, ”source_id“: 1, ”signal_timestamp“: ”2023-11-23T09:01:09.308965“}” 从具有 PID 1209744的服务器进程接收。
要解决只能发送有效负载的事实,您可以改为仅中继客户端的唯一标识符text
notify
create or replace function f_signal_collector()returns trigger as $f$
begin perform pg_notify('incoming_signals_feed',new.signal_id::text);
return new;
end $f$ language plpgsql;
异步通知“incoming_signals_feed”,有效负载为“2a20ce9b-f077-45ec-a85a-b88fb54d6d0f”,从具有 PID 1209744的服务器进程接收。
然后让它简单地去捡起它:
select * from table_all_signals where signal_id='2a20ce9b-f077-45ec-a85a-b88fb54d6d0f';
让 psycopg2 处理绑定。
评论
where signal_id
text
notify
text
评论
cur.execute(...)
time.sleep
cur.execute("SELECT ...")