提问人:Evangelos Malandrakis 提问时间:11/16/2023 最后编辑:OneCricketeerEvangelos Malandrakis 更新时间:11/17/2023 访问量:12
在 Snowflake 存储过程中遇到 Kafka 使用者出现“[Errno 16] 设备或资源繁忙”错误
Encountering "[Errno 16] Device or Resource Busy" Error with Kafka Consumer in Snowflake Stored Procedure
问:
目标
目标是创建一个 Snowflake 存储过程,该过程每天使用 Kafka 主题中的消息一次,处理这些消息,然后将处理后的数据加载到 Snowflake 表中以供进一步分析。
错误描述:
遇到的错误是 。它专门发生在 Kafka 使用者代码中的行。只有在 Snowflake 中运行代码时才会出现此问题;相同的代码在我的本地环境中成功执行。[Errno 16] Device or resource busy
messages = consumer.poll(timeout_ms=1000)
代码片段:
from kafka import KafkaConsumer
# Kafka consumer configuration
consumer_config = {
'bootstrap_servers': 'XXX',
'security_protocol': 'SASL_SSL',
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': 'XXX',
'sasl_plain_password': "XXX",
'client_id': 'snowflake-dev-client',
'group_id': 'ZZZ',
'auto_offset_reset': 'earliest',
'enable_auto_commit': True
}
# Initialize Kafka consumer
consumer = KafkaConsumer(
"all",
**consumer_config
)
message_list = []
MAX_EMPTY_POLLS = 5 # Maximum number of empty polls before breaking the loop
empty_polls = 0
try:
while empty_polls < MAX_EMPTY_POLLS:
# Poll for messages
messages = consumer.poll(timeout_ms=1000) # Timeout in milliseconds
if not messages:
empty_polls += 1
time.sleep(1) # Optional: sleep to avoid tight looping
continue
empty_polls = 0 # Reset empty poll count if messages are found
for tp, msgs in messages.items():
for message in msgs:
# Process each message
msg_decoded = message.value.decode('utf-8')
msg_json = json.loads(msg_decoded)
if 'YYY' in msg_json['event_name']:
message_list.append(add_values_to_list(msg_json))
except Exception as e:
print(f"Error: {e}")
finally:
# Close the consumer
consumer.close()
已检查的可能错误
- 网络规则:已验证与外部访问集成关联的网络规则。与 Kafka 引导服务器的连接成功,表示网络设置正确。
- 身份验证密钥:还检查了用于 Kafka 身份验证的密钥。连接成功将确认其有效性。
问题:
- Snowflake 存储过程中的外部连接(如 Kafka)是否存在已知限制或特殊配置?
- 是否有人在 Snowflake 存储过程中成功使用了 Kafka 使用者,如果有,您如何管理连接冲突或库兼容性?
- 对为什么此错误可能专门发生在 Snowflake 环境中的任何见解?
答: 暂无答案
评论