使用 Telethon 和 PostgreSQL 插入 Telegram 数据

Inserting telegram data using telethon and postgresql

提问人:onat dicleli 提问时间:10/28/2023 最后编辑:AussieJoe, onat dicleli 更新时间:10/29/2023 访问量:29

问:

我写了一个小类来处理消息并从中提取一些内容。我现在正尝试用 Telethon 实时跟踪这些数据,并插入到我的 PostgreSQL 数据库中。我之前用 `iter_messages` 方法插入这些数据时没有任何问题;从我检查的日志来看,Telethon 的实时拉取也工作正常,但我无法弄清楚数据库插入出了什么问题。

    import traceback

    client = TelegramClient('test', api_id, api_hash)

    # Thread-safe FIFO hand-off from the asyncio event handler (producer) to the
    # blocking psycopg2 worker threads (consumers).
    message_queue = queue.Queue()


    # Message retrieval and queuing
    @client.on(events.NewMessage(chats=groups))
    async def handle_new_message(event):
        """Print each new message from ``groups`` and queue it for the DB workers."""
        message = event.message
        # For live updates Telethon may not have the chat entity, leaving
        # ``message.chat`` None (likely why the iter_messages path worked while
        # this one did not). contract_message() then rejects every message and
        # insert_replymentions() fails on ``message.chat.title``. The workers are
        # plain threads and cannot await, so fetch and cache the chat here.
        await message.get_chat()
        # Print the message as it comes
        print(f"New message: {message.text}")
        # Add the message to the queue for processing
        message_queue.put(message)


    async def print_asterisk_periodically():
        """Print a heartbeat marker every 150 seconds."""
        while True:
            print('*-*-*')
            await asyncio.sleep(150)


    # Message processing worker
    def message_processing_worker():
        """Take messages off the queue forever and write the relevant ones to PostgreSQL."""
        while True:
            message = message_queue.get()
            try:
                Cdata.insert_replymentions(message)

                # Check if the message has a contract address
                result = Cdata.contract_message(message)
                if result is not None:
                    print("***** I'm working *****")
                    # Process the result (message and contract address) here
                    message, contract_address = result
                    conn = psycopg2.connect(**db_params)
                    try:
                        Cdata.insert_contract_address(contract_address, conn)
                        Cdata.insert_telegram_messages(message, contract_address, conn)
                    finally:
                        # Close even when an insert raises, so failures don't leak connections.
                        conn.close()
            except Exception as e:
                # Keep the worker alive, but also print the traceback: the message
                # text alone does not show which query or helper failed.
                print(f"Error processing database insert: {str(e)}")
                traceback.print_exc()
            finally:
                message_queue.task_done()


    num_workers = 2
    workers = []
    for _ in range(num_workers):
        # daemon=True: workers block forever on queue.get(), so non-daemon threads
        # would keep the process alive after the client disconnects or on Ctrl+C.
        worker = threading.Thread(target=message_processing_worker, daemon=True)
        worker.start()
        workers.append(worker)


    async def main():
        """Log in, then run the client alongside the heartbeat printer."""
        await client.start()
        await asyncio.gather(client.run_until_disconnected(), print_asterisk_periodically())


    asyncio.run(main())

    # Functions Used

    @staticmethod
    def contract_message(message):
        """Return ``(message, token_address)`` when the message carries a usable contract address.

        The message must expose ``message``, ``chat`` (with a ``title``) and ``id``.
        The first ``0x`` + 40-hex-digit string found in ``str(message.message)`` is
        used, unless that text contains a pinksale launchpad link. The address is
        passed to ``Cdata.return_base_token``. If that returns None, or any check
        above fails, the method returns None.
        """
        has_required_attrs = (
            hasattr(message, 'message')
            and hasattr(message, 'chat')
            and hasattr(message.chat, 'title')
            and hasattr(message, 'id')
        )
        if not has_required_attrs:
            return None

        # Directly look for the Ethereum contract address pattern
        body = str(message.message)
        found = re.search(r"0x[a-fA-F0-9]{40}", body)
        if not found or "https://www.pinksale.finance/launchpad/" in body:
            return None

        token_address = Cdata.return_base_token(found.group(0))
        if token_address is None:
            return None
        return message, token_address
    @staticmethod
    def insert_replymentions(message):
        """Store a message that mentions a tracked coin or replies to a stored coin message.

        Loads every ``(symbol, name)`` from ``static_fundamental_data`` and checks
        whether any of them appears as a whole word (case-insensitive) in the
        message text. Separately, if the message replies to a message whose
        ``telethon_id`` already exists in ``telegram_messages``, that stored
        message's ``contract_address_address`` (joined against
        ``static_fundamental_data``) is reused. If either condition holds, the
        message is written via ``insert_telegram_messages``,
        ``insert_first_view_data`` and ``insert_message_data``, then committed.

        Opens its own psycopg2 connection from ``db_params`` and always closes it.
        Returns None.
        """
        # Handle the case where the message has no text: nothing to match, skip the DB.
        message_text = message.text if hasattr(message, 'text') else ''
        if not message_text:
            return

        conn = psycopg2.connect(**db_params)
        try:
            cursor = conn.cursor()

            cursor.execute("SELECT symbol, name FROM static_fundamental_data")
            coins = cursor.fetchall()
            # fetchall() returns a (possibly empty) list, never None.
            if not coins:
                return

            # Drop NULL and empty values: re.escape(None) raises TypeError, and an
            # empty term turns the pattern into r'\b\b', which matches almost any text.
            coin_symbols = [symbol for symbol, _ in coins if symbol]
            coin_names = [name for _, name in coins if name]

            def mentions(term):
                return re.search(rf'\b{re.escape(term)}\b', message_text, re.I) is not None

            # Symbols take precedence over names when picking the coin to look up.
            mentioned = (
                next((symbol for symbol in coin_symbols if mentions(symbol)), None)
                or next((name for name in coin_names if mentions(name)), None)
            )
            is_coin_mentioned = mentioned is not None

            contract_address = None
            is_reply_in_db = False
            replied_message_id = message.reply_to_msg_id
            # message.chat may be None (entity not fetched) or have no title (private
            # chats). The stored telethon_id cannot be rebuilt then, so skip the lookup.
            chat_title = getattr(getattr(message, 'chat', None), 'title', None)
            if replied_message_id and chat_title is not None:
                replied_mes_id = f"{chat_title}:-:{replied_message_id}"

                # Check if the replied message ID exists in the database
                cursor.execute(
                    "SELECT COUNT(*) FROM telegram_messages WHERE telethon_id = %s",
                    (replied_mes_id,),
                )
                is_reply_in_db = cursor.fetchone()[0] > 0

                if is_reply_in_db:
                    # Read the stored contract address directly; contract_message()
                    # expects a Telethon message, not a DB row, and would return None.
                    cursor.execute(
                        "SELECT telegram_messages.contract_address_address FROM telegram_messages "
                        "INNER JOIN static_fundamental_data ON telegram_messages.contract_address_address "
                        "= static_fundamental_data.contract_address_address "
                        "WHERE telegram_messages.telethon_id = %s",
                        (replied_mes_id,),
                    )
                    row = cursor.fetchone()
                    if row is not None:
                        contract_address = row[0]

            # Otherwise resolve the contract address from the mentioned symbol/name.
            if is_coin_mentioned and not contract_address:
                cursor.execute(
                    "SELECT contract_address_address FROM static_fundamental_data "
                    "WHERE symbol = %s OR name = %s",
                    (mentioned, mentioned),
                )
                row = cursor.fetchone()
                if row is not None:
                    contract_address = row[0]

            # Store coin mentions and replies to stored coin messages.
            if is_coin_mentioned or is_reply_in_db:
                Cdata.insert_telegram_messages(message, contract_address, conn)
                Cdata.insert_first_view_data(message, conn)
                Cdata.insert_message_data(message, conn)
                conn.commit()
        finally:
            conn.close()

