Python 多线程块意外

Python multithreading blocks unexpectedly

提问人:WolfiG 提问时间:9/1/2023 更新时间:9/1/2023 访问量:20

问:

我正在开发一个 Python 粒子群优化器应用程序,它与 Kafka 代理交换消息。我想向代理发送一条消息,另一个应用程序(将是 JAVA)接收消息,处理它并返回结果。python 应用程序需要等到结果可用。目前,代码如下所示:

class KafkaConnector(Thread):
  __consumption_thread = None
  __kafka_consumer: KafkaConsumer = None
  __kafka_producer: KafkaProducer = None
  __pso_optimizer: GlobalBestPSO = None
  __device_names: np.array = None

  def __init__(self):
    Thread.__init__(self)
    self.__kafka_consumer = KafkaConsumer('A_TOPIC', bootstrap_servers=['my.kafka.server:9092'])
    self.__kafka_producer = KafkaProducer(bootstrap_servers=['my.kafka.server:9092'])
    self.__result_received = Event()

  def consume_messages(self):
    assert self.__kafka_consumer is not None
    self.__kafka_consumer.subscribe(['A_TOPIC'])
    assert self.__kafka_consumer.subscription() is not None

    self.__consumption_thread = KafkaListener(self)
    self.__consumption_thread.start()
    print('[INFO] Kafka consumer thread started')

  def stop_listener(self):
    self.__consumption_thread.stop_consumer()
    self.__kafka_consumer.unsubscribe()
    self.__kafka_consumer.close()

  def message_received(self, message: ConsumerRecord):
  '''
  Method switching message content by use case
  '''
    print('[INFO] Received message:', message)
    if ('mode', b'INIT') in message.headers: # INIT comes from device automator
        self.init_pso(message.value) <--- this call works as expected
    elif ('mode', b'RESULT') in message.headers: # RESULT comes from device automator
        print('perform step')
    elif ('mode', b'SET') in message.headers: # SET is sent by PSO
        print('SET received')
        self.__result_received.set()
    else:
        print('unknown command', message.headers)

  def send_message(self, message_content, headers: list):
    self.__kafka_producer.send(topic='A_TOPIC', value=message_content, headers=headers)

  def init_pso(self, json_string: str):

    [...]

    self.__pso_optimizer = GlobalBestPSO(n_particles=10 * parameter_count, dimensions=parameter_count,
                                         options=options, bounds=bounds)

    # start optimization run
    best_cost, best_position = self.__pso_optimizer.optimize(self.objective_function, iters=100)
    print(best_position, best_cost)

  def objective_function(self, new_settings):

    result = np.empty(len(new_settings))

    for i in range(len(new_settings)):
        setting_dict = []
        result[i] = 0

        for j in range(len(self.__device_names)):
            parameter_value = {self.__device_names[j]: new_settings[i][j]}
            setting_dict.append(parameter_value)

        # send new setting values to Kafka / LSA client
        self.__kafka_producer.send('A_TOPIC', value=json.dumps(setting_dict).encode('UTF-8'),
                                   headers=[('mode','SET'.encode('UTF-8'))])
        # wait for result to return
        if self.__result_received.is_set():
            self.__result_received.clear()

        self.__result_received.wait() <--- this wait blocks everything, including KafkaListener::run() without wait(), the code executes as expected
        # append to result array

        #result[i] += new_settings[i][j]

    return result


def get_optimizer(self):
    return self.__pso_optimizer


'''
This is a class supposed to listen in a parallel thread to messages coming from Kafka
'''
class KafkaListener(Thread):
  __stop_event: Event = None
  __caller: KafkaConnector = None

  def __init__(self, caller):
    Thread.__init__(self)
    self.__caller = caller
    self.setDaemon(True)
    self.__stop_event = Event()

  def run(self):
    while not self.__stop_event.is_set():
        for message in self.__caller.getKafkaConsumer():
            print('[INFO] KafkaListener received:', message)
            self.__caller.message_received(message)
            if self.__stop_event.is_set():
                break

  def stop_consumer(self):
    self.__stop_event.set()

最终,我想等待通过 Kafka 获得结果,然后继续下一个循环

Python 多线程事件

评论

0赞 jsbueno 9/6/2023
所以。什么块?设置和运行 Thread 实例的代码在哪里?它在什么情况下会阻止?这个问题并不是真的可以理解。
0赞 jsbueno 9/6/2023
顺便说一句,这很少是“Pythonic”:没有必要在 Python 中“声明变量”只是为了声明它们。此外,“私有属性”的前缀资源并不是真正的东西,它只是使代码更难阅读。Python 没有私有属性是有原因的:公共属性通常也能正常工作。(尽管在这种情况下,由于您正在扩展 Thread 类,因此可能会发生名称冲突;避免它的首选方法是:不要继承“Thread”,只需创建一个以工作方法为目标的“Thread”实例。__

答: 暂无答案