Asked by: Lilcodemuffin Asked: 11/17/2023 Updated: 11/17/2023 Views: 50
How do I check if a column exists in my python code, instead of SQL?
Q:
Here is my code:
import pyodbc
import logging
import json
import pandas as pd
import sqlalchemy as sa
import warnings

def read_query():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    with open(r'upsert_config.json','r') as ts:
        config = json.load(ts)
        source_driver = config['source_database_params']['source_driver']
        source_server = config['source_database_params']['source_server']
        source_database = config['source_database_params']['source_database']
        target_driver = config['target_database_params']['target_driver']
        target_server = config['target_database_params']['target_server']
        target_database = config['target_database_params']['target_database']

    conn = pyodbc.connect(
        f'Driver={source_driver};'
        f'Server={source_server};'
        f'Database={source_database};'
        f'Driver={target_driver};'
        f'Server={target_server};'
        f'Database={target_database};'
        f'Trusted_Connection=yes;'
        f'MARS_Connection=Yes'
    )

    try:
        source_database=f'{source_database}.dbo'
        cursor = conn.cursor()
        read_table_query="""
            SELECT distinct table_name
            FROM information_schema.columns
            WHERE COLUMN_NAME in ('PRCS_DTE', 'EFF_DTE', 'PRCS_RUN_DTE', 'reportDate', 'DateofData', 'InsertDate')
            ORDER BY table_name asc
        """
        cursor.execute(read_table_query)
        logger.info("Successfully connected to database")
    except Exception as e:
        logger.error("Unable to connect to database: %s", str(e))

    for tables in cursor.fetchall():
        tab = tables[0]
        select_data_query1 = f'SELECT * FROM {source_database}.{tab} WHERE PRCS_DTE > DATEADD(day, -2, CONVERT(date, SYSDATETIME()));'
        select_data_query2 = f'SELECT * FROM {source_database}.{tab} WHERE EFF_DTE > DATEADD(day, -2, CONVERT(date, SYSDATETIME()));'
        try:
            df=pd.read_sql(select_data_query1, conn, chunksize=10000)
            df2=pd.read_sql(select_data_query2, conn, chunksize=10000)
            warnings.filterwarnings("ignore")
        except Exception as e:
            logger.exception(e)
            continue
        try:
            engine = sa.create_engine(f'mssql+pyodbc://@{target_server}/{target_database}?trusted_connection=yes&driver={target_driver}')
            for chunk_dataframe in df,df2:
                rowcount = chunk_dataframe.to_sql(f'{tab}', engine, if_exists='append', index=False, method='multi', chunksize=10)
                warnings.filterwarnings("ignore")
                print("{} Records inserted ".format(rowcount) + f"into {tab}")
            engine.dispose()
        except Exception as e:
            logging.exception(e)

read_query()
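I am trying to select all tables that contain any of the columns listed in my read_table_query. After that, I want to select data from those tables where the data is less than 2 days old, then insert the data from each table into a copy of that table. The code works, except when I try to select data from a table using a column that does not exist in it.
So if I only use 1 column in my read_table_query, the code works, but when I add multiple columns, I get the error:
Invalid column name 'PRCS_DTE'
or
Invalid column name 'EFF_DTE'
or
Invalid column name 'PRCS_RUN_DTE'
and so on...
I tried adding another dataframe called df2 that reads select_data_query2, but I get the invalid column name error on tables that don't have certain columns, and then the code runs fine on the other tables that do have the columns. Can I just do an if...else... on my dataframe columns, or should I ignore the errors raised when pulling from columns that don't exist in certain tables?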
A:
0 votes
Corralien
11/17/2023
#1
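I don't fully understand your question. However, you can extract the column names before running the query and then use an if...else statement.
Try:
for col in cursor.columns(table=table_name):
    print(col.column_name)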
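To make the idea above more concrete, here is a minimal sketch of how the column check could drive the per-table query. It is not the asker's code: the helper names existing_date_columns and build_select_queries are hypothetical, and the case-insensitive comparison assumes a case-insensitive database collation. It relies only on pyodbc's Cursor.columns(table=...), whose rows expose column_name.

```python
# Candidate date columns, copied from the question's read_table_query.
DATE_COLUMNS = ('PRCS_DTE', 'EFF_DTE', 'PRCS_RUN_DTE',
                'reportDate', 'DateofData', 'InsertDate')


def existing_date_columns(cursor, table_name, candidates=DATE_COLUMNS):
    """Return the candidate columns that actually exist in table_name.

    Hypothetical helper. `cursor` is expected to behave like a pyodbc
    cursor: cursor.columns(table=...) yields one row per column, and each
    row exposes the column's name as `.column_name`.
    """
    # Assumption: compare names case-insensitively.
    present = {row.column_name.lower()
               for row in cursor.columns(table=table_name)}
    return [c for c in candidates if c.lower() in present]


def build_select_queries(source_database, table_name, columns):
    """Build one 'last two days' SELECT per date column that exists.

    Hypothetical helper; the WHERE clause mirrors the question's queries.
    """
    return [
        f'SELECT * FROM {source_database}.{table_name} '
        f'WHERE {col} > DATEADD(day, -2, CONVERT(date, SYSDATETIME()));'
        for col in columns
    ]
```

Inside the loop over tables you could call existing_date_columns(conn.cursor(), tab), skip the table when the list is empty, and pass each generated query to pd.read_sql. That way no query ever references a column the table lacks, so the "Invalid column name" error should not be raised for that reason.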
Comments
having count(1) = {however many columns you want to be assured are there}
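One possible reading of this comment, as a rough sketch: group the information_schema.columns lookup by table and keep only the tables where the number of matching columns equals the number of columns you require. The function name tables_with_all_columns_query is hypothetical, and the sketch assumes table names are unique across schemas (otherwise the grouping would also need table_schema). Note that this returns tables containing all of the listed columns, which is stricter than the "any of" filter in the question.

```python
def tables_with_all_columns_query(required_columns):
    """Build a parameterized query for tables containing every required column.

    Hypothetical helper. Returns (query, params), with one `?` placeholder
    per column, the qmark style used by pyodbc.
    """
    # Drop exact duplicates so the HAVING count matches the IN list.
    required = list(dict.fromkeys(required_columns))
    if not required:
        raise ValueError("required_columns must not be empty")

    placeholders = ', '.join('?' for _ in required)
    query = (
        'SELECT table_name '
        'FROM information_schema.columns '
        f'WHERE column_name IN ({placeholders}) '
        'GROUP BY table_name '
        # A table qualifies only if every required column was matched.
        f'HAVING COUNT(1) = {len(required)} '
        'ORDER BY table_name ASC'
    )
    return query, required
```

Usage would look something like query, params = tables_with_all_columns_query(['PRCS_DTE', 'EFF_DTE']) followed by cursor.execute(query, params).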