agency.spaces.local_space

 1import multiprocessing
 2import queue
 3import threading
 4from concurrent.futures import Future
 5
 6from agency.logger import log
 7from agency.queue import Queue
 8from agency.resources import ResourceManager
 9from agency.schema import Message
10from agency.space import Space
11
12
class _LocalQueue(Queue):
    """
    Queue implementation backed by a multiprocessing manager queue.

    When an event is supplied, it is set after every ``put`` so that a waiter
    on that event can react to the new message.
    """

    def __init__(self, outbound_message_event: multiprocessing.Event = None):
        """
        Args:
            outbound_message_event: Optional event to set whenever a message
                is put on this queue. No notification happens when None.
        """
        # The backing queue is created by the ResourceManager's
        # multiprocessing manager.
        self._queue = ResourceManager().multiprocessing_manager.Queue()
        self.outbound_message_event = outbound_message_event

    def put(self, message: Message):
        """Enqueues the message, then sets the notification event if present."""
        self._queue.put(message)
        notifier = self.outbound_message_event
        if notifier is None:
            return
        notifier.set()

    def get(self, block: bool = True, timeout: float = None) -> Message:
        """
        Returns the next message from the backing queue.

        Args:
            block: Forwarded to the backing queue's ``get``.
            timeout: Forwarded to the backing queue's ``get``.
        """
        received = self._queue.get(block=block, timeout=timeout)
        return received
27
28
class LocalSpace(Space):
    """
    A LocalSpace allows Agents to communicate within the Python application.

    A router task, submitted to the ResourceManager's thread pool executor,
    moves messages from every processor's outbound queue to the inbound queue
    of each matching recipient.
    """

    def __init__(self, max_workers=None):
        """
        Creates the coordination events and starts the router.

        Args:
            max_workers: Passed to ResourceManager when the outbound message
                event is created.
        """
        super().__init__()
        self._stop_router_event: threading.Event = threading.Event()
        self._outbound_message_event: multiprocessing.Event = ResourceManager(
            max_workers
        ).multiprocessing_manager.Event()
        # Started last: the router reads both events created above.
        self._router_future: Future = self._start_router_thread()

    def destroy(self):
        """
        Stops the router, then runs the parent class's destroy().

        The parent cleanup runs even if stopping the router raises, e.g. when
        the router task itself failed and its exception is re-raised by
        Future.result(). The original exception still propagates.
        """
        try:
            self._stop_router_thread()
        finally:
            super().destroy()

    def _start_router_thread(self):
        """
        Submits the router loop to the thread pool executor.

        Returns:
            The Future representing the running router task.
        """

        def _router_thread():
            """Routes outbound messages"""
            log("debug", "LocalSpace: router thread starting")
            while not self._stop_router_event.is_set():
                # Wake on a new outbound message, or at least every 0.1s so a
                # stop request is noticed.
                self._outbound_message_event.wait(timeout=0.1)
                if self._stop_router_event.is_set():
                    log("debug", "LocalSpace: router thread stopping")
                    break
                # Clear before draining: a put that happens during the drain
                # sets the event again and triggers another pass.
                self._outbound_message_event.clear()
                # Iterate over a copy of the current processors.
                processors = list(self.processors.values())
                for processor in processors:
                    outbound_queue = processor.outbound_queue
                    # drain this outbound queue
                    while True:
                        try:
                            message = outbound_queue.get(block=False)
                        except queue.Empty:
                            break
                        log("debug", "LocalSpace: routing message", message)
                        # "*" addresses every processor in this pass.
                        recipient_processors = [
                            candidate for candidate in processors
                            if message["to"] == candidate.agent_id
                            or message["to"] == "*"
                        ]
                        for recipient_processor in recipient_processors:
                            recipient_processor.inbound_queue.put(message)
            log("debug", "LocalSpace: router thread stopped")

        return ResourceManager().thread_pool_executor.submit(_router_thread)

    def _stop_router_thread(self):
        """
        Signals the router to stop and blocks until its task has finished.

        Raises:
            Any exception raised inside the router task, as re-raised by
            Future.result().
        """
        self._stop_router_event.set()
        self._router_future.result()

    def _create_inbound_queue(self, agent_id) -> Queue:
        """Returns a new queue without an outbound notification event."""
        return _LocalQueue()

    def _create_outbound_queue(self, agent_id) -> Queue:
        """Returns a new queue that sets the outbound message event on put."""
        return _LocalQueue(outbound_message_event=self._outbound_message_event)
