Asyncio and multiprocessing ETL flow - not inserting all rows when queue max_size not 1

Asked by: VladIPlay · Asked: 11/13/2023 · Updated: 11/13/2023 · Views: 19

Q:

I found out at https://www.vinta.com.br/blog/etl-with-asyncio-asyncpg how to build a flow that fetches data from a database (with async code), processes that data in batches using multiprocessing, and then inserts it back into the database (also async).
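For reference, the shape of that pattern is roughly this (my own minimal sketch, not the exact code from that post):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def transform(batch):
    # CPU-bound work runs in a separate process
    return [record.upper() for record in batch]

async def producer(queue):
    for record in ["a", "b", "c", "d"]:   # stand-in for async DB reads
        await queue.put(record)
    await queue.put(None)                 # sentinel: no more records

async def consumer(queue, pool):
    loop = asyncio.get_running_loop()
    batch = []
    while (record := await queue.get()) is not None:
        batch.append(record)
    transformed = await loop.run_in_executor(pool, transform, batch)
    print(transformed)                    # stand-in for the async DB insert

async def main():
    queue = asyncio.Queue(maxsize=1)
    with ProcessPoolExecutor() as pool:
        await asyncio.gather(producer(queue), consumer(queue, pool))

if __name__ == "__main__":
    asyncio.run(main())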

My code looks like this:

import asyncio
import aiosqlite

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from lingua import Language, LanguageDetectorBuilder
import ro_diacritics

import spacy
import json

import argostranslate.package
import argostranslate.translate

import functools
import datetime
import time
import re
import logging
from multiprocessing_logging import install_mp_handler
import os


def initialize():

    settings = dict()

    settings['new_table_name'] = 'news2'

    settings['old_table_name'] = 'news'

    settings['remove_columns'] = ['resultTitleLemma', 'articleContentLemma']

    settings['add_columns'] = [
        'id', 'newErrorStatus', 'newErrorException', 'articleTextLength',
        'resultTitleToken', 'resultTitleLemma', 'resultTitlePos', 
        'resultExtractedTitleToken', 'resultExtractedTitleLemma', 'resultExtractedTitlePos', 
        'articleTitleToken', 'articleTitleLemma', 'articleTitlePos',
        'articleTextToken', 'articleTextLemma', 'articleTextPos', 
        'resultTitleTranslated', 'resultExtractedTitleTranslated', 'articleTitleTranslated', 'articleTextTranslated',
        'resultTitleTranslatedToken', 'resultTitleTranslatedLemma', 'resultTitleTranslatedPos', 
        'resultExtractedTitleTranslatedToken', 'resultExtractedTitleTranslatedLemma', 'resultExtractedTitleTranslatedPos', 
        'articleTitleTranslatedToken', 'articleTitleTranslatedLemma', 'articleTitleTranslatedPos', 
        'articleTextTranslatedToken', 'articleTextTranslatedLemma', 'articleTextTranslatedPos'
        ]
    
    # check for title null or in blacklist
    settings['title_blacklist'] = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']

    # lingua-language detector
    #from lingua import Language, LanguageDetectorBuilder
    settings['wanted_language'] = "ROMANIAN"
    settings['language_detector'] = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()

    # restore diacritics
    #import ro_diacritics
    settings['columns_diacritics'] = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']

    # Spacy models for lemmatizing
    #import spacy
    #import json
    # python -m spacy download ro_core_news_md
    settings['spacy_model_ro'] = spacy.load("ro_core_news_md", disable=['ner', 'parser'])

    settings['lemmatizer_columns_ro'] = {
        'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
        'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
        'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
        'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
    }

    # python -m spacy download en_core_web_md
    settings['spacy_model_en'] = spacy.load("en_core_web_md", disable=['ner', 'parser'])

    settings['lemmatizer_columns_en'] = {
        'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
        'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
        'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
        'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
    }

    # Argostranslate from ro -> en
    #import argostranslate.package
    #import argostranslate.translate

    settings['from_lang'] = 'ro'
    settings['to_lang'] = 'en'

    argostranslate.package.update_package_index()
    available_packages = argostranslate.package.get_available_packages()
    package_to_install = next(
        filter(
            lambda x: x.from_code == settings['from_lang'] and x.to_code == settings['to_lang'], available_packages
        )
    )
    argostranslate.package.install_from_path(package_to_install.download())

    settings['translate_columns'] = {'resultTitle':'resultTitleTranslated', 
                        'resultExtractedTitle':'resultExtractedTitleTranslated', 
                        'articleTitle':'articleTitleTranslated', 
                        'articleText':'articleTextTranslated'}
    
    # Last check for matching 
    settings['check_match_columns'] = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']

    return settings



### Data functions for main_data()

def convert_row_dict(row):
    return dict(row)


def remove_dict_columns(datadict, remove_columns):
    if not isinstance(remove_columns, (list, tuple)): remove_columns = [remove_columns]
    for column in remove_columns:
        datadict.pop(column, None)
    return datadict


def fix_accents_dict(datadict):
    def fix_accents(s):
        char_dict = { "º": "ș", "ª": "Ș", "ş": "ș", "Ş": "Ș", "ţ": "ț", "Ţ": "Ț", "þ": "ț", "Þ": "Ț", "ã": "ă"  }
        for k,v in char_dict.items():
            s = s.replace(k, v)
        return s
    for item in datadict.keys():
        if isinstance(datadict[item], str):
            datadict[item] = fix_accents(datadict[item])
    return datadict


def add_dict_columns(datadict, add_columns, default_value=None):
    if not isinstance(add_columns, (list, tuple)): add_columns = [add_columns]
    for column in add_columns:
        datadict[column] = default_value
    return datadict


def check_null_article_text(datadict):
    if not datadict['articleText']:
        datadict['newErrorStatus'] = str(3)
        datadict['newErrorException'] = 'Content null'
        raise Exception(datadict['newErrorException'])
    return datadict


def calc_content_length(datadict):
    datadict['articleTextLength'] = str(len(datadict['articleText']))
    return datadict


def check_extracted_title(datadict, titleBlackList = None):
    if not datadict['resultExtractedTitle']:
        datadict['newErrorStatus'] = str(4)
        datadict['newErrorException'] = 'Page title null'
        raise Exception(datadict['newErrorException'])
    
    if not titleBlackList:
        titleBlackList = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']
    if any([x in datadict['resultExtractedTitle'] for x in titleBlackList]):
        datadict['newErrorStatus'] = str(4)
        datadict['newErrorException'] = 'Title in blocked list'
        raise Exception(datadict['newErrorException'])
    return datadict


def check_md_website(datadict):
    location_url = re.findall(r"(?:[a-zA-Z]*\.)+([a-zA-Z]+)(?:\/.*)?", str(datadict['articleUrl']))
    if any(x == "md" for x in location_url):
        datadict['newErrorStatus'] = str(5)
        datadict['newErrorException'] = 'Website is from md'
        raise Exception(datadict['newErrorException'])
    return datadict


def check_language(datadict, wanted_language = 'ROMANIAN', language_detector = None):
    if not language_detector:
        language_detector = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()
    try:
        articleLanguage = language_detector.detect_language_of(datadict['articleText']).name
    except Exception:
        datadict['newErrorStatus'] = str(6)
        datadict['newErrorException'] = 'Language could not be extracted'
        raise Exception(datadict['newErrorException'])
    else:
        if articleLanguage != wanted_language:
            datadict['newErrorStatus'] = str(6)
            datadict['newErrorException'] = f'Language of article is probably not Romanian, it is {articleLanguage}'
            raise Exception(datadict['newErrorException'])
    return datadict
        

def restore_diacritics(datadict, columns_diacritics = None):
    if not columns_diacritics:
        columns_diacritics = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']
    diacritice = ["ă", "Ă", "î", "Î", "â", "Â", "ș", "Ș", "ț", "Ț"]
    for column in columns_diacritics:
        if not any(e in datadict[column] for e in diacritice):
            datadict[column] = ro_diacritics.restore_diacritics(datadict[column])
    return datadict


def lemmatizer(datadict, columns, spacy_model):
    '''
    lemmatizer_columns_ro = {
        'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
        'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
        'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
        'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
    }
    spacy_model_ro = spacy.load("ro_core_news_md", disable=['ner', 'parser'])
    lemmatizer_columns_en = {
        'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
        'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
        'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
        'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
    }
    spacy_model_en = spacy.load("en_core_web_md", disable=['ner', 'parser'])
    '''
    for column in columns:
        result_spacy =  spacy_model(datadict[column])
        for method in columns[column]:
            datadict[columns[column][method]] = json.dumps([getattr(token, method) for token in result_spacy])
    return datadict


def translate_lang_check(from_lang = None, to_lang = None):
    if not from_lang: from_lang = 'ro'
    if not to_lang: to_lang = 'en'
    argostranslate.package.update_package_index()
    available_packages = argostranslate.package.get_available_packages()
    package_to_install = next(
        filter(
            lambda x: x.from_code == from_lang and x.to_code == to_lang, available_packages
        )
    )
    argostranslate.package.install_from_path(package_to_install.download())


def translate(datadict, columns_dict_from_to = None, from_lang = None, to_lang = None):
    if not columns_dict_from_to:
        columns_dict_from_to = {'resultTitle':'resultTitleTranslated', 
                                'resultExtractedTitle':'resultExtractedTitleTranslated', 
                                'articleTitle':'articleTitleTranslated', 
                                'articleText':'articleTextTranslated'}
    if not from_lang: from_lang = 'ro'
    if not to_lang: to_lang = 'en'
    for column in columns_dict_from_to:
        datadict[columns_dict_from_to[column]] = argostranslate.translate.translate(datadict[column], from_lang, to_lang)
    return datadict


def removeFromListSmallerNth(listStrings, noCharacters=3):
    listStringsLower = [x.lower() for x in listStrings]
    return set(filter(lambda i: len(i) >= noCharacters, listStringsLower))


def check_match_titles(datadict, columns_match = None):
    if not columns_match:
        columns_match = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']
    results = dict()
    for column in columns_match:
        results[column] = removeFromListSmallerNth(json.loads(datadict[column])) 
    matches = dict()
    for column in results:
        temp_results = results.copy()
        temp_results.pop(column, None)
        matches[column] = dict()
        for temp_column in temp_results:
            matches[column][temp_column] = len((results[column] & results[temp_column]))
    if not all(matches[column][value] for column in matches for value in matches[column]):
        datadict['newErrorStatus'] = str(8)
        datadict['newErrorException'] = 'Article might not match with google title - no match exists'
        raise Exception(datadict['newErrorException'])
    if sum([sum(matches[column].values()) for column in matches])/2 < 5:
        datadict['newErrorStatus'] = str(8)
        datadict['newErrorException'] = 'Article might not match with google title - sum of all matches is smaller than 5'
        raise Exception(datadict['newErrorException'])
    if not any(sum(matches[column].values()) for column in matches):
        datadict['newErrorStatus'] = str(8)
        datadict['newErrorException'] = 'Article might not match with google title - sum of a group match is 0'
        raise Exception(datadict['newErrorException'])
    return datadict


### Data function for main()

def main_data(row_data_dict, settings, ij, ii, j, i = None):
    if not i: i = row_data_dict['temp_id']
    try:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {i}/{j}")
        row_data_dict = remove_dict_columns(row_data_dict, settings['remove_columns'])
        row_data_dict = fix_accents_dict(row_data_dict)
        row_data_dict = add_dict_columns(row_data_dict, settings['add_columns'], default_value=None)
        row_data_dict['id'] = row_data_dict['temp_id']
        row_data_dict = check_null_article_text(row_data_dict)
        #check error
        row_data_dict = calc_content_length(row_data_dict)
        row_data_dict = check_extracted_title(row_data_dict, settings['title_blacklist'])
        #check error
        row_data_dict = check_md_website(row_data_dict)
        #check error
        row_data_dict = check_language(row_data_dict, settings['wanted_language'], settings['language_detector'])
        #check error
        row_data_dict = restore_diacritics(row_data_dict, settings['columns_diacritics'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting ro lemmatizer for {i}/{j}")
        row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_ro'], settings['spacy_model_ro'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting translating for {i}/{j}")
        row_data_dict = translate(row_data_dict, settings['translate_columns'], settings['from_lang'], settings['to_lang'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting en lemmatizer for {i}/{j}")
        row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_en'], settings['spacy_model_en'])
        row_data_dict = check_match_titles(row_data_dict, settings['check_match_columns'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finisted tasks for {i}/{j}")
    except Exception as e:
        if not row_data_dict['newErrorStatus']:
            row_data_dict['newErrorStatus'] = str(0)
        if not row_data_dict['newErrorException']:
            row_data_dict['newErrorException'] = str(e)
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Error task for {i}/{j} - {row_data_dict['newErrorStatus']} / {row_data_dict['newErrorException']}")
    finally:
        row_data_dict.pop('temp_id', None)
        return row_data_dict
    

def transform(batch, settings, ij, ii, j):
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started transform")
    transformed_batch = []
    for record in batch: 
        transformed_batch.append(main_data(record, settings, ij, ii, j))
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended transform")
    return transformed_batch


async def load(batch, connection, settings, ij, ii, j):
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started load")
    values = [tuple(r.values()) for r in batch]
    sqlstring = f'INSERT INTO {settings["new_table_name"]} VALUES (' + ','.join(['?'] * settings['no_columns']) + ')'
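    # one '?' placeholder per column; executemany inserts every row of the batch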
    await connection.executemany(sqlstring, values)
    await connection.commit()
    for item in batch:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Inserted row {item['id']}/{j}")
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended load")


async def task_set_load_helper(task_set, connection, settings, ij, ii, j):
    for future in task_set:
        await load( await future, connection, settings, ij, ii, j)
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended task_set_load_helper")
    

async def consumer(loop, pool, queue, db_connection, settings, ij, ii, j):
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl - consumer")   
    task_set = set()
    batch = []
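    # drain the queue into batches; a None record marks the end of input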
    while True:
        record = await queue.get()
        if record is not None:
            #record = dict(record)
            batch.append(record)
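        # when the queue is momentarily empty, hand the current batch to a worker process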
        if queue.empty():
            task = loop.run_in_executor(pool, transform, batch, settings, ij, ii, j)
            task_set.add(task)
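        # once a transform is in flight per worker, wait for one to finish and load it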
        if len(task_set) >= pool._max_workers:
            done_set, task_set = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
            logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Start task_set_load_helper #1")
            await task_set_load_helper(done_set, db_connection, settings, ij, ii, j)
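        # start a fresh batch for the next iteration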
        batch = []
        if record is None:
            break
    if task_set:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Start task_set_load_helper #2")
        await task_set_load_helper(asyncio.as_completed(task_set), db_connection, settings, ij, ii, j)
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl - consumer")


async def extract(db_connection, settings, ij, ii, j):
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl - producer - extract")
    i = 0
    db_connection.row_factory = aiosqlite.Row
    async with db_connection.execute(f'SELECT * FROM {settings["old_table_name"]}') as cursor:
        async for record in cursor:
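            # testing cap: only the first 101 rows are extracted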
            if i > 100: break

            i += 1
            record = dict(record)
            record['temp_id'] = i
            logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Extracted row {i}/{j}")
            yield record
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl - producer - extract")

    
async def producer(queue, db_connection, settings, ij, ii, j):
  logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl - producer")
  async for record in extract(db_connection, settings, ij, ii, j):
    await queue.put(record)
  await queue.put(None)
  logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl - producer")


async def etl(db_connection, settings, ij, ii, j):
  logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl")
  logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Number of workers = {multiprocessing.cpu_count()}")
  with ProcessPoolExecutor(
    max_workers=multiprocessing.cpu_count() - 2,
  ) as pool:
    loop = asyncio.get_running_loop()
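    # bounded queue: the producer's put() suspends once maxsize records are waiting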
    queue = asyncio.Queue(maxsize=1)
    await asyncio.gather(
      asyncio.create_task(producer(queue, db_connection, settings, ij, ii, j)),
      asyncio.create_task(consumer(loop, pool, queue, db_connection, settings, ij, ii, j)),
      return_exceptions=False,
    )
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl")



### Database insert for main()    

async def insert_db(table_name, table_connection, row_data_dict):
    sqlstring = f'INSERT INTO {table_name} (' + ','.join(row_data_dict.keys()) +') values(' + ','.join(['?'] * len(list(row_data_dict.values()))) + ')'
    await table_connection.execute(sqlstring, list(row_data_dict.values()))
    await table_connection.commit()


### Database functions for main_db()

async def create_db_table_from_table(new_table_name, table_cursor, table_connection, ij, ii, old_table_name = None):
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Create new table {new_table_name}")
    await table_cursor.execute(f"DROP TABLE IF EXISTS {new_table_name}")
    await table_connection.commit()
    if not old_table_name:
        await table_cursor.execute(f"CREATE TABLE {new_table_name}")
        await table_connection.commit()
    else:
        await table_cursor.execute(f"CREATE TABLE {new_table_name} AS SELECT * FROM {old_table_name} WHERE 0")
        await table_connection.commit()


async def remove_db_columns(table_name, table_cursor, table_connection, remove_columns, ij, ii):
    if not isinstance(remove_columns, (list, tuple)): remove_columns = [remove_columns]
    for remove_column in remove_columns:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Remove column {remove_column}")
        await table_cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN {remove_column}")
        await table_connection.commit()


async def add_db_columns(table_name, table_cursor, table_connection, add_columns, ij, ii):
    if not isinstance(add_columns, (list, tuple)): add_columns = [add_columns]
    for new_column in add_columns:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Add column {new_column}")
        await table_cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {new_column} TEXT")
        await table_connection.commit()


async def get_no_rows_db(db_table_connection, table_name, ij, ii):
    async with db_table_connection.execute(f'SELECT * FROM {table_name}') as cursor:
        return len( await cursor.fetchall())
    

async def get_no_columns_table(db_table_connection, table_name, ij, ii):
    async with db_table_connection.execute(f'SELECT * FROM {table_name}') as cursor:
        columns = [description[0] for description in cursor.description]    
        return len(columns)
    

### Database initialization for main() 

async def main_db(db_table_cursor, db_table_connection, settings, ij, ii):
    await create_db_table_from_table(settings['new_table_name'], db_table_cursor, db_table_connection, ij, ii, settings['old_table_name'])
    await remove_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['remove_columns'], ij, ii)
    await add_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['add_columns'], ij, ii)
    no_rows_old_table = await get_no_rows_db(db_table_connection, settings['old_table_name'], ij, ii)
    no_columns_new_table = await get_no_columns_table(db_table_connection, settings['new_table_name'], ij, ii)
    return no_rows_old_table, no_columns_new_table


async def get_aiosqlite_connection(db_filename):
  connection = await aiosqlite.connect(db_filename)
  return connection


### main loop

async def main(db_filename, ij, ii):
    # Initialize parameters
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize parameters for {db_filename}")
    settings = initialize()
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize parameters for {db_filename}")
    # Initialize database
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize database for {db_filename}")
    db_connection = await get_aiosqlite_connection(db_filename)
    db_cursor = await db_connection.cursor()
    j, no_columns = await main_db(db_cursor, db_connection, settings, ij, ii)
    settings['no_columns'] = no_columns
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize database for {db_filename}")
    
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {db_filename}")
    await etl(db_connection, settings, ij, ii, j)
    
    # Test without multiprocessing
    #i = 0
    #db_connection.row_factory = aiosqlite.Row
    #async with db_connection.execute(f'SELECT * FROM {settings["old_table_name"]}') as cursor:
    #    async for record in cursor:
    #        if i > 100: break
    #        i += 1
    #        record = dict(record)
    #        record['temp_id'] = i
    #        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Extracted row {i}/{j}")
    #        result = main_data(record, settings, ij, ii, j)
    #        await load( [result], db_connection, settings, ij, ii, j)
    
    
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished tasks for {db_filename}")

    await db_connection.close()
   

### startup 

if __name__ == "__main__": 
    program_start_time = datetime.datetime.now()
    logging.basicConfig(level=logging.INFO,
                handlers=[
                    logging.FileHandler(f'debug_correct_translate_lemma_{program_start_time.strftime("%Y_%m_%d_%H_%M")}.txt',
                                        'a', 'utf-8'), logging.StreamHandler()])
    install_mp_handler()
    logging.info(f"{datetime.datetime.now()} - {__file__} started at {program_start_time}")

    #linux mypath = '/media/vlad/HDD1000/disertatie/extracted_original_1/test1'
    #mypath = 'C:/Users/Vlad/PycharmProjects/newsScrapping/database'
    mypath = 'D:/disertatie/cod/newsScrapping/database'
    filenames = next(os.walk(mypath), (None, None, []))[2]  # [] if no file
    filepaths = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(mypath) for f in filenames] 
    dbfiles = filepaths
    
    ii = 0
    ij = len(dbfiles)
    for dbfile in dbfiles:
        ii += 1
        keyword_start_time = datetime.datetime.now()
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")

        asyncio.run(main(dbfile, ij, ii))
        
        keyword_end_time = datetime.datetime.now()
        keyword_elapsed_time = keyword_end_time - keyword_start_time
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} ended at {keyword_end_time}")
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} executed in {keyword_elapsed_time}")
    
    program_end_time = datetime.datetime.now()
    program_elapsed_time = program_end_time - program_start_time
    logging.info(f"{datetime.datetime.now()} - program started at {program_start_time}")
    logging.info(f"{datetime.datetime.now()} - program ended at {program_end_time}")
    logging.info(f"{datetime.datetime.now()} - program executed in {program_elapsed_time}")

The problem is that it only inserts about 10% of the rows back into the database (for example, 3577 out of 28620 rows when the queue's max_size is 8). I also tried leaving out the queue's max_size argument, but still not all rows came back. When I set it to 1, all rows come back, but it takes almost as long as running the code without multiprocessing...

Without multiprocessing, db 1: 0:08:58.806576, 101 rows
Without multiprocessing, db 2: 0:13:52.000334, 101 rows
Without multiprocessing, db 3: 0:02:49.493299, 101 rows
With multiprocessing, db 1: 0:08:12.181982, 101 rows
With multiprocessing, db 2: 0:10:40.637731, 101 rows
With multiprocessing, db 3: 0:06:35.342060, 101 rows
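As I understand it, max_size only bounds how many items can sit in the queue before put() suspends. A minimal sketch of just that behavior (a toy example, independent of my pipeline):

import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)   # put() suspends once 2 items are waiting

    async def producer():
        for n in range(5):
            await queue.put(n)         # blocks here whenever the queue is full
            print(f"put {n}")

    async def consumer():
        while True:
            item = await queue.get()
            await asyncio.sleep(0.1)   # simulate slow processing
            print(f"got {item}")

    consumer_task = asyncio.create_task(consumer())
    await producer()                   # finishes only as the consumer drains items
    await asyncio.sleep(1)             # let the consumer catch up
    consumer_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())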

What is the problem, and what can I do to reduce the runtime when using multiprocessing? What is wrong with the queue?

python python-asyncio python-multiprocessing

Comments

0 · Community · 11/14/2023
Please trim your code to make it easier to find your problem. Follow these guidelines to create a minimal reproducible example.

A: No answers yet