agency.space

  1import threading
  2from abc import ABC, ABCMeta, abstractmethod
  3from concurrent.futures import Executor
  4from typing import Dict, List, Type
  5
  6from agency.agent import Agent
  7from agency.logger import log
  8from agency.processor import Processor, _EventProtocol
  9from agency.queue import Queue
 10from agency.resources import ResourceManager
 11
 12
 13class Space(ABC, metaclass=ABCMeta):
 14    """
 15    A Space is where Agents communicate.
 16    """
 17
 18    def __init__(self):
 19        self.processors: Dict[str, Processor] = {}
 20        self._processors_lock: threading.Lock = threading.Lock()
 21
 22    def __enter__(self):
 23        log("debug", "Entering Space context")
 24        return self
 25
 26    def __exit__(self, exc_type, exc_val, exc_tb):
 27        if exc_type is not None:
 28            log("debug", "Exiting Space context with exception", {
 29                "exc_type": exc_type,
 30                "exc_val": exc_val,
 31                "exc_tb": exc_tb,
 32            })
 33        self.destroy()
 34
 35    def add(self,
 36            agent_type: Type[Agent],
 37            agent_id: str,
 38            *agent_args,
 39            **agent_kwargs):
 40        """
 41        Adds an agent to the space allowing it to communicate.
 42
 43        This method adds the agent in a subprocess. The agent may not be
 44        directly accessed from the main thread.
 45
 46        Args:
 47            agent_type: The type of agent to add
 48            agent_id: The id of the agent to add
 49
 50            All other arguments are passed to the Agent constructor
 51
 52        Raises:
 53            ValueError: If the agent ID is already in use
 54        """
 55        self._add(foreground=False,
 56                  agent_type=agent_type,
 57                  agent_id=agent_id,
 58                  *agent_args,
 59                  **agent_kwargs)
 60
 61    def add_foreground(self,
 62                       agent_type: Type[Agent],
 63                       agent_id: str,
 64                       *agent_args,
 65                       **agent_kwargs) -> Agent:
 66        """
 67        Adds an agent to the space and returns it in the current thread.
 68
 69        This method adds an agent using threading. The agent instance is
 70        returned allowing direct access.
 71
 72        It is recommended to use the `add` method instead of this method in most
 73        cases. Agents added this way may block other agents or threads in the
 74        main process. Use this method when direct access to the agent instance
 75        is desired.
 76
 77        Args:
 78            agent_type: The type of agent to add
 79            agent_id: The id of the agent to add
 80
 81            All other arguments are passed to the Agent constructor
 82
 83        Returns:
 84            The agent
 85
 86        Raises:
 87            ValueError: If the agent ID is already in use
 88        """
 89        agent = self._add(foreground=True,
 90                          agent_type=agent_type,
 91                          agent_id=agent_id,
 92                          *agent_args,
 93                          **agent_kwargs)
 94        return agent
 95
 96    def remove(self, agent_id: str):
 97        """
 98        Removes an agent from the space by id.
 99
100        This method cannot remove an agent added from a different instance. In
101        other words, a Space instance cannot remove an agent that it did not
102        add.
103
104        Args:
105            agent_id: The id of the agent to remove
106
107        Raises:
108            ValueError: If the agent is not present in the space
109        """
110        self._stop_processor(agent_id)
111        log("info", f"{agent_id}: removed from space")
112
113    def destroy(self):
114        """
115        Cleans up resources used by this space.
116
117        Subclasses should call super().destroy() when overriding.
118        """
119        self._stop_all_processors()
120
121    def _add(self,
122             foreground: bool,
123             agent_type: Type[Agent],
124             agent_id: str,
125             *agent_args,
126             **agent_kwargs) -> Agent:
127
128        try:
129            agent = self._start_processor(
130                foreground=foreground,
131                agent_type=agent_type,
132                agent_id=agent_id,
133                agent_args=agent_args,
134                agent_kwargs=agent_kwargs,
135            )
136            log("info", f"{agent_id}: added to space")
137            return agent
138        except:
139            # clean up if an error occurs
140            self.remove(agent_id)
141            raise
142
143    def _start_processor(self,
144                         foreground: bool,
145                         agent_type: Type[Agent],
146                         agent_id: str,
147                         agent_args: List,
148                         agent_kwargs: Dict):
149        with self._processors_lock:
150            # Early existence check. Processor.start() will also check. This is
151            # because Spaces may be distributed.
152            if agent_id in self.processors.keys():
153                raise ValueError(f"Agent '{agent_id}' already exists")
154
155            self.processors[agent_id] = Processor(
156                agent_type=agent_type,
157                agent_id=agent_id,
158                agent_args=agent_args,
159                agent_kwargs=agent_kwargs,
160                inbound_queue=self._create_inbound_queue(agent_id),
161                outbound_queue=self._create_outbound_queue(agent_id),
162                started=self._define_event(foreground=foreground),
163                stopping=self._define_event(foreground=foreground),
164                new_message_event=self._define_event(foreground=foreground),
165                executor=self._get_executor(foreground=foreground),
166            )
167            return self.processors[agent_id].start()
168
169    def _stop_processor_unsafe(self, agent_id: str):
170        self.processors[agent_id].stop()
171        self.processors.pop(agent_id)
172
173    def _stop_processor(self, agent_id: str):
174        with self._processors_lock:
175            self._stop_processor_unsafe(agent_id)
176
177    def _stop_all_processors(self):
178        for agent_id in list(self.processors.keys()):
179            try:
180                with self._processors_lock:
181                    self._stop_processor_unsafe(agent_id)
182            except Exception as e:
183                log("error",
184                    f"{agent_id}: processor failed to stop", e)
185
186    def _get_executor(self, foreground: bool = False) -> Executor:
187        if foreground:
188            return ResourceManager().thread_pool_executor
189        else:
190            return ResourceManager().process_pool_executor
191
192    def _define_event(self, foreground: bool = False) -> _EventProtocol:
193        if foreground:
194            return threading.Event()
195        else:
196            return ResourceManager().multiprocessing_manager.Event()
197
198    @abstractmethod
199    def _create_inbound_queue(self, agent_id) -> Queue:
200        """
201        Returns a Queue suitable for receiving messages
202        """
203        raise NotImplementedError
204
205    @abstractmethod
206    def _create_outbound_queue(self, agent_id) -> Queue:
207        """
208        Returns a Queue suitable for sending messages
209        """
210        raise NotImplementedError
class Space(abc.ABC):
 14class Space(ABC, metaclass=ABCMeta):
 15    """
 16    A Space is where Agents communicate.
 17    """
 18
 19    def __init__(self):
 20        self.processors: Dict[str, Processor] = {}
 21        self._processors_lock: threading.Lock = threading.Lock()
 22
 23    def __enter__(self):
 24        log("debug", "Entering Space context")
 25        return self
 26
 27    def __exit__(self, exc_type, exc_val, exc_tb):
 28        if exc_type is not None:
 29            log("debug", "Exiting Space context with exception", {
 30                "exc_type": exc_type,
 31                "exc_val": exc_val,
 32                "exc_tb": exc_tb,
 33            })
 34        self.destroy()
 35
 36    def add(self,
 37            agent_type: Type[Agent],
 38            agent_id: str,
 39            *agent_args,
 40            **agent_kwargs):
 41        """
 42        Adds an agent to the space allowing it to communicate.
 43
 44        This method adds the agent in a subprocess. The agent may not be
 45        directly accessed from the main thread.
 46
 47        Args:
 48            agent_type: The type of agent to add
 49            agent_id: The id of the agent to add
 50
 51            All other arguments are passed to the Agent constructor
 52
 53        Raises:
 54            ValueError: If the agent ID is already in use
 55        """
 56        self._add(foreground=False,
 57                  agent_type=agent_type,
 58                  agent_id=agent_id,
 59                  *agent_args,
 60                  **agent_kwargs)
 61
 62    def add_foreground(self,
 63                       agent_type: Type[Agent],
 64                       agent_id: str,
 65                       *agent_args,
 66                       **agent_kwargs) -> Agent:
 67        """
 68        Adds an agent to the space and returns it in the current thread.
 69
 70        This method adds an agent using threading. The agent instance is
 71        returned allowing direct access.
 72
 73        It is recommended to use the `add` method instead of this method in most
 74        cases. Agents added this way may block other agents or threads in the
 75        main process. Use this method when direct access to the agent instance
 76        is desired.
 77
 78        Args:
 79            agent_type: The type of agent to add
 80            agent_id: The id of the agent to add
 81
 82            All other arguments are passed to the Agent constructor
 83
 84        Returns:
 85            The agent
 86
 87        Raises:
 88            ValueError: If the agent ID is already in use
 89        """
 90        agent = self._add(foreground=True,
 91                          agent_type=agent_type,
 92                          agent_id=agent_id,
 93                          *agent_args,
 94                          **agent_kwargs)
 95        return agent
 96
 97    def remove(self, agent_id: str):
 98        """
 99        Removes an agent from the space by id.
100
101        This method cannot remove an agent added from a different instance. In
102        other words, a Space instance cannot remove an agent that it did not
103        add.
104
105        Args:
106            agent_id: The id of the agent to remove
107
108        Raises:
109            ValueError: If the agent is not present in the space
110        """
111        self._stop_processor(agent_id)
112        log("info", f"{agent_id}: removed from space")
113
114    def destroy(self):
115        """
116        Cleans up resources used by this space.
117
118        Subclasses should call super().destroy() when overriding.
119        """
120        self._stop_all_processors()
121
122    def _add(self,
123             foreground: bool,
124             agent_type: Type[Agent],
125             agent_id: str,
126             *agent_args,
127             **agent_kwargs) -> Agent:
128
129        try:
130            agent = self._start_processor(
131                foreground=foreground,
132                agent_type=agent_type,
133                agent_id=agent_id,
134                agent_args=agent_args,
135                agent_kwargs=agent_kwargs,
136            )
137            log("info", f"{agent_id}: added to space")
138            return agent
139        except:
140            # clean up if an error occurs
141            self.remove(agent_id)
142            raise
143
144    def _start_processor(self,
145                         foreground: bool,
146                         agent_type: Type[Agent],
147                         agent_id: str,
148                         agent_args: List,
149                         agent_kwargs: Dict):
150        with self._processors_lock:
151            # Early existence check. Processor.start() will also check. This is
152            # because Spaces may be distributed.
153            if agent_id in self.processors.keys():
154                raise ValueError(f"Agent '{agent_id}' already exists")
155
156            self.processors[agent_id] = Processor(
157                agent_type=agent_type,
158                agent_id=agent_id,
159                agent_args=agent_args,
160                agent_kwargs=agent_kwargs,
161                inbound_queue=self._create_inbound_queue(agent_id),
162                outbound_queue=self._create_outbound_queue(agent_id),
163                started=self._define_event(foreground=foreground),
164                stopping=self._define_event(foreground=foreground),
165                new_message_event=self._define_event(foreground=foreground),
166                executor=self._get_executor(foreground=foreground),
167            )
168            return self.processors[agent_id].start()
169
170    def _stop_processor_unsafe(self, agent_id: str):
171        self.processors[agent_id].stop()
172        self.processors.pop(agent_id)
173
174    def _stop_processor(self, agent_id: str):
175        with self._processors_lock:
176            self._stop_processor_unsafe(agent_id)
177
178    def _stop_all_processors(self):
179        for agent_id in list(self.processors.keys()):
180            try:
181                with self._processors_lock:
182                    self._stop_processor_unsafe(agent_id)
183            except Exception as e:
184                log("error",
185                    f"{agent_id}: processor failed to stop", e)
186
187    def _get_executor(self, foreground: bool = False) -> Executor:
188        if foreground:
189            return ResourceManager().thread_pool_executor
190        else:
191            return ResourceManager().process_pool_executor
192
193    def _define_event(self, foreground: bool = False) -> _EventProtocol:
194        if foreground:
195            return threading.Event()
196        else:
197            return ResourceManager().multiprocessing_manager.Event()
198
199    @abstractmethod
200    def _create_inbound_queue(self, agent_id) -> Queue:
201        """
202        Returns a Queue suitable for receiving messages
203        """
204        raise NotImplementedError
205
206    @abstractmethod
207    def _create_outbound_queue(self, agent_id) -> Queue:
208        """
209        Returns a Queue suitable for sending messages
210        """
211        raise NotImplementedError

