agency.space
1import threading 2from abc import ABC, ABCMeta, abstractmethod 3from concurrent.futures import Executor 4from typing import Dict, List, Type 5 6from agency.agent import Agent 7from agency.logger import log 8from agency.processor import Processor, _EventProtocol 9from agency.queue import Queue 10from agency.resources import ResourceManager 11 12 13class Space(ABC, metaclass=ABCMeta): 14 """ 15 A Space is where Agents communicate. 16 """ 17 18 def __init__(self): 19 self.processors: Dict[str, Processor] = {} 20 self._processors_lock: threading.Lock = threading.Lock() 21 22 def __enter__(self): 23 log("debug", "Entering Space context") 24 return self 25 26 def __exit__(self, exc_type, exc_val, exc_tb): 27 if exc_type is not None: 28 log("debug", "Exiting Space context with exception", { 29 "exc_type": exc_type, 30 "exc_val": exc_val, 31 "exc_tb": exc_tb, 32 }) 33 self.destroy() 34 35 def add(self, 36 agent_type: Type[Agent], 37 agent_id: str, 38 *agent_args, 39 **agent_kwargs): 40 """ 41 Adds an agent to the space allowing it to communicate. 42 43 This method adds the agent in a subprocess. The agent may not be 44 directly accessed from the main thread. 45 46 Args: 47 agent_type: The type of agent to add 48 agent_id: The id of the agent to add 49 50 All other arguments are passed to the Agent constructor 51 52 Raises: 53 ValueError: If the agent ID is already in use 54 """ 55 self._add(foreground=False, 56 agent_type=agent_type, 57 agent_id=agent_id, 58 *agent_args, 59 **agent_kwargs) 60 61 def add_foreground(self, 62 agent_type: Type[Agent], 63 agent_id: str, 64 *agent_args, 65 **agent_kwargs) -> Agent: 66 """ 67 Adds an agent to the space and returns it in the current thread. 68 69 This method adds an agent using threading. The agent instance is 70 returned allowing direct access. 71 72 It is recommended to use the `add` method instead of this method in most 73 cases. Agents added this way may block other agents or threads in the 74 main process. Use this method when direct access to the agent instance 75 is desired. 76 77 Args: 78 agent_type: The type of agent to add 79 agent_id: The id of the agent to add 80 81 All other arguments are passed to the Agent constructor 82 83 Returns: 84 The agent 85 86 Raises: 87 ValueError: If the agent ID is already in use 88 """ 89 agent = self._add(foreground=True, 90 agent_type=agent_type, 91 agent_id=agent_id, 92 *agent_args, 93 **agent_kwargs) 94 return agent 95 96 def remove(self, agent_id: str): 97 """ 98 Removes an agent from the space by id. 99 100 This method cannot remove an agent added from a different instance. In 101 other words, a Space instance cannot remove an agent that it did not 102 add. 103 104 Args: 105 agent_id: The id of the agent to remove 106 107 Raises: 108 ValueError: If the agent is not present in the space 109 """ 110 self._stop_processor(agent_id) 111 log("info", f"{agent_id}: removed from space") 112 113 def destroy(self): 114 """ 115 Cleans up resources used by this space. 116 117 Subclasses should call super().destroy() when overriding. 118 """ 119 self._stop_all_processors() 120 121 def _add(self, 122 foreground: bool, 123 agent_type: Type[Agent], 124 agent_id: str, 125 *agent_args, 126 **agent_kwargs) -> Agent: 127 128 try: 129 agent = self._start_processor( 130 foreground=foreground, 131 agent_type=agent_type, 132 agent_id=agent_id, 133 agent_args=agent_args, 134 agent_kwargs=agent_kwargs, 135 ) 136 log("info", f"{agent_id}: added to space") 137 return agent 138 except: 139 # clean up if an error occurs 140 self.remove(agent_id) 141 raise 142 143 def _start_processor(self, 144 foreground: bool, 145 agent_type: Type[Agent], 146 agent_id: str, 147 agent_args: List, 148 agent_kwargs: Dict): 149 with self._processors_lock: 150 # Early existence check. Processor.start() will also check. This is 151 # because Spaces may be distributed. 152 if agent_id in self.processors.keys(): 153 raise ValueError(f"Agent '{agent_id}' already exists") 154 155 self.processors[agent_id] = Processor( 156 agent_type=agent_type, 157 agent_id=agent_id, 158 agent_args=agent_args, 159 agent_kwargs=agent_kwargs, 160 inbound_queue=self._create_inbound_queue(agent_id), 161 outbound_queue=self._create_outbound_queue(agent_id), 162 started=self._define_event(foreground=foreground), 163 stopping=self._define_event(foreground=foreground), 164 new_message_event=self._define_event(foreground=foreground), 165 executor=self._get_executor(foreground=foreground), 166 ) 167 return self.processors[agent_id].start() 168 169 def _stop_processor_unsafe(self, agent_id: str): 170 self.processors[agent_id].stop() 171 self.processors.pop(agent_id) 172 173 def _stop_processor(self, agent_id: str): 174 with self._processors_lock: 175 self._stop_processor_unsafe(agent_id) 176 177 def _stop_all_processors(self): 178 for agent_id in list(self.processors.keys()): 179 try: 180 with self._processors_lock: 181 self._stop_processor_unsafe(agent_id) 182 except Exception as e: 183 log("error", 184 f"{agent_id}: processor failed to stop", e) 185 186 def _get_executor(self, foreground: bool = False) -> Executor: 187 if foreground: 188 return ResourceManager().thread_pool_executor 189 else: 190 return ResourceManager().process_pool_executor 191 192 def _define_event(self, foreground: bool = False) -> _EventProtocol: 193 if foreground: 194 return threading.Event() 195 else: 196 return ResourceManager().multiprocessing_manager.Event() 197 198 @abstractmethod 199 def _create_inbound_queue(self, agent_id) -> Queue: 200 """ 201 Returns a Queue suitable for receiving messages 202 """ 203 raise NotImplementedError 204 205 @abstractmethod 206 def _create_outbound_queue(self, agent_id) -> Queue: 207 """ 208 Returns a Queue suitable for sending messages 209 """ 210 raise NotImplementedError
class Space(abc.ABC):
14class Space(ABC, metaclass=ABCMeta): 15 """ 16 A Space is where Agents communicate. 17 """ 18 19 def __init__(self): 20 self.processors: Dict[str, Processor] = {} 21 self._processors_lock: threading.Lock = threading.Lock() 22 23 def __enter__(self): 24 log("debug", "Entering Space context") 25 return self 26 27 def __exit__(self, exc_type, exc_val, exc_tb): 28 if exc_type is not None: 29 log("debug", "Exiting Space context with exception", { 30 "exc_type": exc_type, 31 "exc_val": exc_val, 32 "exc_tb": exc_tb, 33 }) 34 self.destroy() 35 36 def add(self, 37 agent_type: Type[Agent], 38 agent_id: str, 39 *agent_args, 40 **agent_kwargs): 41 """ 42 Adds an agent to the space allowing it to communicate. 43 44 This method adds the agent in a subprocess. The agent may not be 45 directly accessed from the main thread. 46 47 Args: 48 agent_type: The type of agent to add 49 agent_id: The id of the agent to add 50 51 All other arguments are passed to the Agent constructor 52 53 Raises: 54 ValueError: If the agent ID is already in use 55 """ 56 self._add(foreground=False, 57 agent_type=agent_type, 58 agent_id=agent_id, 59 *agent_args, 60 **agent_kwargs) 61 62 def add_foreground(self, 63 agent_type: Type[Agent], 64 agent_id: str, 65 *agent_args, 66 **agent_kwargs) -> Agent: 67 """ 68 Adds an agent to the space and returns it in the current thread. 69 70 This method adds an agent using threading. The agent instance is 71 returned allowing direct access. 72 73 It is recommended to use the `add` method instead of this method in most 74 cases. Agents added this way may block other agents or threads in the 75 main process. Use this method when direct access to the agent instance 76 is desired. 77 78 Args: 79 agent_type: The type of agent to add 80 agent_id: The id of the agent to add 81 82 All other arguments are passed to the Agent constructor 83 84 Returns: 85 The agent 86 87 Raises: 88 ValueError: If the agent ID is already in use 89 """ 90 agent = self._add(foreground=True, 91 agent_type=agent_type, 92 agent_id=agent_id, 93 *agent_args, 94 **agent_kwargs) 95 return agent 96 97 def remove(self, agent_id: str): 98 """ 99 Removes an agent from the space by id. 100 101 This method cannot remove an agent added from a different instance. In 102 other words, a Space instance cannot remove an agent that it did not 103 add. 104 105 Args: 106 agent_id: The id of the agent to remove 107 108 Raises: 109 ValueError: If the agent is not present in the space 110 """ 111 self._stop_processor(agent_id) 112 log("info", f"{agent_id}: removed from space") 113 114 def destroy(self): 115 """ 116 Cleans up resources used by this space. 117 118 Subclasses should call super().destroy() when overriding. 119 """ 120 self._stop_all_processors() 121 122 def _add(self, 123 foreground: bool, 124 agent_type: Type[Agent], 125 agent_id: str, 126 *agent_args, 127 **agent_kwargs) -> Agent: 128 129 try: 130 agent = self._start_processor( 131 foreground=foreground, 132 agent_type=agent_type, 133 agent_id=agent_id, 134 agent_args=agent_args, 135 agent_kwargs=agent_kwargs, 136 ) 137 log("info", f"{agent_id}: added to space") 138 return agent 139 except: 140 # clean up if an error occurs 141 self.remove(agent_id) 142 raise 143 144 def _start_processor(self, 145 foreground: bool, 146 agent_type: Type[Agent], 147 agent_id: str, 148 agent_args: List, 149 agent_kwargs: Dict): 150 with self._processors_lock: 151 # Early existence check. Processor.start() will also check. This is 152 # because Spaces may be distributed. 153 if agent_id in self.processors.keys(): 154 raise ValueError(f"Agent '{agent_id}' already exists") 155 156 self.processors[agent_id] = Processor( 157 agent_type=agent_type, 158 agent_id=agent_id, 159 agent_args=agent_args, 160 agent_kwargs=agent_kwargs, 161 inbound_queue=self._create_inbound_queue(agent_id), 162 outbound_queue=self._create_outbound_queue(agent_id), 163 started=self._define_event(foreground=foreground), 164 stopping=self._define_event(foreground=foreground), 165 new_message_event=self._define_event(foreground=foreground), 166 executor=self._get_executor(foreground=foreground), 167 ) 168 return self.processors[agent_id].start() 169 170 def _stop_processor_unsafe(self, agent_id: str): 171 self.processors[agent_id].stop() 172 self.processors.pop(agent_id) 173 174 def _stop_processor(self, agent_id: str): 175 with self._processors_lock: 176 self._stop_processor_unsafe(agent_id) 177 178 def _stop_all_processors(self): 179 for agent_id in list(self.processors.keys()): 180 try: 181 with self._processors_lock: 182 self._stop_processor_unsafe(agent_id) 183 except Exception as e: 184 log("error", 185 f"{agent_id}: processor failed to stop", e) 186 187 def _get_executor(self, foreground: bool = False) -> Executor: 188 if foreground: 189 return ResourceManager().thread_pool_executor 190 else: 191 return ResourceManager().process_pool_executor 192 193 def _define_event(self, foreground: bool = False) -> _EventProtocol: 194 if foreground: 195 return threading.Event() 196 else: 197 return ResourceManager().multiprocessing_manager.Event() 198 199 @abstractmethod 200 def _create_inbound_queue(self, agent_id) -> Queue: 201 """ 202 Returns a Queue suitable for receiving messages 203 """ 204 raise NotImplementedError 205 206 @abstractmethod 207 def _create_outbound_queue(self, agent_id) -> Queue: 208 """ 209 Returns a Queue suitable for sending messages 210 """ 211 raise NotImplementedError
A Space is where Agents communicate.
processors: Dict[str, agency.processor.Processor]
36 def add(self, 37 agent_type: Type[Agent], 38 agent_id: str, 39 *agent_args, 40 **agent_kwargs): 41 """ 42 Adds an agent to the space allowing it to communicate. 43 44 This method adds the agent in a subprocess. The agent may not be 45 directly accessed from the main thread. 46 47 Args: 48 agent_type: The type of agent to add 49 agent_id: The id of the agent to add 50 51 All other arguments are passed to the Agent constructor 52 53 Raises: 54 ValueError: If the agent ID is already in use 55 """ 56 self._add(foreground=False, 57 agent_type=agent_type, 58 agent_id=agent_id, 59 *agent_args, 60 **agent_kwargs)
Adds an agent to the space allowing it to communicate.
This method adds the agent in a subprocess. The agent may not be directly accessed from the main thread.
Arguments:
- agent_type: The type of agent to add
- agent_id: The id of the agent to add
- All other arguments are passed to the Agent constructor
Raises:
- ValueError: If the agent ID is already in use
def add_foreground( self, agent_type: Type[agency.agent.Agent], agent_id: str, *agent_args, **agent_kwargs) -> agency.agent.Agent:
62 def add_foreground(self, 63 agent_type: Type[Agent], 64 agent_id: str, 65 *agent_args, 66 **agent_kwargs) -> Agent: 67 """ 68 Adds an agent to the space and returns it in the current thread. 69 70 This method adds an agent using threading. The agent instance is 71 returned allowing direct access. 72 73 It is recommended to use the `add` method instead of this method in most 74 cases. Agents added this way may block other agents or threads in the 75 main process. Use this method when direct access to the agent instance 76 is desired. 77 78 Args: 79 agent_type: The type of agent to add 80 agent_id: The id of the agent to add 81 82 All other arguments are passed to the Agent constructor 83 84 Returns: 85 The agent 86 87 Raises: 88 ValueError: If the agent ID is already in use 89 """ 90 agent = self._add(foreground=True, 91 agent_type=agent_type, 92 agent_id=agent_id, 93 *agent_args, 94 **agent_kwargs) 95 return agent
Adds an agent to the space and returns it in the current thread.
This method adds an agent using threading. The agent instance is returned allowing direct access.
It is recommended to use the add
method instead of this method in most cases. Agents added this way may block other agents or threads in the main process. Use this method when direct access to the agent instance is desired.
Arguments:
- agent_type: The type of agent to add
- agent_id: The id of the agent to add
- All other arguments are passed to the Agent constructor
Returns:
The agent
Raises:
- ValueError: If the agent ID is already in use
def remove(self, agent_id: str):
97 def remove(self, agent_id: str): 98 """ 99 Removes an agent from the space by id. 100 101 This method cannot remove an agent added from a different instance. In 102 other words, a Space instance cannot remove an agent that it did not 103 add. 104 105 Args: 106 agent_id: The id of the agent to remove 107 108 Raises: 109 ValueError: If the agent is not present in the space 110 """ 111 self._stop_processor(agent_id) 112 log("info", f"{agent_id}: removed from space")