@staticmethod
def insert_telegram_messages(message, contract_address, conn):
    """Insert one message row into ``telegram_messages`` and commit.

    Args:
        message: message object whose ``text`` is stored; its row id comes from
            ``Cdata.message_to_id(message)``.
        contract_address: value written to ``contract_address_address`` (may be None).
        conn: open psycopg2 connection; it is committed but not closed here.

    A row whose ``telethon_id`` already exists is left unchanged
    (``ON CONFLICT (telethon_id) DO NOTHING``).
    """
    telethon_id = Cdata.message_to_id(message)

    # ``with`` closes the cursor deterministically; in psycopg2 the cursor's
    # context manager does not end the transaction, so the explicit commit stays.
    with conn.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO telegram_messages (
                messages,
                telethon_id,
                contract_address_address
            ) VALUES (%s, %s, %s)
            ON CONFLICT (telethon_id)
            DO NOTHING
            """,
            (message.text, telethon_id, contract_address),
        )
    conn.commit()

@staticmethod
def insert_contract_address(contract_address, conn):
    """Insert ``contract_address`` into the ``contract_address`` table and commit.

    An address that already exists is ignored (``ON CONFLICT (address) DO NOTHING``).
    ``conn`` is an open psycopg2 connection; it is committed but not closed here.
    """
    # ``with`` closes the cursor deterministically; it does not commit in psycopg2.
    with conn.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO contract_address (address)
            VALUES (%s)
            ON CONFLICT (address)
            DO NOTHING
            """,
            (contract_address,),
        )
    conn.commit()

我原以为数据库插入不会有问题,因为此前我用 `iter_messages` 方法插入时是成功的。日志文件也没能帮我找到原因。

python postgresql 多线程 异步 telethon

评论


答: 暂无答案