class LocalSpace(agency.space.Space):
class LocalSpace(Space):
    """
    A LocalSpace allows Agents to communicate within the Python application.

    A router task, submitted to the ResourceManager's thread pool executor,
    moves messages from every processor's outbound queue to the inbound queue
    of each matching recipient.
    """

    def __init__(self, max_workers=None):
        """
        Creates the coordination events and starts the router.

        Args:
            max_workers: Passed to ResourceManager when the outbound message
                event is created.
        """
        super().__init__()
        self._stop_router_event: threading.Event = threading.Event()
        self._outbound_message_event: multiprocessing.Event = ResourceManager(
            max_workers
        ).multiprocessing_manager.Event()
        # Started last: the router reads both events created above.
        self._router_future: Future = self._start_router_thread()

    def destroy(self):
        """
        Stops the router, then runs the parent class's destroy().

        The parent cleanup runs even if stopping the router raises, e.g. when
        the router task itself failed and its exception is re-raised by
        Future.result(). The original exception still propagates.
        """
        try:
            self._stop_router_thread()
        finally:
            super().destroy()

    def _start_router_thread(self):
        """
        Submits the router loop to the thread pool executor.

        Returns:
            The Future representing the running router task.
        """

        def _router_thread():
            """Routes outbound messages"""
            log("debug", "LocalSpace: router thread starting")
            while not self._stop_router_event.is_set():
                # Wake on a new outbound message, or at least every 0.1s so a
                # stop request is noticed.
                self._outbound_message_event.wait(timeout=0.1)
                if self._stop_router_event.is_set():
                    log("debug", "LocalSpace: router thread stopping")
                    break
                # Clear before draining: a put that happens during the drain
                # sets the event again and triggers another pass.
                self._outbound_message_event.clear()
                # Iterate over a copy of the current processors.
                processors = list(self.processors.values())
                for processor in processors:
                    outbound_queue = processor.outbound_queue
                    # drain this outbound queue
                    while True:
                        try:
                            message = outbound_queue.get(block=False)
                        except queue.Empty:
                            break
                        log("debug", "LocalSpace: routing message", message)
                        # "*" addresses every processor in this pass.
                        recipient_processors = [
                            candidate for candidate in processors
                            if message["to"] == candidate.agent_id
                            or message["to"] == "*"
                        ]
                        for recipient_processor in recipient_processors:
                            recipient_processor.inbound_queue.put(message)
            log("debug", "LocalSpace: router thread stopped")

        return ResourceManager().thread_pool_executor.submit(_router_thread)

    def _stop_router_thread(self):
        """
        Signals the router to stop and blocks until its task has finished.

        Raises:
            Any exception raised inside the router task, as re-raised by
            Future.result().
        """
        self._stop_router_event.set()
        self._router_future.result()

    def _create_inbound_queue(self, agent_id) -> Queue:
        """Returns a new queue without an outbound notification event."""
        return _LocalQueue()

    def _create_outbound_queue(self, agent_id) -> Queue:
        """Returns a new queue that sets the outbound message event on put."""
        return _LocalQueue(outbound_message_event=self._outbound_message_event)

A LocalSpace allows Agents to communicate within the Python application.

LocalSpace(max_workers=None)
35    def __init__(self, max_workers=None):
36        super().__init__()
37        self._stop_router_event: threading.Event = threading.Event()
38        self._outbound_message_event: multiprocessing.Event = ResourceManager(max_workers
39        ).multiprocessing_manager.Event()
40        self._router_future: Future = self._start_router_thread()
def destroy(self):
42    def destroy(self):
43        self._stop_router_thread()
44        super().destroy()

Cleans up resources used by this space.

Subclasses should call super().destroy() when overriding.