提问人:Pascal 提问时间:5/3/2023 更新时间:5/4/2023 访问量:64
Python 线程中的垃圾回收
garbage collection in python threading
问:
当实现一个旨在定期从流中读取的线程时,我无法让线程正确停止。只有当我使用的回调函数被实现为代理(`Worker`)的方法时,才会出现这种情况。请参阅此示例(Python v3.10.11):
import threading
from time import sleep
import weakref
class Consumer(threading.Thread):
    """Thread that periodically polls a stream and passes each poll to its handlers.

    Every handler is called as ``handler(number, counter)``, where ``number``
    is the handler's position in the current iteration and ``counter`` is the
    number of polls completed so far.

    Args:
        stream_key: Identifier of the stream being polled.
        callback: First handler to register.
        poll_interval: Seconds to wait between two polls (default 2).
    """

    def __init__(self, stream_key, callback, poll_interval=2):
        super().__init__()
        self._stream_key: str = stream_key
        self._handlers = {callback}
        self._poll_interval = poll_interval
        # An Event (rather than a plain bool) lets stop() interrupt the wait
        # between polls instead of blocking until the interval has elapsed.
        self._stop_event = threading.Event()

    def run(self):
        """Poll the event stream and call each handler once per poll."""
        counter = 0
        while not self._stop_event.is_set():
            # Iterate over a snapshot: add_handler()/remove_handler() may be
            # called from another thread, and a set that changes size while
            # being iterated raises RuntimeError.
            for number, handler in enumerate(tuple(self._handlers)):
                handler(number, counter)
            print("reading from stream: ", self._stream_key)
            counter += 1
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(self._poll_interval)

    def stop(self):
        """Stop polling the event stream and wait for the thread to finish.

        Safe to call on a consumer that was never started, and from the
        consumer thread itself (e.g. when triggered by a finalizer running in
        that thread); in both cases joining would raise RuntimeError, so the
        join is skipped.
        """
        self._stop_event.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

    def start(self) -> None:
        """Clear any earlier stop request and start the polling thread."""
        self._stop_event.clear()
        return super().start()

    def add_handler(self, callback):
        """Register an additional handler."""
        self._handlers.add(callback)

    def remove_handler(self, callback):
        """Unregister a handler; raises KeyError if it is not registered."""
        self._handlers.remove(callback)
class EventHandler:
    """Registry that maps stream keys to the Consumer polling that stream.

    Consumers can be shut down deterministically with ``close()`` or by using
    the instance as a context manager. ``__del__`` remains as a best-effort
    fallback only, since finalizers are not guaranteed to run.
    """

    def __init__(self):
        # Values are held weakly, so an entry disappears once nothing else
        # references its consumer any more.
        self.consumers = weakref.WeakValueDictionary()

    def subscribe(self, stream_key: str, callback):
        """Attach ``callback`` to the consumer for ``stream_key``.

        A new consumer is created and started when none is registered for
        that key yet.
        """
        # Single lookup that yields a strong reference: with a weak-valued
        # dict, a separate "in" check followed by indexing could raise
        # KeyError if the entry vanished in between.
        consumer = self.consumers.get(stream_key)
        if consumer is not None:
            consumer.add_handler(callback)
            return
        consumer = Consumer(stream_key=stream_key, callback=callback)
        self.consumers[stream_key] = consumer
        consumer.start()

    def close(self):
        """Stop every registered consumer and forget it. Safe to call twice."""
        # Take strong references first so entries cannot vanish mid-loop.
        consumers = list(self.consumers.values())
        self.consumers.clear()
        for consumer in consumers:
            consumer.stop()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # Best-effort fallback; prefer an explicit close().
        self.close()
class Worker:
    """Agent that owns an EventHandler and subscribes one of its own methods."""

    def __init__(self) -> None:
        self.registered = False
        self._eventhandler = EventHandler()
        # NOTE: self.handlerfunc is a bound method, so it carries a strong
        # reference to this Worker. Once register() hands it to a Consumer,
        # the consumer's handler set references the Worker, which in turn
        # references the EventHandler. Storing it here also makes the Worker
        # reference itself through _subscriptions.
        own_callback = self.handlerfunc
        self._subscriptions = {("test-stream-key", own_callback)}

    def register(self):
        """Start all listeners, then mark this worker as registered."""
        self._start_listeners()
        self.registered = True

    def _start_listeners(self):
        """Subscribe every (stream_key, callback) pair with the event handler."""
        subscribe = self._eventhandler.subscribe
        for entry in self._subscriptions:
            subscribe(*entry)

    def handlerfunc(self, number, counter):
        """Callback invoked by the consumer; prints a progress line."""
        message = f"handler {number} doing things, counting: {counter}"
        print(message)
