如何使用 python 以更快的方式获取 postgres 表中新添加的行

How to fetch newly added rows in postgres tables in faster way using python

提问人:Balaji 提问时间:11/16/2023 最后编辑:Balaji 更新时间:11/23/2023 访问量:80

问:

我们有一个 postgres 数据库表,每秒 50 行将从实时应用程序连续添加到该表中。现在我们已经使用 python 脚本读取了每个新添加的行(一次一行)。是否可以读取所有行而不会遗漏任何行?

如果可能的话,请帮助我使用 python 脚本从 postgres 数据库中读取实时更新的行(50 行/秒)。

尝试了下面的脚本。

    print('\nConnecting to the PostgreSQL database...\n')
    conn = psycopg2.connect(**params)
    
    # create a cursor
    cur = conn.cursor()
    
    while TRUE:
        cur.execute('SELECT * from postgrestable ORDER BY TIMESTAMP DESC LIMIT 1')
        result = cur.fetchone();  
        print(result)

    #Commit your changes in the database
    conn.commit()

响应:数据库访问正在工作,但我能够每到几秒钟获得一行。每 2 秒,我的数据库 ID 就会获得 100 行。但我只能从 100 行中获取 1 行。

这是因为 cur.execute() 函数吗?此函数需要时间才能执行吗?

python postgresql psql

评论

3赞 Alex 11/16/2023
你为什么评论?如果你不改变任何东西,为什么你需要提交?如果你想要更快,为什么需要?cur.execute(...)time.sleep
0赞 snakecharmerb 11/16/2023
@Alex OP 需要提交才能获取新添加的行(尽管将连接配置为使用自动提交可能会更好,具体取决于它们的用例)。
1赞 Alex 11/16/2023
@snakecharmerb他不需要提交,他已经连接到数据库。仅针对更改提交。
0赞 Alex 11/16/2023
如果你想在循环中获取新行,那么你需要内部循环来获取新数据。您可以通过使用光标检查行数来循环表中出现的所有新行。但具体来说,每 1 秒获取 50 个新行将很难实现,因为操作可能需要 1 秒以上(有很多因素)cur.execute("SELECT ...")
0赞 Alex 11/16/2023
此外,您可能需要按选定的行进行排序,以使用 fetchone 获取新行

答:

-1赞 Maimoona Abid 11/16/2023 #1

试试这个更新的代码,在这段代码中,我将 OrderBy: [asc(document.columns.latestUpdatedAt)] 替换为 orderBy: [desc(document.columns.latestUpdatedAt)]您将首先获得最新的聊天室,因为这将根据 latestUpdatedAt 列按降序对结果集进行排序。

import psycopg2
import time

# Connection parameters
params = {
    'dbname': 'your_database_name',
    'user': 'your_username',
    'password': 'your_password',
    'host': 'your_host',
    'port': 'your_port'
}

# Connect to the PostgreSQL database
print('\nConnecting to the PostgreSQL database...\n')
conn = psycopg2.connect(**params)

# Create a cursor
cur = conn.cursor()

# Initial value for the last processed primary key or timestamp
last_processed_id = 0  # Assuming the primary key is an integer

while True:
    # Fetch new rows based on the primary key or timestamp
    cur.execute('SELECT * FROM postgrestable WHERE id > %s ORDER BY id', (last_processed_id,))
    
    # Fetch all new rows
    new_rows = cur.fetchall()
    
    # Process the new rows
    for row in new_rows:
        print(row)
        # Update the last processed primary key or timestamp
        last_processed_id = row[0]  # Assuming the primary key is the first column
    
    # Sleep for a short duration before checking for new rows again
    time.sleep(1/50)

# Close the cursor and connection (not reached in this example due to the infinite loop)
cur.close()
conn.close()

希望它能:)

评论

0赞 Balaji 11/16/2023
在我们的表 id 列中,值是相同的。ID 不会为每一行递增或更改。我们的表 id、时间戳和值中有 3 列。我已经尝试了上面提供的脚本。它没有获取最新的行。相反,它获取起始行(旧行)。
0赞 Balaji 11/16/2023
我已经编辑了我的问题,并更新了 python 脚本。请检查一下,给我一个解决方案。
0赞 jjanes 11/17/2023
这并不可靠,因为行不需要严格按照 id 的顺序可见。
0赞 Zegarek 11/23/2023 #2

您可以将 python 客户端设置为侦听从触发器广播的 pg_notify()...在表格上插入或更新后,收集您的信号。

create table table_all_signals (
    source_id int not null,
    signal_id uuid default gen_random_uuid() primary key,
    signal_timestamp timestamp not null,
    value numeric not null );

create function f_signal_collector()returns trigger as $f$
begin perform pg_notify('incoming_signals_feed',to_jsonb(new)::text);
      return new;
end $f$ language plpgsql;

create trigger t_signal_collector after insert or update on table_all_signals
for each row execute function f_signal_collector();

插入示例将触发一条消息,以中继到您的侦听客户端:

异步通知 “incoming_signals_feed” with payload “{”value“: 830.912026524988, ”signal_id“: ”ce2d72e0-a707-4ab5-8162-b37c0063d471“, ”source_id“: 1, ”signal_timestamp“: ”2023-11-23T09:01:09.308965“}” 从具有 PID 1209744的服务器进程接收。

要解决只能发送有效负载的事实,您可以改为仅中继客户端的唯一标识符textnotify

create or replace function f_signal_collector()returns trigger as $f$
begin perform pg_notify('incoming_signals_feed',new.signal_id::text);
      return new;
end $f$ language plpgsql;

异步通知“incoming_signals_feed”,有效负载为“2a20ce9b-f077-45ec-a85a-b88fb54d6d0f”,从具有 PID 1209744的服务器进程接收。

然后让它简单地去捡起它:

select * from table_all_signals where signal_id='2a20ce9b-f077-45ec-a85a-b88fb54d6d0f';

让 psycopg2 处理绑定。

评论

0赞 Balaji 11/23/2023
我想获取每个新行,而不管 SignalID 如何。我的表将以每秒 50 行的速度插入。
0赞 Zegarek 11/23/2023
@Balaji 这确实允许您获取每一行,而不管它们的 SignalID 如何 - 第一个示例直接广播整行。如果您不想直接从 广播新行,我提到这是一种解决方法。相反,您可以广播该行中唯一标识该行的字段,并让客户端捕获它们并使用它们以标准方式获取该行,而无需通过 进行交互。上面,我使 SignalID 是唯一的,但这只是一个示例 - 在你的情况下,你可以传递 singal_id 和 signal_timestamp,然后根据这些进行选择where signal_idtextnotifytext