A Space is where Agents communicate.

processors: Dict[str, agency.processor.Processor]
def add(self, agent_type: Type[agency.agent.Agent], agent_id: str, *agent_args, **agent_kwargs):
36    def add(self,
37            agent_type: Type[Agent],
38            agent_id: str,
39            *agent_args,
40            **agent_kwargs):
41        """
42        Adds an agent to the space allowing it to communicate.
43
44        This method adds the agent in a subprocess. The agent may not be
45        directly accessed from the main thread.
46
47        Args:
48            agent_type: The type of agent to add
49            agent_id: The id of the agent to add
50
51            All other arguments are passed to the Agent constructor
52
53        Raises:
54            ValueError: If the agent ID is already in use
55        """
56        self._add(foreground=False,
57                  agent_type=agent_type,
58                  agent_id=agent_id,
59                  *agent_args,
60                  **agent_kwargs)

Adds an agent to the space allowing it to communicate.

This method adds the agent in a subprocess. The agent instance cannot be accessed directly from the main process; use add_foreground when direct access is needed.

Arguments:
  • agent_type: The type of agent to add
  • agent_id: The id of the agent to add
  • All other arguments are passed to the Agent constructor
Raises:
  • ValueError: If the agent ID is already in use
def add_foreground(self, agent_type: Type[agency.agent.Agent], agent_id: str, *agent_args, **agent_kwargs) -> agency.agent.Agent:
62    def add_foreground(self,
63                       agent_type: Type[Agent],
64                       agent_id: str,
65                       *agent_args,
66                       **agent_kwargs) -> Agent:
67        """
68        Adds an agent to the space and returns it in the current thread.
69
70        This method adds an agent using threading. The agent instance is
71        returned allowing direct access.
72
73        It is recommended to use the `add` method instead of this method in most
74        cases. Agents added this way may block other agents or threads in the
75        main process. Use this method when direct access to the agent instance
76        is desired.
77
78        Args:
79            agent_type: The type of agent to add
80            agent_id: The id of the agent to add
81
82            All other arguments are passed to the Agent constructor
83
84        Returns:
85            The agent
86
87        Raises:
88            ValueError: If the agent ID is already in use
89        """
90        agent = self._add(foreground=True,
91                          agent_type=agent_type,
92                          agent_id=agent_id,
93                          *agent_args,
94                          **agent_kwargs)
95        return agent

