提问人:onat dicleli 提问时间:10/28/2023 最后编辑:AussieJoeonat dicleli 更新时间:10/29/2023 访问量:29
使用 telethon 和 postgresql 插入电报数据
Inserting telegram data using telethon and postgresql
问:
我写了一个小类来处理消息并从中提取一些内容。我现在正试图使用 telethon 跟踪这些数据并插入到我的 postgresql 数据库中。我还尝试使用我没有问题的方法插入它们,从我检查的日志来看,telethon 在实时拉取中工作正常,但我无法弄清楚数据库插入有什么问题。iter_messages
client = TelegramClient('test', api_id, api_hash)
# Create a message queue
message_queue = queue.Queue()
# Message retrieval and queuing
@client.on(events.NewMessage(chats=groups))
async def handle_new_message(event):
message = event.message
\# Print the message as it comes
print(f"New message: {message.text}")
\# Add the message to the queue for processing
message_queue.put(message)
async def print_asterisk_periodically():
while True:
print('*-*-\*')
await asyncio.sleep(150)
# Message processing worker
def message_processing_worker():
while True:
message = message_queue.get()
try:
Cdata.insert_replymentions(message)
# Check if the message has a contract address
result = Cdata.contract_message(message)
if result is not None:
print("***** I'm working *****")
conn = psycopg2.connect(**db_params)
# Process the result (message and contract address) here
message, contract_address = result
Cdata.insert_contract_address(contract_address,conn)
Cdata.insert_telegram_messages(message,contract_address,conn)
conn.close()
except Exception as e:
print(f"Error processing database insert: {str(e)}")
finally:
message_queue.task_done()
num_workers = 2
workers = \[\]
for \_ in range(num_workers):
worker = threading.Thread(target=message_processing_worker)
worker.start()
workers.append(worker)
async def main():
await client.start()
await asyncio.gather(client.run_until_disconnected(),print_asterisk_periodically() )
asyncio.run(main())
\#Functions Used
@staticmethod
def contract_message(message):
contract_address = None
cmes = None
# Check if the message has 'message', 'chat', 'title', and 'id' attributes
if hasattr(message, 'message') and hasattr(message, 'chat') and hasattr(message.chat, 'title') and hasattr(message, 'id'):
# Directly look for the Ethereum contract address pattern
match = re.search(r"0x[a-fA-F0-9]{40}", str(message.message))
if match and "https://www.pinksale.finance/launchpad/" not in str(message.message):
contract_address = match.group(0)
if ( token_address := Cdata.return_base_token(contract_address)) is not None:
cmes=message
return cmes,token_address
@staticmethod
def insert_replymentions(message):
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
cursor.execute("SELECT symbol, name FROM static_fundamental_data")
coins = cursor.fetchall()
# Exit the function if the coins query returns None
if coins is None:
conn.close()
return
# Filter out None values for coin symbols and names
coin_symbols = [coin[0] for coin in coins if coin[0] is not None]
coin_names = [coin[1] for coin in coins if coin[1] is not None]
# Handle the case where the message has no text
message_text = message.text if hasattr(message, 'text') else ''
if not message_text:
conn.close()
return
cursor.execute("SELECT symbol, name FROM static_fundamental_data")
coins = cursor.fetchall()
coin_symbols = [coin[0] for coin in coins]
coin_names = [coin[1] for coin in coins]
message_text = message.text if hasattr(message, 'text') else ''
# Check if the coin is mentioned in the messages text
is_coin_mentioned = any(re.search(rf'\b{re.escape(symbol)}\b', message_text, re.I) for symbol in coin_symbols) or any(re.search(rf'\b{re.escape(name)}\b',
message_text, re.I) for name in coin_names)
#Check if the message is a reply to another message
is_reply = bool(message.reply_to_msg_id)
contract_address = None
if is_reply:
replied_message_id = message.reply_to_msg_id
replied_mes_id = f"{message.chat.title}:-:{replied_message_id}"
# Check if the replied message ID exists in the database
cursor.execute("SELECT COUNT(*) FROM telegram_messages WHERE telethon_id = %s", (replied_mes_id,))
count = cursor.fetchone()[0]
is_reply_in_db = count > 0
# If the replied message in the database
if is_reply_in_db:
cursor.execute("SELECT telegram_messages.telethon_id FROM telegram_messages INNER JOIN static_fundamental_data ON telegram_messages.contract_address_address = static_fundamental_data.contract_address_address WHERE telegram_messages.telethon_id = %s", (replied_mes_id,))
db_message = cursor.fetchone()
if db_message:
cmes,contract_address = Cdata.contract_message(db_message)
if is_coin_mentioned and not contract_address:
# Find the contract address corresponding to the coin symbol or name mentioned in the message
mentioned_symbol_or_name = next((symbol for symbol in coin_symbols if re.search(rf'\b{re.escape(symbol)}\b', message_text, re.I)), None) or \
next((name for name in coin_names if re.search(rf'\b{re.escape(name)}\b', message_text, re.I)), None)
#If the contract name or symbol is in the messages text fetch the contrac address
if mentioned_symbol_or_name:
cursor.execute("SELECT contract_address_address FROM static_fundamental_data WHERE symbol = %s OR name = %s", (mentioned_symbol_or_name, mentioned_symbol_or_name))
contract_address = cursor.fetchone()[0]
# If the coin is mentioned in the message table or it is a reply to a coin promoting message that is in the database insert them in to the database referencing the coin
if is_coin_mentioned or (is_reply and is_reply_in_db):
Cdata.insert_telegram_messages(message, contract_address,conn)
Cdata.insert_first_view_data(message,conn)
Cdata.insert_message_data(message,conn)
conn.commit()
conn.close()
@staticmethod
def insert_telegram_messages(message,contract_address,conn):
telethon_id = Cdata.message_to_id(message)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO telegram_messages (
messages,
telethon_id,
contract_address_address
) VALUES (%s,%s,%s)
ON CONFLICT (telethon_id)
DO NOTHING
""",
(message.text,
telethon_id,
contract_address
))
conn.commit()
@staticmethod
def insert_contract_address(contract_address,conn):
cursor = conn.cursor()
cursor.execute("""
INSERT INTO contract_address (address)
VALUES (%s)
ON CONFLICT (address)
DO NOTHING
""", (contract_address,))
conn.commit()
我期望数据库插入没有问题,因为我事先尝试使用 iter_messages 方法进行插入。日志文件也没有帮助我。
答: 暂无答案
评论