提问人:WolfiG 提问时间:9/1/2023 更新时间:9/1/2023 访问量:20
Python 多线程块意外
Python multithreading blocks unexpectedly
问:
我正在开发一个 Python 粒子群优化器应用程序,它与 Kafka 代理交换消息。我想向代理发送一条消息,另一个应用程序(将是 JAVA)接收消息,处理它并返回结果。python 应用程序需要等到结果可用。目前,代码如下所示:
class KafkaConnector(Thread):
__consumption_thread = None
__kafka_consumer: KafkaConsumer = None
__kafka_producer: KafkaProducer = None
__pso_optimizer: GlobalBestPSO = None
__device_names: np.array = None
def __init__(self):
Thread.__init__(self)
self.__kafka_consumer = KafkaConsumer('A_TOPIC', bootstrap_servers=['my.kafka.server:9092'])
self.__kafka_producer = KafkaProducer(bootstrap_servers=['my.kafka.server:9092'])
self.__result_received = Event()
def consume_messages(self):
assert self.__kafka_consumer is not None
self.__kafka_consumer.subscribe(['A_TOPIC'])
assert self.__kafka_consumer.subscription() is not None
self.__consumption_thread = KafkaListener(self)
self.__consumption_thread.start()
print('[INFO] Kafka consumer thread started')
def stop_listener(self):
self.__consumption_thread.stop_consumer()
self.__kafka_consumer.unsubscribe()
self.__kafka_consumer.close()
def message_received(self, message: ConsumerRecord):
'''
Method switching message content by use case
'''
print('[INFO] Received message:', message)
if ('mode', b'INIT') in message.headers: # INIT comes from device automator
self.init_pso(message.value) <--- this call works as expected
elif ('mode', b'RESULT') in message.headers: # RESULT comes from device automator
print('perform step')
elif ('mode', b'SET') in message.headers: # SET is sent by PSO
print('SET received')
self.__result_received.set()
else:
print('unknown command', message.headers)
def send_message(self, message_content, headers: list):
self.__kafka_producer.send(topic='A_TOPIC', value=message_content, headers=headers)
def init_pso(self, json_string: str):
[...]
self.__pso_optimizer = GlobalBestPSO(n_particles=10 * parameter_count, dimensions=parameter_count,
options=options, bounds=bounds)
# start optimization run
best_cost, best_position = self.__pso_optimizer.optimize(self.objective_function, iters=100)
print(best_position, best_cost)
def objective_function(self, new_settings):
result = np.empty(len(new_settings))
for i in range(len(new_settings)):
setting_dict = []
result[i] = 0
for j in range(len(self.__device_names)):
parameter_value = {self.__device_names[j]: new_settings[i][j]}
setting_dict.append(parameter_value)
# send new setting values to Kafka / LSA client
self.__kafka_producer.send('A_TOPIC', value=json.dumps(setting_dict).encode('UTF-8'),
headers=[('mode','SET'.encode('UTF-8'))])
# wait for result to return
if self.__result_received.is_set():
self.__result_received.clear()
self.__result_received.wait() <--- this wait blocks everything, including KafkaListener::run() without wait(), the code executes as expected
# append to result array
#result[i] += new_settings[i][j]
return result
def get_optimizer(self):
return self.__pso_optimizer
'''
This is a class supposed to listen in a parallel thread to messages coming from Kafka
'''
class KafkaListener(Thread):
__stop_event: Event = None
__caller: KafkaConnector = None
def __init__(self, caller):
Thread.__init__(self)
self.__caller = caller
self.setDaemon(True)
self.__stop_event = Event()
def run(self):
while not self.__stop_event.is_set():
for message in self.__caller.getKafkaConsumer():
print('[INFO] KafkaListener received:', message)
self.__caller.message_received(message)
if self.__stop_event.is_set():
break
def stop_consumer(self):
self.__stop_event.set()
最终,我想等待通过 Kafka 获得结果,然后继续下一个循环
答: 暂无答案
评论
__