# Create the worker and start its consumer thread through register().
worker = Worker()
worker.register()
# This only removes the module-level name. The Worker is still referenced by
# the bound method self.handlerfunc that was handed to the Consumer (and by
# its own _subscriptions set), so EventHandler.__del__ is not triggered here.
del worker
它不断产生输出,例如
reading from stream: test-stream-key
handler 0 doing things, counting: 1
reading from stream: test-stream-key
handler 0 doing things, counting: 2
...
在执行 `del` 命令之后,我希望垃圾回收发挥其魔力,从而停止代理(包括同样定义了 `__del__` 方法的 `EventHandler`)。
有趣的是,如果我不把 `handlerfunc` 定义为 `Worker` 的方法,而是定义为全局作用域中的函数,它就可以正常工作:
import threading
from time import sleep
import weakref
class Consumer(threading.Thread):
    """Thread that periodically polls a stream and passes each poll to its handlers.

    Every handler is called as ``handler(number, counter)``, where ``number``
    is the handler's position in the current iteration and ``counter`` is the
    number of polls completed so far.

    Args:
        stream_key: Identifier of the stream being polled.
        callback: First handler to register.
        poll_interval: Seconds to wait between two polls (default 2).
    """

    def __init__(self, stream_key, callback, poll_interval=2):
        super().__init__()
        self._stream_key: str = stream_key
        self._handlers = {callback}
        self._poll_interval = poll_interval
        # An Event (rather than a plain bool) lets stop() interrupt the wait
        # between polls instead of blocking until the interval has elapsed.
        self._stop_event = threading.Event()

    def run(self):
        """Poll the event stream and call each handler once per poll."""
        counter = 0
        while not self._stop_event.is_set():
            # Iterate over a snapshot: add_handler()/remove_handler() may be
            # called from another thread, and a set that changes size while
            # being iterated raises RuntimeError.
            for number, handler in enumerate(tuple(self._handlers)):
                handler(number, counter)
            print("reading from stream: ", self._stream_key)
            counter += 1
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(self._poll_interval)

    def stop(self):
        """Stop polling the event stream and wait for the thread to finish.

        Safe to call on a consumer that was never started, and from the
        consumer thread itself (e.g. when triggered by a finalizer running in
        that thread); in both cases joining would raise RuntimeError, so the
        join is skipped.
        """
        self._stop_event.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

    def start(self) -> None:
        """Clear any earlier stop request and start the polling thread."""
        self._stop_event.clear()
        return super().start()

    def add_handler(self, callback):
        """Register an additional handler."""
        self._handlers.add(callback)

    def remove_handler(self, callback):
        """Unregister a handler; raises KeyError if it is not registered."""
        self._handlers.remove(callback)
class EventHandler:
    """Registry that maps stream keys to the Consumer polling that stream.

    Consumers can be shut down deterministically with ``close()`` or by using
    the instance as a context manager. ``__del__`` remains as a best-effort
    fallback only, since finalizers are not guaranteed to run.
    """

    def __init__(self):
        # Values are held weakly, so an entry disappears once nothing else
        # references its consumer any more.
        self.consumers = weakref.WeakValueDictionary()

    def subscribe(self, stream_key: str, callback):
        """Attach ``callback`` to the consumer for ``stream_key``.

        A new consumer is created and started when none is registered for
        that key yet.
        """
        # Single lookup that yields a strong reference: with a weak-valued
        # dict, a separate "in" check followed by indexing could raise
        # KeyError if the entry vanished in between.
        consumer = self.consumers.get(stream_key)
        if consumer is not None:
            consumer.add_handler(callback)
            return
        consumer = Consumer(stream_key=stream_key, callback=callback)
        self.consumers[stream_key] = consumer
        consumer.start()

    def close(self):
        """Stop every registered consumer and forget it. Safe to call twice."""
        # Take strong references first so entries cannot vanish mid-loop.
        consumers = list(self.consumers.values())
        self.consumers.clear()
        for consumer in consumers:
            consumer.stop()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # Best-effort fallback; prefer an explicit close().
        self.close()
class Worker:
    """Agent that owns an EventHandler and subscribes a module-level callback."""

    def __init__(self) -> None:
        self.registered = False
        self._eventhandler = EventHandler()
        # The callback is the module-level function handlerfunc, so the
        # subscription does not hold any reference back to this Worker.
        self._subscriptions = {("test-stream-key", handlerfunc)}

    def register(self):
        """Start all listeners, then mark this worker as registered."""
        self._start_listeners()
        self.registered = True

    def _start_listeners(self):
        """Subscribe every (stream_key, callback) pair with the event handler."""
        subscribe = self._eventhandler.subscribe
        for entry in self._subscriptions:
            subscribe(*entry)
def handlerfunc(number, counter):
    """Callback invoked by the consumer; prints a progress line."""
    message = f"handler {number} doing things, counting: {counter}"
    print(message)
# Create the worker and start its consumer thread through register().
worker = Worker()
worker.register()
# Here the callback is the module-level handlerfunc, so nothing but this name
# references the Worker. Deleting it drops the last reference (in CPython),
# which lets EventHandler.__del__ run and stop the consumer.
del worker
在这种情况下,它会在一条消息后停止,或多或少会立即停止。这也是我对类范围方法的期望。
这是怎么回事?我对 `weakref.WeakValueDictionary()` 的用法正确吗?(显然不是)但至少使用 `weakref` 的思路是对的吗?
答: 暂无答案
评论
Event
.stop()
self._running
False
__del__