agency.spaces.local_space
import multiprocessing
import queue
import threading
from concurrent.futures import Future

from agency.logger import log
from agency.queue import Queue
from agency.resources import ResourceManager
from agency.schema import Message
from agency.space import Space


class _LocalQueue(Queue):
    """A multiprocessing based implementation of Queue"""

    def __init__(self, outbound_message_event: multiprocessing.Event = None):
        """
        Creates the underlying manager-backed queue.

        Args:
            outbound_message_event: Optional event that is set every time a
                message is put on this queue. When None, no signalling is
                done.
        """
        self.outbound_message_event = outbound_message_event
        self._queue = ResourceManager().multiprocessing_manager.Queue()

    def put(self, message: Message):
        """Enqueues the message and, if an event was given, sets it."""
        self._queue.put(message)
        if self.outbound_message_event is not None:
            # Wake up whoever is waiting on the event (the router thread for
            # queues created by LocalSpace._create_outbound_queue).
            self.outbound_message_event.set()

    def get(self, block: bool = True, timeout: float = None) -> Message:
        """
        Returns the next message from the underlying queue.

        `block` and `timeout` are forwarded unchanged to the underlying
        queue's `get`.
        """
        return self._queue.get(block=block, timeout=timeout)


class LocalSpace(Space):
    """
    A LocalSpace allows Agents to communicate within the Python application.
    """

    def __init__(self, max_workers=None):
        """
        Sets up the routing events and starts the router thread.

        Args:
            max_workers: Forwarded to ResourceManager.
                NOTE(review): presumably sizes its worker pool - confirm
                against ResourceManager.
        """
        super().__init__()
        self._stop_router_event: threading.Event = threading.Event()
        self._outbound_message_event: multiprocessing.Event = ResourceManager(
            max_workers
        ).multiprocessing_manager.Event()
        self._router_future: Future = self._start_router_thread()

    def destroy(self):
        """
        Stops the router thread, then runs the parent class cleanup.

        Any exception raised by the router thread is re-raised here, but the
        parent cleanup is always executed first.
        """
        try:
            self._stop_router_thread()
        finally:
            # Future.result() re-raises router failures; make sure the base
            # class still gets to release its resources in that case.
            super().destroy()

    def _start_router_thread(self):
        """Submits the router loop to the thread pool and returns its Future."""

        def _router_thread():
            """Routes outbound messages"""
            log("debug", "LocalSpace: router thread starting")
            while not self._stop_router_event.is_set():
                # Wake on new outbound messages, or every 0.1s so that a stop
                # request is noticed.
                self._outbound_message_event.wait(timeout=0.1)
                if self._stop_router_event.is_set():
                    log("debug", "LocalSpace: router thread stopping")
                    break
                # Clear before draining: a message put after this point sets
                # the event again and is picked up on the next iteration.
                self._outbound_message_event.clear()
                # Snapshot the processors for this routing pass.
                processors = list(self.processors.values())
                for sender in processors:
                    outbound_queue = sender.outbound_queue
                    # drain each outbound queue
                    while True:
                        try:
                            message = outbound_queue.get(block=False)
                        except queue.Empty:
                            break
                        log("debug", "LocalSpace: routing message", message)
                        # "*" addresses every processor in the snapshot,
                        # the sender's own included.
                        recipients = [
                            candidate for candidate in processors
                            if message["to"] == candidate.agent_id
                            or message["to"] == "*"
                        ]
                        for recipient in recipients:
                            recipient.inbound_queue.put(message)
            log("debug", "LocalSpace: router thread stopped")

        return ResourceManager().thread_pool_executor.submit(_router_thread)

    def _stop_router_thread(self):
        """Signals the router loop to exit and waits for it to finish."""
        self._stop_router_event.set()
        # Blocks until the router returns; re-raises if the router failed.
        self._router_future.result()

    def _create_inbound_queue(self, agent_id) -> Queue:
        """Returns a new queue with no event attached."""
        return _LocalQueue()

    def _create_outbound_queue(self, agent_id) -> Queue:
        """Returns a new queue that sets the shared outbound event on put."""
        return _LocalQueue(outbound_message_event=self._outbound_message_event)
