Asked by VladIPlay on 11/13/2023 · Updated 11/13/2023 · 19 views
Asyncio and multiprocessing ETL flow - not inserting all rows when queue max_size is not 1
Q:
I found https://www.vinta.com.br/blog/etl-with-asyncio-asyncpg, which shows how to build a flow that fetches data from a database (with async code), processes that data in batches with multiprocessing, and then inserts it back into the database (also async).
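The basic shape of that flow, stripped of my processing logic, is roughly the sketch below (the names transform_batch / producer / consumer and the toy "double the value" step are placeholders of my own; my real code further down batches whenever the queue is empty rather than by a fixed count):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def transform_batch(batch):
    # runs in a worker process, so it must be a picklable, module-level function
    return [{**row, "value": row["value"] * 2} for row in batch]

async def producer(queue):
    # stand-in for the async SELECT loop that streams rows from the database
    for i in range(10):
        await queue.put({"id": i, "value": i})
    await queue.put(None)  # sentinel: no more rows

async def consumer(queue, pool):
    loop = asyncio.get_running_loop()
    batch, results = [], []
    while True:
        record = await queue.get()
        if record is not None:
            batch.append(record)
        if record is None or len(batch) >= 4:  # flush by a fixed size here, for brevity
            if batch:
                results += await loop.run_in_executor(pool, transform_batch, batch)
                batch = []
            if record is None:
                break
    return results  # stand-in for the async INSERT step

async def main():
    queue = asyncio.Queue(maxsize=8)
    with ProcessPoolExecutor() as pool:
        _, results = await asyncio.gather(producer(queue), consumer(queue, pool))
    print(results)

if __name__ == "__main__":
    asyncio.run(main())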
My full code looks like this:
import asyncio
import aiosqlite
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from lingua import Language, LanguageDetectorBuilder
import ro_diacritics
import spacy
import json
import argostranslate.package
import argostranslate.translate
import functools
import datetime
import time
import re
import logging
from multiprocessing_logging import install_mp_handler
import os
def initializize():
settings = dict()
settings['new_table_name'] = 'news2'
settings['old_table_name'] = 'news'
settings['remove_columns'] = ['resultTitleLemma', 'articleContentLemma']
settings['add_columns'] = [
'id', 'newErrorStatus', 'newErrorException', 'articleTextLenght',
'resultTitleToken', 'resultTitleLemma', 'resultTitlePos',
'resultExtractedTitleToken', 'resultExtractedTitleLemma', 'resultExtractedTitlePos',
'articleTitleToken', 'articleTitleLemma', 'articleTitlePos',
'articleTextToken', 'articleTextLemma', 'articleTextPos',
'resultTitleTranslated', 'resultExtractedTitleTranslated', 'articleTitleTranslated', 'articleTextTranslated',
'resultTitleTranslatedToken', 'resultTitleTranslatedLemma', 'resultTitleTranslatedPos',
'resultExtractedTitleTranslatedToken', 'resultExtractedTitleTranslatedLemma', 'resultExtractedTitleTranslatedPos',
'articleTitleTranslatedToken', 'articleTitleTranslatedLemma', 'articleTitleTranslatedPos',
'articleTextTranslatedToken', 'articleTextTranslatedLemma', 'articleTextTranslatedPos'
]
# check for title null or in blacklist
settings['title_blacklist'] = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']
# lingua-language detector
#from lingua import Language, LanguageDetectorBuilder
settings['wanted_language'] = "ROMANIAN"
settings['language_detector'] = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()
# restore diacritics
#import ro_diacritics
settings['columns_diacritics'] = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']
# Spacy models for lemmatizing
#import spacy
#import json
# python -m spacy download ro_core_news_md
settings['spacy_model_ro'] = spacy.load("ro_core_news_md", disable=['ner', 'parser'])
settings['lemmatizer_columns_ro'] = {
'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
}
# python -m spacy download en_core_web_md
settings['spacy_model_en'] = spacy.load("en_core_web_md", disable=['ner', 'parser'])
settings['lemmatizer_columns_en'] = {
'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
}
# Argostranslate from ro -> en
#import argostranslate.package
#import argostranslate.translate
settings['from_lang'] = 'ro'
settings['to_lang'] = 'en'
argostranslate.package.update_package_index()
available_packages = argostranslate.package.get_available_packages()
package_to_install = next(
filter(
lambda x: x.from_code == settings['from_lang'] and x.to_code == settings['to_lang'], available_packages
)
)
argostranslate.package.install_from_path(package_to_install.download())
settings['translate_columns'] = {'resultTitle':'resultTitleTranslated',
'resultExtractedTitle':'resultExtractedTitleTranslated',
'articleTitle':'articleTitleTranslated',
'articleText':'articleTextTranslated'}
# Last check for matching
settings['check_match_columns'] = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']
return settings
### Data functions for main_row()
def convert_row_dict(row):
return dict(row)
def remove_dict_columns(datadict, remove_columns):
if not isinstance(remove_columns, list) and not isinstance(remove_columns, tuple): remove_columns = [remove_columns]
[datadict.pop(column, None) for column in remove_columns]
return datadict
def fix_accents_dict(datadict):
def fix_accents(s):
char_dict = { "º": "ș", "ª": "Ș", "ş": "ș", "Ş": "Ș", "ţ": "ț", "Ţ": "Ț", "þ": "ț", "Þ": "Ț", "ã": "ă" }
for k,v in char_dict.items():
s = s.replace(k, v)
return s
for item in datadict.keys():
if isinstance(datadict[item], str):
datadict[item] = fix_accents(datadict[item])
return datadict
def add_dict_columns(datadict, add_columns, default_value=None):
if not isinstance(add_columns, list) and not isinstance(add_columns, tuple): add_columns = [add_columns]
[datadict.update({column: default_value}) for column in add_columns]
return datadict
def check_null_article_text(datadict):
if not datadict['articleText']:
datadict['newErrorStatus'] = str(3)
datadict['newErrorException'] = 'Content null'
raise Exception(datadict['newErrorException'])
return datadict
def calc_content_lenght(datadict):
datadict['articleTextLenght'] = str(len(datadict['articleText']))
return datadict
def check_extracted_title(datadict, titleBlackList = None):
if not datadict['resultExtractedTitle']:
datadict['newErrorStatus'] = str(4)
datadict['newErrorException'] = 'Page title null'
raise Exception(datadict['newErrorException'])
if not titleBlackList:
titleBlackList = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']
if any([x in datadict['resultExtractedTitle'] for x in titleBlackList]):
datadict['newErrorStatus'] = str(4)
datadict['newErrorException'] = 'Title in blocked list'
raise Exception(datadict['newErrorException'])
return datadict
def check_md_website(datadict):
    location_url = re.findall(r"(?:[a-zA-Z]*\.)+([a-zA-Z]+)(?:/.*)?", str(datadict['articleUrl']))
if any(x == "md" for x in location_url):
datadict['newErrorStatus'] = str(5)
datadict['newErrorException'] = 'Website is from md'
raise Exception(datadict['newErrorException'])
return datadict
def check_language(datadict, wanted_language = 'ROMANIAN', language_detector = None):
if not language_detector:
language_detector = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()
try:
articleLanguage = language_detector.detect_language_of(datadict['articleText']).name
except:
datadict['newErrorStatus'] = str(6)
datadict['newErrorException'] = f'Language could not be extracted'
raise Exception(datadict['newErrorException'])
else:
if articleLanguage != wanted_language:
datadict['newErrorStatus'] = str(6)
datadict['newErrorException'] = f'Language of article is probably not Romanian, it is {articleLanguage}'
raise Exception(datadict['newErrorException'])
return datadict
def restore_diacritics(datadict, columns_diacritics = None):
if not columns_diacritics:
columns_diacritics = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']
diacticice = ["ă", "Ă", "î", "Î", "â", "Â", "ș", "Ș", "ț", "Ț"]
for column in columns_diacritics:
        if not any(e in datadict[column] for e in diacticice):
datadict[column] = ro_diacritics.restore_diacritics(datadict[column])
return datadict
def lemmatizer(datadict, columns, spacy_model):
'''
lemmatizer_columns_ro = {
'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
}
spacy_model_ro = spacy.load("ro_core_news_md", disable=['ner', 'parser'])
lemmatizer_columns_en = {
'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
}
spacy_model_en = spacy.load("en_core_web_md", disable=['ner', 'parser'])
'''
for column in columns:
result_spacy = spacy_model(datadict[column])
for method in columns[column]:
datadict[columns[column][method]] = json.dumps([getattr(token, method) for token in result_spacy])
return datadict
def translate_lang_check(from_lang = None, to_lang = None):
if not from_lang: from_lang = 'ro'
if not to_lang: to_lang = 'en'
argostranslate.package.update_package_index()
available_packages = argostranslate.package.get_available_packages()
package_to_install = next(
filter(
lambda x: x.from_code == from_lang and x.to_code == to_lang, available_packages
)
)
argostranslate.package.install_from_path(package_to_install.download())
def translate(datadict, columns_dict_from_to = None, from_lang = None, to_lang = None):
if not columns_dict_from_to:
columns_dict_from_to = {'resultTitle':'resultTitleTranslated',
'resultExtractedTitle':'resultExtractedTitleTranslated',
'articleTitle':'articleTitleTranslated',
'articleText':'articleTextTranslated'}
if not from_lang: from_lang = 'ro'
if not to_lang: to_lang = 'en'
for column in columns_dict_from_to:
datadict[columns_dict_from_to[column]] = argostranslate.translate.translate(datadict[column], from_lang, to_lang)
return datadict
def removeFromListSmallerNth(listStrings, noCharacters=3):
listStringsLower = [x.lower() for x in listStrings]
return set(filter(lambda i: len(i) >= noCharacters, listStringsLower))
def check_match_titles(datadict, columns_match = None):
if not columns_match:
columns_match = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']
results = dict()
for column in columns_match:
results[column] = removeFromListSmallerNth(json.loads(datadict[column]))
matches = dict()
for column in results:
temp_results = results.copy()
temp_results.pop(column, None)
matches[column] = dict()
for temp_column in temp_results:
matches[column][temp_column] = len((results[column] & results[temp_column]))
if all([matches[column][value] for column in matches for value in matches[column]]) < 1:
datadict['newErrorStatus'] = str(8)
datadict['newErrorException'] = 'Article might not match with google title - no match exists'
raise Exception(datadict['newErrorException'])
if sum([sum(matches[column].values()) for column in matches])/2 < 5:
datadict['newErrorStatus'] = str(8)
datadict['newErrorException'] = 'Article might not match with google title - sum of all matches is smaller than 5'
raise Exception(datadict['newErrorException'])
if any([sum(matches[column].values()) for column in matches]) < 1:
datadict['newErrorStatus'] = str(8)
datadict['newErrorException'] = 'Article might not match with google title - sum of a group match is 0'
raise Exception(datadict['newErrorException'])
return datadict
### Data function for main()
def main_data(row_data_dict, settings, ij, ii, j, i = None):
if not i: i = row_data_dict['temp_id']
try:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {i}/{j}")
row_data_dict = remove_dict_columns(row_data_dict, settings['remove_columns'])
row_data_dict = fix_accents_dict(row_data_dict)
row_data_dict = add_dict_columns(row_data_dict, settings['add_columns'], default_value=None)
row_data_dict['id'] = row_data_dict['temp_id']
row_data_dict = check_null_article_text(row_data_dict)
#check error
row_data_dict = calc_content_lenght(row_data_dict)
row_data_dict = check_extracted_title(row_data_dict, settings['title_blacklist'])
#check error
row_data_dict = check_md_website(row_data_dict)
#check error
row_data_dict = check_language(row_data_dict, settings['wanted_language'], settings['language_detector'])
#check error
row_data_dict = restore_diacritics(row_data_dict, settings['columns_diacritics'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting ro lemmatizer for {i}/{j}")
row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_ro'], settings['spacy_model_ro'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting translating for {i}/{j}")
row_data_dict = translate(row_data_dict, settings['translate_columns'], settings['from_lang'], settings['to_lang'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting en lemmatizer for {i}/{j}")
row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_en'], settings['spacy_model_en'])
row_data_dict = check_match_titles(row_data_dict, settings['check_match_columns'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finisted tasks for {i}/{j}")
except Exception as e:
if not row_data_dict['newErrorStatus']:
row_data_dict['newErrorStatus'] = str(0)
if not row_data_dict['newErrorException']:
row_data_dict['newErrorException'] = str(e)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Error task for {i}/{j} - {row_data_dict['newErrorStatus']} / {row_data_dict['newErrorException']}")
finally:
row_data_dict.pop('temp_id', None)
return row_data_dict
def transform(batch, settings, ij, ii, j):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started transform")
transformed_batch = []
for record in batch:
transformed_batch.append(main_data(record, settings, ij, ii, j))
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended transform")
return transformed_batch
async def load(batch, connection, settings, ij, ii, j):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started load")
values = [tuple(r.values()) for r in batch]
sqlstring = f'INSERT INTO {settings["new_table_name"]} VALUES (' + ','.join(['?'] * settings['no_columns']) + ')'
await connection.executemany(sqlstring, values)
await connection.commit()
for item in batch:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Inserted row {item['id']}/{j}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended load")
async def task_set_load_helper(task_set, connection, settings, ij, ii, j):
for future in task_set:
await load( await future, connection, settings, ij, ii, j)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended task_set_load_helper")
async def consumer(loop, pool, queue, db_connection, settings, ij, ii, j):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl - consumer")
task_set = set()
batch = []
while True:
record = await queue.get()
if record is not None:
#record = dict(record)
batch.append(record)
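        # hand the accumulated batch to a worker process only when the queue is momentarily empty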
if queue.empty():
task = loop.run_in_executor(pool, transform, batch, settings, ij, ii, j)
task_set.add(task)
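            # with as many transforms in flight as workers, wait for one to finish and load its result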
if len(task_set) >= pool._max_workers:
done_set, task_set = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Start task_set_load_helper #1")
await task_set_load_helper(done_set, db_connection, settings, ij, ii, j)
batch = []
if record is None:
break
if task_set:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Start task_set_load_helper #2")
await task_set_load_helper(asyncio.as_completed(task_set), db_connection, settings, ij, ii, j)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl - consumer")
async def extract(db_connection, settings, ij, ii, j):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl - producer - extract")
i = 0
db_connection.row_factory = aiosqlite.Row
async with db_connection.execute(f'SELECT * FROM {settings["old_table_name"]}') as cursor:
async for record in cursor:
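            # testing limit: only stream roughly the first 100 rows of the table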
if i > 100: break
i += 1
record = dict(record)
record['temp_id'] = i
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Extracted row {i}/{j}")
yield record
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl - producer - extract")
async def producer(queue, db_connection, settings, ij, ii, j):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl - producer")
async for record in extract(db_connection, settings, ij, ii, j):
await queue.put(record)
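    # sentinel so the consumer knows the producer has finished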
await queue.put(None)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl - producer")
async def etl(db_connection, settings, ij, ii, j):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Started etl")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Number of workers = {multiprocessing.cpu_count()}")
with ProcessPoolExecutor(
max_workers=multiprocessing.cpu_count() - 2,
) as pool:
loop = asyncio.get_running_loop()
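        # bounded hand-off queue between producer and consumer; its max_size is what the question below is about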
queue = asyncio.Queue(maxsize= 1)
await asyncio.gather(
asyncio.create_task(producer(queue, db_connection, settings, ij, ii, j)),
asyncio.create_task(consumer(loop, pool, queue, db_connection, settings, ij, ii, j)),
return_exceptions=False,
)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Ended etl")
### Database insert for main()
async def insert_db(table_name, table_connection, row_data_dict):
sqlstring = f'INSERT INTO {table_name} (' + ','.join(row_data_dict.keys()) +') values(' + ','.join(['?'] * len(list(row_data_dict.values()))) + ')'
await table_connection.execute(sqlstring, list(row_data_dict.values()))
await table_connection.commit()
### Database functions for main_db()
async def create_db_table_from_table(new_table_name, table_cursor, table_connection, ij, ii, old_table_name = None):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Create new table {new_table_name}")
await table_cursor.execute(f"DROP TABLE IF EXISTS {new_table_name}")
await table_connection.commit()
if not old_table_name:
await table_cursor.execute(f"CREATE TABLE {new_table_name}")
await table_connection.commit()
else:
await table_cursor.execute(f"CREATE TABLE {new_table_name} AS SELECT * FROM {old_table_name} WHERE 0")
await table_connection.commit()
async def remove_db_columns(table_name, table_cursor, table_connection, remove_columns, ij, ii):
if not isinstance(remove_columns, list) and not isinstance(remove_columns, tuple): remove_columns = [remove_columns]
for remove_column in remove_columns:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Remove column {remove_column}")
await table_cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN {remove_column}")
await table_connection.commit()
async def add_db_columns(table_name, table_cursor, table_connection, add_columns, ij, ii):
if not isinstance(add_columns, list) and not isinstance(add_columns, tuple): add_columns = [add_columns]
for new_column in add_columns:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Add column {new_column}")
await table_cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {new_column} TEXT")
await table_connection.commit()
async def get_no_rows_db(db_table_connection, table_name, ij, ii):
async with db_table_connection.execute(f'SELECT * FROM {table_name}') as cursor:
return len( await cursor.fetchall())
async def get_no_columns_table(db_table_connection, table_name, ij, ii):
async with db_table_connection.execute(f'SELECT * FROM {table_name}') as cursor:
columns = [description[0] for description in cursor.description]
return len(columns)
### Database initialization for main()
async def main_db(db_table_cursor, db_table_connection, settings): #(new_table_name, table_cursor, table_connection, ij, ii, j, old_table_name = None)
await create_db_table_from_table(settings['new_table_name'], db_table_cursor, db_table_connection, ij, ii, settings['old_table_name'])
await remove_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['remove_columns'], ij, ii)
await add_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['add_columns'], ij, ii)
no_rows_old_table = await get_no_rows_db(db_table_connection, settings['old_table_name'], ij, ii)
no_columns_new_table = await get_no_columns_table(db_table_connection, settings['new_table_name'], ij, ii)
return no_rows_old_table, no_columns_new_table
async def get_aiosqlite_connection(db_filename):
connection = await aiosqlite.connect(db_filename)
return connection
### main loop
async def main(db_filename, ij, ii):
# Initialize parameters
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize parameters for {db_filename}")
settings = initializize()
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize parameters for {db_filename}")
# Initialize database
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize database for {db_filename}")
db_connection = await get_aiosqlite_connection(db_filename)
db_cursor = await db_connection.cursor()
j, no_columns = await main_db(db_cursor, db_connection, settings)
settings['no_columns'] = no_columns
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize database for {db_filename}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {db_filename}")
await etl(db_connection, settings, ij, ii, j)
# Test without multiprocessing
#i = 0
#db_connection.row_factory = aiosqlite.Row
#async with db_connection.execute(f'SELECT * FROM {settings["old_table_name"]}') as cursor:
# async for record in cursor:
# if i > 100: break
# i += 1
# record = dict(record)
# record['temp_id'] = i
# logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Extracted row {i}/{j}")
# result = main_data(record, settings, ij, ii, j)
# await load( [result], db_connection, settings, ij, ii, j)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished tasks for {db_filename}")
await db_connection.close()
### startup
if __name__ == "__main__":
program_start_time = datetime.datetime.now()
logging.basicConfig(level=logging.INFO,
handlers=[
logging.FileHandler(f'debug_correct_translate_lemma_{program_start_time.strftime("%Y_%m_%d_%H_%M")}.txt',
'a', 'utf-8'), logging.StreamHandler()])
install_mp_handler()
logging.info(f"{datetime.datetime.now()} - {__file__} started at {program_start_time}")
#linux mypath = '/media/vlad/HDD1000/disertatie/extracted_original_1/test1'
#mypath = 'C:/Users/Vlad/PycharmProjects/newsScrapping/database'
mypath = 'D:/disertatie/cod/newsScrapping/database'
filenames = next(os.walk(mypath), (None, None, []))[2] # [] if no file
filepaths = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(mypath) for f in filenames]
dbfiles = filepaths
ii = 0
ij = len(dbfiles)
for dbfile in dbfiles:
ii += 1
keyword_start_time = datetime.datetime.now()
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")
asyncio.run(main(dbfile, ij, ii))
keyword_end_time = datetime.datetime.now()
keyword_elapsed_time = keyword_end_time - keyword_start_time
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} ended at {keyword_end_time}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} executed in {keyword_elapsed_time}")
program_end_time = datetime.datetime.now()
program_elapsed_time = program_end_time - program_start_time
logging.info(f"{datetime.datetime.now()} - program started at {program_start_time}")
logging.info(f"{datetime.datetime.now()} - program ended at {program_end_time}")
logging.info(f"{datetime.datetime.now()} - program executed in {program_elapsed_time}")
The problem is that it only inserts about 10% of the rows back into the database (for example, 3577 out of 28620 rows when the queue's max_size is 8). I also tried leaving out the queue's max_size argument, but it still does not return all the rows. When I set it to 1, all rows are returned, but the runtime is almost the same as running the code without multiprocessing...
Without multiprocessing, db 1: 0:08:58.806576 (101 rows)
Without multiprocessing, db 2: 0:13:52.000334 (101 rows)
Without multiprocessing, db 3: 0:02:49.493299 (101 rows)
With multiprocessing, db 1: 0:08:12.181982 (101 rows)
With multiprocessing, db 2: 0:10:40.637731 (101 rows)
With multiprocessing, db 3: 0:06:35.342060 (101 rows)
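As far as I can tell, the sensitivity to max_size comes from the if queue.empty(): condition in consumer(): with max_size 1 the queue is drained after almost every get(), so nearly every record ends up as its own one-row batch, while a bigger max_size lets larger batches build up before they are submitted to the pool. A stripped-down toy version of just that batching logic (it does not reproduce the missing rows, only the batch sizes) behaves like this:

import asyncio

# Toy version of the consumer's queue.empty() batching, with the ETL parts removed,
# to compare how maxsize affects the batch sizes that would be sent to the pool.
async def demo(maxsize, n_records=20):
    queue = asyncio.Queue(maxsize=maxsize)

    async def producer():
        for i in range(n_records):
            await queue.put(i)
        await queue.put(None)  # sentinel, as in the real producer()

    async def consumer():
        batch, batch_sizes = [], []
        while True:
            record = await queue.get()
            if record is not None:
                batch.append(record)
            if queue.empty():
                if batch:
                    batch_sizes.append(len(batch))  # here the real code submits the batch to the pool
                batch = []
            if record is None:
                return batch_sizes

    _, sizes = await asyncio.gather(producer(), consumer())
    print(f"maxsize={maxsize}: {len(sizes)} batches with sizes {sizes}")

async def main():
    for maxsize in (1, 8):
        await demo(maxsize)

asyncio.run(main())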
What is the problem, and what can I do to reduce the runtime when using multiprocessing? What is wrong with the queue?
A: No answers yet