在 Snowflake 存储过程中遇到 Kafka 使用者出现“[Errno 16] 设备或资源繁忙”错误

Encountering "[Errno 16] Device or Resource Busy" Error with Kafka Consumer in Snowflake Stored Procedure

提问人:Evangelos Malandrakis 提问时间:11/16/2023 最后编辑:OneCricketeerEvangelos Malandrakis 更新时间:11/17/2023 访问量:12

问:

目标

目标是创建一个 Snowflake 存储过程,该过程每天使用 Kafka 主题中的消息一次,处理这些消息,然后将处理后的数据加载到 Snowflake 表中以供进一步分析。

错误描述:

遇到的错误是 。它专门发生在 Kafka 使用者代码中的行。只有在 Snowflake 中运行代码时才会出现此问题;相同的代码在我的本地环境中成功执行。[Errno 16] Device or resource busymessages = consumer.poll(timeout_ms=1000)

代码片段:

from kafka import KafkaConsumer
 
# Kafka consumer configuration
consumer_config = {
    'bootstrap_servers': 'XXX',
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'XXX',
    'sasl_plain_password': "XXX",
    'client_id': 'snowflake-dev-client',
    'group_id': 'ZZZ',
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True
}
 
# Initialize Kafka consumer
consumer = KafkaConsumer(
    "all",
    **consumer_config
)
 
message_list = []
MAX_EMPTY_POLLS = 5  # Maximum number of empty polls before breaking the loop
empty_polls = 0
 
try:
    while empty_polls < MAX_EMPTY_POLLS:
       # Poll for messages
        messages = consumer.poll(timeout_ms=1000)  # Timeout in milliseconds
        if not messages:
            empty_polls += 1
            time.sleep(1)  # Optional: sleep to avoid tight looping
            continue
 
        empty_polls = 0  # Reset empty poll count if messages are found
 
        for tp, msgs in messages.items():
            for message in msgs:
                # Process each message
                msg_decoded = message.value.decode('utf-8')
                msg_json = json.loads(msg_decoded)
                if 'YYY' in msg_json['event_name']:
                    message_list.append(add_values_to_list(msg_json))
 
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close the consumer
    consumer.close()

已检查的可能错误

  • 网络规则:已验证与外部访问集成关联的网络规则。与 Kafka 引导服务器的连接成功,表示网络设置正确。
  • 身份验证密钥:还检查了用于 Kafka 身份验证的密钥。连接成功将确认其有效性。

问题:

  1. Snowflake 存储过程中的外部连接(如 Kafka)是否存在已知限制或特殊配置?
  2. 是否有人在 Snowflake 存储过程中成功使用了 Kafka 使用者,如果有,您如何管理连接冲突或库兼容性?
  3. 对为什么此错误可能专门发生在 Snowflake 环境中的任何见解?
apache-kafka snowflake-cloud-data-platform kafka-python

评论

0赞 Sergiu 11/17/2023
您是如何在 Snowflake 方面导入库的?我之所以问,是因为这不是 Snowflake Anaconda 频道下的图书馆。导入成功了吗?
0赞 Evangelos Malandrakis 11/20/2023
Snowflake 有一个新的实验性功能。在这里查看: docs.snowflake.com/en/developer-guide/snowpark/reference/python/...
0赞 Sergiu 11/20/2023
您是否有可以向我提供的特定查询 ID,以便我检查发生了什么?如果您愿意,可以通过私人消息提供查询 ID。
0赞 Evangelos Malandrakis 11/27/2023
我回复了 Snowflake 社区帖子@Sergiu,谢谢。

答: 暂无答案