class LocalSpace(Space):
    """
    A LocalSpace allows Agents to communicate within the Python application.
    """

    def __init__(self, max_workers=None):
        """
        Sets up the routing events and starts the router thread.

        Args:
            max_workers: Forwarded to ResourceManager.
                NOTE(review): presumably sizes its worker pool - confirm
                against ResourceManager.
        """
        super().__init__()
        self._stop_router_event: threading.Event = threading.Event()
        self._outbound_message_event: multiprocessing.Event = ResourceManager(
            max_workers
        ).multiprocessing_manager.Event()
        self._router_future: Future = self._start_router_thread()

    def destroy(self):
        """
        Stops the router thread, then runs the parent class cleanup.

        Any exception raised by the router thread is re-raised here, but the
        parent cleanup is always executed first.
        """
        try:
            self._stop_router_thread()
        finally:
            # Future.result() re-raises router failures; make sure the base
            # class still gets to release its resources in that case.
            super().destroy()

    def _start_router_thread(self):
        """Submits the router loop to the thread pool and returns its Future."""

        def _router_thread():
            """Routes outbound messages"""
            log("debug", "LocalSpace: router thread starting")
            while not self._stop_router_event.is_set():
                # Wake on new outbound messages, or every 0.1s so that a stop
                # request is noticed.
                self._outbound_message_event.wait(timeout=0.1)
                if self._stop_router_event.is_set():
                    log("debug", "LocalSpace: router thread stopping")
                    break
                # Clear before draining: a message put after this point sets
                # the event again and is picked up on the next iteration.
                self._outbound_message_event.clear()
                # Snapshot the processors for this routing pass.
                processors = list(self.processors.values())
                for sender in processors:
                    outbound_queue = sender.outbound_queue
                    # drain each outbound queue
                    while True:
                        try:
                            message = outbound_queue.get(block=False)
                        except queue.Empty:
                            break
                        log("debug", "LocalSpace: routing message", message)
                        # "*" addresses every processor in the snapshot,
                        # the sender's own included.
                        recipients = [
                            candidate for candidate in processors
                            if message["to"] == candidate.agent_id
                            or message["to"] == "*"
                        ]
                        for recipient in recipients:
                            recipient.inbound_queue.put(message)
            log("debug", "LocalSpace: router thread stopped")

        return ResourceManager().thread_pool_executor.submit(_router_thread)

    def _stop_router_thread(self):
        """Signals the router loop to exit and waits for it to finish."""
        self._stop_router_event.set()
        # Blocks until the router returns; re-raises if the router failed.
        self._router_future.result()

    def _create_inbound_queue(self, agent_id) -> Queue:
        """Returns a new queue with no event attached."""
        return _LocalQueue()

    def _create_outbound_queue(self, agent_id) -> Queue:
        """Returns a new queue that sets the shared outbound event on put."""
        return _LocalQueue(outbound_message_event=self._outbound_message_event)
A LocalSpace allows Agents to communicate within the Python application.
LocalSpace(max_workers=None)
def __init__(self, max_workers=None):
    """
    Initializes the space and launches its router thread.

    Args:
        max_workers: Handed to ResourceManager when it is looked up here.
            NOTE(review): presumably bounds its worker pool - confirm
            against ResourceManager.
    """
    super().__init__()

    # Thread-level flag checked by the router loop to know when to exit.
    stop_flag = threading.Event()
    self._stop_router_event: threading.Event = stop_flag

    # Event obtained from the resource manager's multiprocessing manager;
    # outbound queues set it to announce pending messages.
    resources = ResourceManager(max_workers)
    mp_manager = resources.multiprocessing_manager
    self._outbound_message_event: multiprocessing.Event = mp_manager.Event()

    # Both events must exist before the router starts, since it reads them.
    self._router_future: Future = self._start_router_thread()
def destroy(self):
Cleans up resources used by this space.
Subclasses should call super().destroy() when overriding.