How do I check if a column exists in my python code, instead of SQL?

Asked by: Lilcodemuffin Asked: 11/17/2023 Updated: 11/17/2023 Views: 50

Q:

This is my code:

import pyodbc
import logging
import json
import pandas as pd
import sqlalchemy as sa
import warnings

def read_query():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    with open(r'upsert_config.json','r') as ts:
        config = json.load(ts)

    source_driver = config['source_database_params']['source_driver']
    source_server = config['source_database_params']['source_server']
    source_database = config['source_database_params']['source_database']
    target_driver = config['target_database_params']['target_driver']
    target_server = config ['target_database_params']['target_server']
    target_database = config['target_database_params']['target_database']


    conn = pyodbc.connect(
    f'Driver={source_driver};'
    f'Server={source_server};'
    f'Database={source_database};'
    f'Driver={target_driver};'
    f'Server={target_server};'
    f'Database={target_database};'
    f'Trusted_Connection=yes;'
    f'MARS_Connection=Yes'
    )

    try:
        source_database=f'{source_database}.dbo'
        cursor = conn.cursor()
        read_table_query="""        
        SELECT distinct table_name
        FROM information_schema.columns
        WHERE COLUMN_NAME in ('PRCS_DTE', 'EFF_DTE', 'PRCS_RUN_DTE', 'reportDate', 'DateofData', 'InsertDate')
        ORDER BY table_name asc
            """
        cursor.execute(read_table_query)  

        logger.info("Successfully connected to database")

    except Exception as e:
        logger.error("Unable to connect to database: %s", str(e))    


    for tables in cursor.fetchall():
        tab = tables[0]
        select_data_query1 = f'SELECT * FROM {source_database}.{tab} WHERE PRCS_DTE > DATEADD(day, -2, CONVERT(date, SYSDATETIME()));'
        select_data_query2 = f'SELECT * FROM {source_database}.{tab} WHERE EFF_DTE > DATEADD(day, -2, CONVERT(date, SYSDATETIME()));'
        try:
            df=pd.read_sql(select_data_query1, conn, chunksize=10000)
            df2=pd.read_sql(select_data_query2, conn, chunksize=10000)
            warnings.filterwarnings("ignore")
        except Exception as e:
            logger.exception(e)
            continue       
        
        try:
            engine = sa.create_engine(f'mssql+pyodbc://@{target_server}/{target_database}?trusted_connection=yes&driver={target_driver}')

            for chunk_dataframe in df,df2:
                rowcount = chunk_dataframe.to_sql(f'{tab}', engine, if_exists='append', index=False, method='multi', chunksize=10)
                warnings.filterwarnings("ignore")
                print("{} Records inserted ".format(rowcount) + f"into {tab}")
                engine.dispose()

        except Exception as e:
            logging.exception(e)


           
read_query()

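I'm trying to select all tables that contain the columns I've listed in read_table_query. After that, I want to select data from those tables where the data is less than 2 days old, and then insert the data from each table into a replica of that table. The code works, except when I try to select data from a table using a column that doesn't exist in it.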

So if I only use 1 column in my read_table_query, the code works, but when I add multiple columns, I receive the errors:

Invalid column name 'PRCS_DTE'

Invalid column name 'EFF_DTE'

Invalid column name 'PRCS_RUN_DTE' and so on...

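I tried adding another dataframe called df2 that reads select_data_query2, but I get the invalid column name error on tables that don't have certain columns, and then the code runs fine on the other tables that do have the column. Can I just do an if...else... on my dataframe columns, or should I ignore the errors raised for columns that don't exist in some tables?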

python sql-server pandas

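Comments

0 upvotes siggemannen 11/17/2023
What do you want to do if some column X doesn't exist? Do you want to select * without the where clause? Also, haven't you been asking variations of this question all week?
0 upvotes Xedni 11/17/2023
You're not checking for tables that contain all of those columns, you're checking for tables that contain any one of those columns. Group by schema and table, then add having count(1) = {however many columns you want to be assured are there}
1 upvote Xedni 11/17/2023
Help vampire?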

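A:

0 upvotes Corralien 11/17/2023 #1

I don't fully understand your question. However, you can extract the column names before querying and then use an if...else statement:

Try: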

for col in cursor.columns(table=table_name):
    print(col.column_name)
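
Building on that, here is a minimal sketch of how the if...else check could look for the loop in the question. The helper names (pick_date_column, build_select_query, table_columns) and the sample table names are hypothetical and not from the original post. It also assumes column names can be compared case-insensitively, which depends on the database collation. The idea is to read each table's real columns first and only build a WHERE clause on a date column that table actually has.

```python
# Hypothetical helpers (not from the original post): choose the WHERE column
# per table from the columns that table actually has.
DATE_COLUMNS = ['PRCS_DTE', 'EFF_DTE', 'PRCS_RUN_DTE',
                'reportDate', 'DateofData', 'InsertDate']


def pick_date_column(existing_columns, candidates=DATE_COLUMNS):
    """Return the first candidate present in existing_columns, else None."""
    # Assumption: names compare case-insensitively (collation dependent).
    existing = {name.lower() for name in existing_columns}
    for candidate in candidates:
        if candidate.lower() in existing:
            return candidate
    return None


def build_select_query(qualified_table, existing_columns):
    """Build the 2-day SELECT for one table, or None if no date column exists."""
    date_column = pick_date_column(existing_columns)
    if date_column is None:
        return None
    return (
        f'SELECT * FROM {qualified_table} '
        f'WHERE {date_column} > DATEADD(day, -2, CONVERT(date, SYSDATETIME()));'
    )


def table_columns(cursor, table_name):
    """Column names of one table, read through pyodbc's cursor.columns()."""
    return [col.column_name for col in cursor.columns(table=table_name)]


if __name__ == '__main__':
    # Stand-in column lists; with a live connection, pass
    # table_columns(cursor, tab) as the second argument instead.
    print(build_select_query('MyDb.dbo.Orders', ['ID', 'EFF_DTE']))
    print(build_select_query('MyDb.dbo.Lookup', ['ID', 'NAME']))
```

Inside the for loop from the question, a table for which build_select_query returns None can be skipped with continue. That way the Invalid column name error is never triggered, rather than being caught and ignored.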