Adds an agent to the space and returns it in the current thread.

This method adds an agent using threading. The agent instance is returned allowing direct access.

It is recommended to use the add method instead of this method in most cases. Agents added this way may block other agents or threads in the main process. Use this method when direct access to the agent instance is desired.

Arguments:
  • agent_type: The type of agent to add
  • agent_id: The id of the agent to add
  • All other arguments are passed to the Agent constructor
Returns:

The agent

Raises:
  • ValueError: If the agent ID is already in use
def remove(self, agent_id: str):
 97    def remove(self, agent_id: str):
 98        """
 99        Removes an agent from the space by id.
100
101        This method cannot remove an agent added from a different instance. In
102        other words, a Space instance cannot remove an agent that it did not
103        add.
104
105        Args:
106            agent_id: The id of the agent to remove
107
108        Raises:
109            ValueError: If the agent is not present in the space
110        """
111        self._stop_processor(agent_id)
112        log("info", f"{agent_id}: removed from space")

Removes an agent from the space by id.

This method cannot remove an agent added from a different instance. In other words, a Space instance cannot remove an agent that it did not add.

Arguments:
  • agent_id: The id of the agent to remove
Raises:
  • KeyError: If the agent was not added by this Space instance
def destroy(self):
114    def destroy(self):
115        """
116        Cleans up resources used by this space.
117
118        Subclasses should call super().destroy() when overriding.
119        """
120        self._stop_all_processors()

Cleans up resources used by this space.

Subclasses should call super().destroy() when overriding.