agency.agent

  1import inspect
  2import re
  3import threading
  4import uuid
  5from typing import Dict, List, Union
  6
  7from docstring_parser import DocstringStyle, parse
  8
  9from agency.logger import log
 10from agency.queue import Queue
 11from agency.resources import ResourceManager
 12from agency.schema import Message
 13
 14
 15def _python_to_json_type_name(python_type_name: str) -> str:
 16    return {
 17        'str': 'string',
 18        'int': 'number',
 19        'float': 'number',
 20        'bool': 'boolean',
 21        'list': 'array',
 22        'dict': 'object'
 23    }[python_type_name]
 24
 25
 26def _generate_help(method: callable) -> dict:
 27    """
 28    Generates a help object from a method's docstring and signature
 29
 30    Args:
 31        method: the method
 32
 33    Returns:
 34        A help object of the form:
 35
 36        {
 37            "description": <description>,
 38            "args": {
 39                "arg_name": {
 40                    "type": <type>,
 41                    "description": <description>
 42                },
 43            }
 44            "returns": {
 45                "type": <type>,
 46                "description": <description>
 47            }
 48        }
 49    """
 50    signature = inspect.signature(method)
 51    parsed_docstring = parse(method.__doc__, DocstringStyle.GOOGLE)
 52
 53    help_object = {}
 54
 55    # description
 56    if parsed_docstring.short_description is not None:
 57        description = parsed_docstring.short_description
 58        if parsed_docstring.long_description is not None:
 59            description += " " + parsed_docstring.long_description
 60        help_object["description"] = re.sub(r"\s+", " ", description).strip()
 61
 62    # args
 63    help_object["args"] = {}
 64    docstring_args = {arg.arg_name: arg for arg in parsed_docstring.params}
 65    arg_names = list(signature.parameters.keys())[1:]  # skip 'self' argument
 66    for arg_name in arg_names:
 67        arg_object = {}
 68
 69        # type
 70        sig_annotation = signature.parameters[arg_name].annotation
 71        if sig_annotation is not None and sig_annotation.__name__ != "_empty":
 72            arg_object["type"] = _python_to_json_type_name(
 73                signature.parameters[arg_name].annotation.__name__)
 74        elif arg_name in docstring_args and docstring_args[arg_name].type_name is not None:
 75            arg_object["type"] = _python_to_json_type_name(
 76                docstring_args[arg_name].type_name)
 77
 78        # description
 79        if arg_name in docstring_args and docstring_args[arg_name].description is not None:
 80            arg_object["description"] = docstring_args[arg_name].description.strip()
 81
 82        help_object["args"][arg_name] = arg_object
 83
 84    # returns
 85    if parsed_docstring.returns is not None:
 86        help_object["returns"] = {}
 87
 88        # type
 89        if signature.return_annotation is not None:
 90            help_object["returns"]["type"] = _python_to_json_type_name(
 91                signature.return_annotation.__name__)
 92        elif parsed_docstring.returns.type_name is not None:
 93            help_object["returns"]["type"] = _python_to_json_type_name(
 94                parsed_docstring.returns.type_name)
 95
 96        # description
 97        if parsed_docstring.returns.description is not None:
 98            help_object["returns"]["description"] = parsed_docstring.returns.description.strip()
 99
100    return help_object
101
102
103# Special action names
104_RESPONSE_ACTION_NAME = "[response]"
105_ERROR_ACTION_NAME = "[error]"
106
107
108# Access policies
109ACCESS_PERMITTED = "ACCESS_PERMITTED"
110ACCESS_DENIED = "ACCESS_DENIED"
111ACCESS_REQUESTED = "ACCESS_REQUESTED"
112
113
114def action(*args, **kwargs):
115    """
116    Declares instance methods as actions making them accessible to other agents.
117
118    Keyword arguments:
119        name: The name of the action. Defaults to the name of the method.
120        help: The help object. Defaults to a generated object.
121        access_policy: The access policy. Defaults to ACCESS_PERMITTED.
122    """
123    def decorator(method):
124        action_name = kwargs.get("name", method.__name__)
125        if action_name == _RESPONSE_ACTION_NAME:
126            raise ValueError(f"action name '{action_name}' is reserved")
127        method.action_properties: dict = {
128            "name": method.__name__,
129            "help": _generate_help(method),
130            "access_policy": ACCESS_PERMITTED,
131            **kwargs,
132        }
133        return method
134
135    if len(args) == 1 and callable(args[0]) and not kwargs:
136        return decorator(args[0])  # The decorator was used without parentheses
137    else:
138        return decorator  # The decorator was used with parentheses
139
140
141class ActionError(Exception):
142    """Raised from the request() method if the action responds with an error"""
143
144
145class Agent():
146    """
147    An Actor that may represent an AI agent, software interface, or human user
148    """
149
150    def __init__(self, id: str, receive_own_broadcasts: bool = True):
151        """
152        Initializes an Agent.
153
154        This constructor is not meant to be called directly. It is invoked by
155        the Space class when adding an agent.
156
157        Subclasses should call super().__init__() in their constructor.
158
159        Args:
160            id: The id of the agent
161            receive_own_broadcasts:
162                Whether the agent will receive its own broadcasts. Defaults to
163                True
164        """
165        if len(id) < 1 or len(id) > 255:
166            raise ValueError("id must be between 1 and 255 characters")
167        if re.match(r"^amq\.", id):
168            raise ValueError("id cannot start with \"amq.\"")
169        if id == "*":
170            raise ValueError("id cannot be \"*\"")
171        self._id: str = id
172        self._receive_own_broadcasts: bool = receive_own_broadcasts
173        # --- non-constructor properties set by Space/Processor ---
174        self._outbound_queue: Queue = None
175        self._is_processing: bool = False
176        # --- non-constructor properties ---
177        self._message_log: List[Message] = []
178        self._message_log_lock: threading.Lock = threading.Lock()
179        self._pending_requests: Dict[str, Union[threading.Event, Message]] = {}
180        self._pending_requests_lock: threading.Lock = threading.Lock()
181        self.__thread_local_current_message = threading.local()
182        self.__thread_local_current_message.value: Message = None
183
184    def id(self) -> str:
185        return self._id
186
187    def send(self, message: dict) -> str:
188        """
189        Sends (out) a message from this agent.
190
191        Args:
192            message: The message
193
194        Returns:
195            The meta.id of the sent message
196
197        Raises:
198            TypeError: If the message is not a dict
199            ValueError: If the message is invalid
200        """
201        if not isinstance(message, dict):
202            raise TypeError("message must be a dict")
203        if "from" in message and message["from"] != self.id():
204            raise ValueError(
205                f"'from' field value '{message['from']}' does not match this agent's id.")
206        message["from"] = self.id()
207        message["meta"] = {
208            "id": uuid.uuid4().__str__(),
209            **message.get("meta", {}),
210        }
211        message = Message(**message).dict(by_alias=True, exclude_unset=True)
212        with self._message_log_lock:
213            log("info", f"{self._id}: sending", message)
214            self._message_log.append(message)
215            self._outbound_queue.put(message)
216        return message["meta"]["id"]
217
218    def request(self, message: dict, timeout: float = 3) -> object:
219        """
220        Synchronously sends a message then waits for and returns the return
221        value of the invoked action.
222
223        This method allows you to call an action synchronously like a function
224        and receive its return value in python. If the action raises an
225        exception an ActionError will be raised containing the error message.
226
227        Args:
228            message: The message to send
229            timeout:
230                The timeout in seconds to wait for the returned value.
231                Defaults to 3 seconds.
232
233        Returns:
234            object: The return value of the action.
235
236        Raises:
237            TimeoutError: If the timeout is reached
238            ActionError: If the action raised an exception
239            RuntimeError:
240                If called before the agent is processing incoming messages
241        """
242        if not self._is_processing:
243            raise RuntimeError(
244                "request() called while agent is not processing incoming messages. Use send() instead.")
245
246        # Send and mark the request as pending
247        with self._pending_requests_lock:
248            request_id = self.send(message)
249            self._pending_requests[request_id] = threading.Event()
250
251        # Wait for response
252        if not self._pending_requests[request_id].wait(timeout=timeout):
253            raise TimeoutError
254
255        with self._pending_requests_lock:
256            response_message = self._pending_requests.pop(request_id)
257
258        # Raise error or return value from response
259        if response_message["action"]["name"] == _ERROR_ACTION_NAME:
260            raise ActionError(response_message["action"]["args"]["error"])
261
262        if response_message["action"]["name"] == _RESPONSE_ACTION_NAME:
263            return response_message["action"]["args"]["value"]
264
265        raise RuntimeError("We should never get here")
266
267    def respond_with(self, value):
268        """
269        Sends a response with the given value.
270
271        Parameters:
272            value (any): The value to be sent in the response message.
273        """
274        self.send({
275            "meta": {
276                "parent_id": self.current_message()["meta"]["id"]
277            },
278            "to": self.current_message()['from'],
279            "action": {
280                "name": _RESPONSE_ACTION_NAME,
281                "args": {
282                    "value": value,
283                }
284            }
285        })
286
287    def raise_with(self, error: Exception):
288        """
289        Sends an error response.
290
291        Args:
292            error (Exception): The error to send.
293        """
294        self.send({
295            "meta": {
296                "parent_id": self.current_message()["meta"]["id"],
297            },
298            "to": self.current_message()['from'],
299            "action": {
300                "name": _ERROR_ACTION_NAME,
301                "args": {
302                    "error": f"{error.__class__.__name__}: {error}"
303                }
304            }
305        })
306
307    def _receive(self, message: dict):
308        """
309        Receives and handles an incoming message.
310
311        Args:
312            message: The incoming message
313        """
314        try:
315            # Ignore own broadcasts if _receive_own_broadcasts is false
316            if not self._receive_own_broadcasts \
317                    and message['from'] == self.id() \
318                    and message['to'] == '*':
319                return
320
321            log("debug", f"{self.id()}: received message", message)
322
323            # Record the received message before handling
324            with self._message_log_lock:
325                self._message_log.append(message)
326
327            # Handle incoming responses
328            # TODO: make serial/fan-out optional
329            if message["action"]["name"] in [_RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME]:
330                parent_id = message["meta"]["parent_id"]
331                if parent_id in self._pending_requests.keys():
332                    # This was a response to a request(). We use a little trick
333                    # here and simply swap out the event that is waiting with
334                    # the message, then set the event. The request() method will
335                    # pick up the response message in the existing thread.
336                    event = self._pending_requests[parent_id]
337                    self._pending_requests[parent_id] = message
338                    event.set()
339                else:
340                    # This was a response to a send()
341                    if message["action"]["name"] == _RESPONSE_ACTION_NAME:
342                        handler_callback = self.handle_action_value
343                        arg = message["action"]["args"]["value"]
344                    elif message["action"]["name"] == _ERROR_ACTION_NAME:
345                        handler_callback = self.handle_action_error
346                        arg = ActionError(message["action"]["args"]["error"])
347                    else:
348                        raise RuntimeError("Unknown action response")
349
350                    # Spawn a thread to handle the response
351                    def __process_response(arg, current_message):
352                        threading.current_thread(
353                        ).name = f"{self.id()}: __process_response {current_message['meta']['id']}"
354                        self.__thread_local_current_message.value = current_message
355                        handler_callback(arg)
356
357                    ResourceManager().thread_pool_executor.submit(
358                        __process_response,
359                        arg,
360                        message,
361                    )
362
363            # Handle all other messages
364            else:
365                # Spawn a thread to process the message. This means that messages
366                # are processed concurrently, but may be processed out of order.
367                ResourceManager().thread_pool_executor.submit(
368                    self.__process,
369                    message,
370                )
371        except Exception as e:
372            log("error", f"{self.id()}: raised exception in _receive", e)
373
374    def __process(self, message: dict):
375        """
376        Top level method within the action processing thread.
377        """
378        threading.current_thread(
379        ).name = f"{self.id()}: __process {message['meta']['id']}"
380        self.__thread_local_current_message.value = message
381        try:
382            log("debug", f"{self.id()}: committing action", message)
383            self.__commit(message)
384        except Exception as e:
385            # Handle errors (including PermissionError) that occur while
386            # committing an action by reporting back to the sender.
387            log("warning",
388                f"{self.id()}: raised exception while committing action '{message['action']['name']}'", e)
389            self.raise_with(e)
390
391    def __commit(self, message: dict):
392        """
393        Invokes the action method
394
395        Args:
396            message: The incoming message specifying the action
397
398        Raises:
399            AttributeError: If the action method is not found
400            PermissionError: If the action is not permitted
401        """
402        try:
403            # Check if the action method exists
404            action_method = self.__action_method(message["action"]["name"])
405        except KeyError:
406            # the action was not found
407            if message['to'] == '*':
408                return  # broadcasts will not raise an error in this situation
409            else:
410                raise AttributeError(
411                    f"\"{message['action']['name']}\" not found on \"{self.id()}\"")
412
413        # Check if the action is permitted
414        if not self.__permitted(message):
415            raise PermissionError(
416                f"\"{self.id()}.{message['action']['name']}\" not permitted")
417
418        self.before_action(message)
419
420        return_value = None
421        error = None
422        try:
423            # Invoke the action method
424            return_value = action_method(**message['action'].get('args', {}))
425        except Exception as e:
426            error = e
427            raise
428        finally:
429            self.after_action(message, return_value, error)
430        return return_value
431
432    def __permitted(self, message: dict) -> bool:
433        """
434        Checks whether the message's action is allowed
435        """
436        action_method = self.__action_method(message['action']['name'])
437        policy = action_method.action_properties["access_policy"]
438        if policy == ACCESS_PERMITTED:
439            return True
440        elif policy == ACCESS_DENIED:
441            return False
442        elif policy == ACCESS_REQUESTED:
443            return self.request_permission(message)
444        else:
445            raise Exception(
446              f"Invalid access policy for method: {message['action']}, got '{policy}'")
447
448    def __action_methods(self) -> dict:
449        instance_methods = inspect.getmembers(self, inspect.ismethod)
450        action_methods = {
451            method_name: method
452            for method_name, method in instance_methods
453            if hasattr(method, "action_properties")
454        }
455        return action_methods
456
457    def __action_method(self, action_name: str):
458        """
459        Returns the method for the given action name.
460        """
461        action_methods = self.__action_methods()
462        return action_methods[action_name]
463
464    def _find_message(self, message_id: str) -> Message:
465        """
466        Returns a message from the log with the given ID.
467
468        Args:
469            message_id: The ID of the message
470
471        Returns:
472            The message or None if not found
473        """
474        for message in self._message_log:
475            if message["meta"]["id"] == message_id:
476                return message
477
478    def current_message(self) -> Message:
479        """
480        Returns the full incoming message which invoked the current action.
481
482        This method may be called within an action or action related callback to
483        retrieve the current message, for example to determine the sender or
484        inspect other details.
485
486        Returns:
487            The current message
488        """
489        return self.__thread_local_current_message.value
490
491    def parent_message(self, message: Message = None) -> Message:
492        """
493        Returns the message that the given message is responding to, if any.
494
495        This method may be used within the handle_action_value and
496        handle_action_error callbacks.
497
498        Args:
499            message: The message to get the parent message of. Defaults to the
500            current message.
501
502        Returns:
503            The parent message or None
504        """
505        if message is None:
506            message = self.current_message()
507        parent_id = message["meta"].get("parent_id", None)
508        if parent_id is not None:
509            return self._find_message(parent_id)
510
511    @action
512    def help(self, action_name: str = None) -> dict:
513        """
514        Returns a list of actions on this agent.
515
516        If action_name is passed, returns a list with only that action.
517        If no action_name is passed, returns all actions.
518
519        Args:
520            action_name: (Optional) The name of an action to request help for
521
522        Returns:
523            A dictionary of actions
524        """
525        self.respond_with(self._help(action_name))
526
527    def _help(self, action_name: str = None) -> dict:
528        """
529        Generates the help information returned by the help() action
530        """
531        special_actions = ["help", _RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME]
532        help_list = {
533            method.action_properties["name"]: method.action_properties["help"]
534            for method in self.__action_methods().values()
535            if action_name is None
536            and method.action_properties["name"] not in special_actions
537            or method.action_properties["name"] == action_name
538        }
539        return help_list
540
541    def handle_action_value(self, value):
542        """
543        Receives a return value from a previous action.
544
545        This method receives return values from actions invoked by the send()
546        method. It is not called when using the request() method, which returns
547        the value directly.
548
549        To inspect the full response message carrying this value, use
550        current_message(). To inspect the message which returned the value, use
551        parent_message().
552
553        Args:
554            value:
555                The return value
556        """
557        if not hasattr(self, "_issued_handle_action_value_warning"):
558            self._issued_handle_action_value_warning = True
559            log("warning",
560                f"A value was returned from an action. Implement {self.__class__.__name__}.handle_action_value() to handle it.")
561
562    def handle_action_error(self, error: ActionError):
563        """
564        Receives an error from a previous action.
565
566        This method receives errors from actions invoked by the send() method.
567        It is not called when using the request() method, which raises an error
568        directly.
569
570        To inspect the full response message carrying this error, use
571        current_message(). To inspect the message which caused the error, use
572        parent_message().
573
574        Args:
575            error: The error
576        """
577        if not hasattr(self, "_issued_handle_action_error_warning"):
578            self._issued_handle_action_error_warning = True
579            log("warning",
580                f"An error was raised from an action. Implement {self.__class__.__name__}.handle_action_error() to handle it.")
581
582    def after_add(self):
583        """
584        Called after the agent is added to a space, but before it begins
585        processing incoming messages.
586
587        The agent may send messages during this callback using the send()
588        method, but may not use the request() method since it relies on
589        processing incoming messages.
590        """
591
592    def before_remove(self):
593        """
594        Called before the agent is removed from a space, after it has finished
595        processing incoming messages.
596
597        The agent may send final messages during this callback using the send()
598        method, but may not use the request() method since it relies on
599        processing incoming messages.
600        """
601
602    def before_action(self, message: dict):
603        """
604        Called before every action.
605
606        This method will only be called if the action exists and is permitted.
607
608        Args:
609            message: The received message that contains the action
610        """
611
612    def after_action(self, message: dict, return_value: str, error: str):
613        """
614        Called after every action, regardless of whether an error occurred.
615
616        Args:
617            message: The message which invoked the action
618            return_value: The return value from the action
619            error: The error from the action if any
620        """
621
622    def request_permission(self, proposed_message: dict) -> bool:
623        """
624        Receives a proposed action message and presents it to the agent for
625        review.
626
627        Args:
628            proposed_message: The proposed action message
629
630        Returns:
631            True if access should be permitted
632        """
633        raise NotImplementedError(
634            f"You must implement {self.__class__.__name__}.request_permission() to use ACCESS_REQUESTED")
ACCESS_PERMITTED = 'ACCESS_PERMITTED'
ACCESS_DENIED = 'ACCESS_DENIED'
ACCESS_REQUESTED = 'ACCESS_REQUESTED'
def action(*args, **kwargs):
115def action(*args, **kwargs):
116    """
117    Declares instance methods as actions making them accessible to other agents.
118
119    Keyword arguments:
120        name: The name of the action. Defaults to the name of the method.
121        help: The help object. Defaults to a generated object.
122        access_policy: The access policy. Defaults to ACCESS_PERMITTED.
123    """
124    def decorator(method):
125        action_name = kwargs.get("name", method.__name__)
126        if action_name == _RESPONSE_ACTION_NAME:
127            raise ValueError(f"action name '{action_name}' is reserved")
128        method.action_properties: dict = {
129            "name": method.__name__,
130            "help": _generate_help(method),
131            "access_policy": ACCESS_PERMITTED,
132            **kwargs,
133        }
134        return method
135
136    if len(args) == 1 and callable(args[0]) and not kwargs:
137        return decorator(args[0])  # The decorator was used without parentheses
138    else:
139        return decorator  # The decorator was used with parentheses

Declares instance methods as actions making them accessible to other agents.

Keyword arguments:

name: The name of the action. Defaults to the name of the method. help: The help object. Defaults to a generated object. access_policy: The access policy. Defaults to ACCESS_PERMITTED.

class ActionError(builtins.Exception):
142class ActionError(Exception):
143    """Raised from the request() method if the action responds with an error"""

Raised from the request() method if the action responds with an error

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Agent:
146class Agent():
147    """
148    An Actor that may represent an AI agent, software interface, or human user
149    """
150
151    def __init__(self, id: str, receive_own_broadcasts: bool = True):
152        """
153        Initializes an Agent.
154
155        This constructor is not meant to be called directly. It is invoked by
156        the Space class when adding an agent.
157
158        Subclasses should call super().__init__() in their constructor.
159
160        Args:
161            id: The id of the agent
162            receive_own_broadcasts:
163                Whether the agent will receive its own broadcasts. Defaults to
164                True
165        """
166        if len(id) < 1 or len(id) > 255:
167            raise ValueError("id must be between 1 and 255 characters")
168        if re.match(r"^amq\.", id):
169            raise ValueError("id cannot start with \"amq.\"")
170        if id == "*":
171            raise ValueError("id cannot be \"*\"")
172        self._id: str = id
173        self._receive_own_broadcasts: bool = receive_own_broadcasts
174        # --- non-constructor properties set by Space/Processor ---
175        self._outbound_queue: Queue = None
176        self._is_processing: bool = False
177        # --- non-constructor properties ---
178        self._message_log: List[Message] = []
179        self._message_log_lock: threading.Lock = threading.Lock()
180        self._pending_requests: Dict[str, Union[threading.Event, Message]] = {}
181        self._pending_requests_lock: threading.Lock = threading.Lock()
182        self.__thread_local_current_message = threading.local()
183        self.__thread_local_current_message.value: Message = None
184
185    def id(self) -> str:
186        return self._id
187
188    def send(self, message: dict) -> str:
189        """
190        Sends (out) a message from this agent.
191
192        Args:
193            message: The message
194
195        Returns:
196            The meta.id of the sent message
197
198        Raises:
199            TypeError: If the message is not a dict
200            ValueError: If the message is invalid
201        """
202        if not isinstance(message, dict):
203            raise TypeError("message must be a dict")
204        if "from" in message and message["from"] != self.id():
205            raise ValueError(
206                f"'from' field value '{message['from']}' does not match this agent's id.")
207        message["from"] = self.id()
208        message["meta"] = {
209            "id": uuid.uuid4().__str__(),
210            **message.get("meta", {}),
211        }
212        message = Message(**message).dict(by_alias=True, exclude_unset=True)
213        with self._message_log_lock:
214            log("info", f"{self._id}: sending", message)
215            self._message_log.append(message)
216            self._outbound_queue.put(message)
217        return message["meta"]["id"]
218
219    def request(self, message: dict, timeout: float = 3) -> object:
220        """
221        Synchronously sends a message then waits for and returns the return
222        value of the invoked action.
223
224        This method allows you to call an action synchronously like a function
225        and receive its return value in python. If the action raises an
226        exception an ActionError will be raised containing the error message.
227
228        Args:
229            message: The message to send
230            timeout:
231                The timeout in seconds to wait for the returned value.
232                Defaults to 3 seconds.
233
234        Returns:
235            object: The return value of the action.
236
237        Raises:
238            TimeoutError: If the timeout is reached
239            ActionError: If the action raised an exception
240            RuntimeError:
241                If called before the agent is processing incoming messages
242        """
243        if not self._is_processing:
244            raise RuntimeError(
245                "request() called while agent is not processing incoming messages. Use send() instead.")
246
247        # Send and mark the request as pending
248        with self._pending_requests_lock:
249            request_id = self.send(message)
250            self._pending_requests[request_id] = threading.Event()
251
252        # Wait for response
253        if not self._pending_requests[request_id].wait(timeout=timeout):
254            raise TimeoutError
255
256        with self._pending_requests_lock:
257            response_message = self._pending_requests.pop(request_id)
258
259        # Raise error or return value from response
260        if response_message["action"]["name"] == _ERROR_ACTION_NAME:
261            raise ActionError(response_message["action"]["args"]["error"])
262
263        if response_message["action"]["name"] == _RESPONSE_ACTION_NAME:
264            return response_message["action"]["args"]["value"]
265
266        raise RuntimeError("We should never get here")
267
268    def respond_with(self, value):
269        """
270        Sends a response with the given value.
271
272        Parameters:
273            value (any): The value to be sent in the response message.
274        """
275        self.send({
276            "meta": {
277                "parent_id": self.current_message()["meta"]["id"]
278            },
279            "to": self.current_message()['from'],
280            "action": {
281                "name": _RESPONSE_ACTION_NAME,
282                "args": {
283                    "value": value,
284                }
285            }
286        })
287
288    def raise_with(self, error: Exception):
289        """
290        Sends an error response.
291
292        Args:
293            error (Exception): The error to send.
294        """
295        self.send({
296            "meta": {
297                "parent_id": self.current_message()["meta"]["id"],
298            },
299            "to": self.current_message()['from'],
300            "action": {
301                "name": _ERROR_ACTION_NAME,
302                "args": {
303                    "error": f"{error.__class__.__name__}: {error}"
304                }
305            }
306        })
307
308    def _receive(self, message: dict):
309        """
310        Receives and handles an incoming message.
311
312        Args:
313            message: The incoming message
314        """
315        try:
316            # Ignore own broadcasts if _receive_own_broadcasts is false
317            if not self._receive_own_broadcasts \
318                    and message['from'] == self.id() \
319                    and message['to'] == '*':
320                return
321
322            log("debug", f"{self.id()}: received message", message)
323
324            # Record the received message before handling
325            with self._message_log_lock:
326                self._message_log.append(message)
327
328            # Handle incoming responses
329            # TODO: make serial/fan-out optional
330            if message["action"]["name"] in [_RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME]:
331                parent_id = message["meta"]["parent_id"]
332                if parent_id in self._pending_requests.keys():
333                    # This was a response to a request(). We use a little trick
334                    # here and simply swap out the event that is waiting with
335                    # the message, then set the event. The request() method will
336                    # pick up the response message in the existing thread.
337                    event = self._pending_requests[parent_id]
338                    self._pending_requests[parent_id] = message
339                    event.set()
340                else:
341                    # This was a response to a send()
342                    if message["action"]["name"] == _RESPONSE_ACTION_NAME:
343                        handler_callback = self.handle_action_value
344                        arg = message["action"]["args"]["value"]
345                    elif message["action"]["name"] == _ERROR_ACTION_NAME:
346                        handler_callback = self.handle_action_error
347                        arg = ActionError(message["action"]["args"]["error"])
348                    else:
349                        raise RuntimeError("Unknown action response")
350
351                    # Spawn a thread to handle the response
352                    def __process_response(arg, current_message):
353                        threading.current_thread(
354                        ).name = f"{self.id()}: __process_response {current_message['meta']['id']}"
355                        self.__thread_local_current_message.value = current_message
356                        handler_callback(arg)
357
358                    ResourceManager().thread_pool_executor.submit(
359                        __process_response,
360                        arg,
361                        message,
362                    )
363
364            # Handle all other messages
365            else:
366                # Spawn a thread to process the message. This means that messages
367                # are processed concurrently, but may be processed out of order.
368                ResourceManager().thread_pool_executor.submit(
369                    self.__process,
370                    message,
371                )
372        except Exception as e:
373            log("error", f"{self.id()}: raised exception in _receive", e)
374
375    def __process(self, message: dict):
376        """
377        Top level method within the action processing thread.
378        """
379        threading.current_thread(
380        ).name = f"{self.id()}: __process {message['meta']['id']}"
381        self.__thread_local_current_message.value = message
382        try:
383            log("debug", f"{self.id()}: committing action", message)
384            self.__commit(message)
385        except Exception as e:
386            # Handle errors (including PermissionError) that occur while
387            # committing an action by reporting back to the sender.
388            log("warning",
389                f"{self.id()}: raised exception while committing action '{message['action']['name']}'", e)
390            self.raise_with(e)
391
392    def __commit(self, message: dict):
393        """
394        Invokes the action method
395
396        Args:
397            message: The incoming message specifying the action
398
399        Raises:
400            AttributeError: If the action method is not found
401            PermissionError: If the action is not permitted
402        """
403        try:
404            # Check if the action method exists
405            action_method = self.__action_method(message["action"]["name"])
406        except KeyError:
407            # the action was not found
408            if message['to'] == '*':
409                return  # broadcasts will not raise an error in this situation
410            else:
411                raise AttributeError(
412                    f"\"{message['action']['name']}\" not found on \"{self.id()}\"")
413
414        # Check if the action is permitted
415        if not self.__permitted(message):
416            raise PermissionError(
417                f"\"{self.id()}.{message['action']['name']}\" not permitted")
418
419        self.before_action(message)
420
421        return_value = None
422        error = None
423        try:
424            # Invoke the action method
425            return_value = action_method(**message['action'].get('args', {}))
426        except Exception as e:
427            error = e
428            raise
429        finally:
430            self.after_action(message, return_value, error)
431        return return_value
432
433    def __permitted(self, message: dict) -> bool:
434        """
435        Checks whether the message's action is allowed
436        """
437        action_method = self.__action_method(message['action']['name'])
438        policy = action_method.action_properties["access_policy"]
439        if policy == ACCESS_PERMITTED:
440            return True
441        elif policy == ACCESS_DENIED:
442            return False
443        elif policy == ACCESS_REQUESTED:
444            return self.request_permission(message)
445        else:
446            raise Exception(
447              f"Invalid access policy for method: {message['action']}, got '{policy}'")
448
449    def __action_methods(self) -> dict:
450        instance_methods = inspect.getmembers(self, inspect.ismethod)
451        action_methods = {
452            method_name: method
453            for method_name, method in instance_methods
454            if hasattr(method, "action_properties")
455        }
456        return action_methods
457
458    def __action_method(self, action_name: str):
459        """
460        Returns the method for the given action name.
461        """
462        action_methods = self.__action_methods()
463        return action_methods[action_name]
464
465    def _find_message(self, message_id: str) -> Message:
466        """
467        Returns a message from the log with the given ID.
468
469        Args:
470            message_id: The ID of the message
471
472        Returns:
473            The message or None if not found
474        """
475        for message in self._message_log:
476            if message["meta"]["id"] == message_id:
477                return message
478
479    def current_message(self) -> Message:
480        """
481        Returns the full incoming message which invoked the current action.
482
483        This method may be called within an action or action related callback to
484        retrieve the current message, for example to determine the sender or
485        inspect other details.
486
487        Returns:
488            The current message
489        """
490        return self.__thread_local_current_message.value
491
492    def parent_message(self, message: Message = None) -> Message:
493        """
494        Returns the message that the given message is responding to, if any.
495
496        This method may be used within the handle_action_value and
497        handle_action_error callbacks.
498
499        Args:
500            message: The message to get the parent message of. Defaults to the
501            current message.
502
503        Returns:
504            The parent message or None
505        """
506        if message is None:
507            message = self.current_message()
508        parent_id = message["meta"].get("parent_id", None)
509        if parent_id is not None:
510            return self._find_message(parent_id)
511
512    @action
513    def help(self, action_name: str = None) -> dict:
514        """
515        Returns a list of actions on this agent.
516
517        If action_name is passed, returns a list with only that action.
518        If no action_name is passed, returns all actions.
519
520        Args:
521            action_name: (Optional) The name of an action to request help for
522
523        Returns:
524            A dictionary of actions
525        """
526        self.respond_with(self._help(action_name))
527
528    def _help(self, action_name: str = None) -> dict:
529        """
530        Generates the help information returned by the help() action
531        """
532        special_actions = ["help", _RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME]
533        help_list = {
534            method.action_properties["name"]: method.action_properties["help"]
535            for method in self.__action_methods().values()
536            if action_name is None
537            and method.action_properties["name"] not in special_actions
538            or method.action_properties["name"] == action_name
539        }
540        return help_list
541
542    def handle_action_value(self, value):
543        """
544        Receives a return value from a previous action.
545
546        This method receives return values from actions invoked by the send()
547        method. It is not called when using the request() method, which returns
548        the value directly.
549
550        To inspect the full response message carrying this value, use
551        current_message(). To inspect the message which returned the value, use
552        parent_message().
553
554        Args:
555            value:
556                The return value
557        """
558        if not hasattr(self, "_issued_handle_action_value_warning"):
559            self._issued_handle_action_value_warning = True
560            log("warning",
561                f"A value was returned from an action. Implement {self.__class__.__name__}.handle_action_value() to handle it.")
562
563    def handle_action_error(self, error: ActionError):
564        """
565        Receives an error from a previous action.
566
567        This method receives errors from actions invoked by the send() method.
568        It is not called when using the request() method, which raises an error
569        directly.
570
571        To inspect the full response message carrying this error, use
572        current_message(). To inspect the message which caused the error, use
573        parent_message().
574
575        Args:
576            error: The error
577        """
578        if not hasattr(self, "_issued_handle_action_error_warning"):
579            self._issued_handle_action_error_warning = True
580            log("warning",
581                f"An error was raised from an action. Implement {self.__class__.__name__}.handle_action_error() to handle it.")
582
583    def after_add(self):
584        """
585        Called after the agent is added to a space, but before it begins
586        processing incoming messages.
587
588        The agent may send messages during this callback using the send()
589        method, but may not use the request() method since it relies on
590        processing incoming messages.
591        """
592
593    def before_remove(self):
594        """
595        Called before the agent is removed from a space, after it has finished
596        processing incoming messages.
597
598        The agent may send final messages during this callback using the send()
599        method, but may not use the request() method since it relies on
600        processing incoming messages.
601        """
602
603    def before_action(self, message: dict):
604        """
605        Called before every action.
606
607        This method will only be called if the action exists and is permitted.
608
609        Args:
610            message: The received message that contains the action
611        """
612
613    def after_action(self, message: dict, return_value: str, error: str):
614        """
615        Called after every action, regardless of whether an error occurred.
616
617        Args:
618            message: The message which invoked the action
619            return_value: The return value from the action
620            error: The error from the action if any
621        """
622
623    def request_permission(self, proposed_message: dict) -> bool:
624        """
625        Receives a proposed action message and presents it to the agent for
626        review.
627
628        Args:
629            proposed_message: The proposed action message
630
631        Returns:
632            True if access should be permitted
633        """
634        raise NotImplementedError(
635            f"You must implement {self.__class__.__name__}.request_permission() to use ACCESS_REQUESTED")

An Actor that may represent an AI agent, software interface, or human user

Agent(id: str, receive_own_broadcasts: bool = True)
151    def __init__(self, id: str, receive_own_broadcasts: bool = True):
152        """
153        Initializes an Agent.
154
155        This constructor is not meant to be called directly. It is invoked by
156        the Space class when adding an agent.
157
158        Subclasses should call super().__init__() in their constructor.
159
160        Args:
161            id: The id of the agent
162            receive_own_broadcasts:
163                Whether the agent will receive its own broadcasts. Defaults to
164                True
165        """
166        if len(id) < 1 or len(id) > 255:
167            raise ValueError("id must be between 1 and 255 characters")
168        if re.match(r"^amq\.", id):
169            raise ValueError("id cannot start with \"amq.\"")
170        if id == "*":
171            raise ValueError("id cannot be \"*\"")
172        self._id: str = id
173        self._receive_own_broadcasts: bool = receive_own_broadcasts
174        # --- non-constructor properties set by Space/Processor ---
175        self._outbound_queue: Queue = None
176        self._is_processing: bool = False
177        # --- non-constructor properties ---
178        self._message_log: List[Message] = []
179        self._message_log_lock: threading.Lock = threading.Lock()
180        self._pending_requests: Dict[str, Union[threading.Event, Message]] = {}
181        self._pending_requests_lock: threading.Lock = threading.Lock()
182        self.__thread_local_current_message = threading.local()
183        self.__thread_local_current_message.value: Message = None

Initializes an Agent.

This constructor is not meant to be called directly. It is invoked by the Space class when adding an agent.

Subclasses should call super().__init__() in their constructor.

Arguments:
  • id: The id of the agent
  • receive_own_broadcasts: Whether the agent will receive its own broadcasts. Defaults to True
def id(self) -> str:
185    def id(self) -> str:
186        return self._id
def send(self, message: dict) -> str:
188    def send(self, message: dict) -> str:
189        """
190        Sends (out) a message from this agent.
191
192        Args:
193            message: The message
194
195        Returns:
196            The meta.id of the sent message
197
198        Raises:
199            TypeError: If the message is not a dict
200            ValueError: If the message is invalid
201        """
202        if not isinstance(message, dict):
203            raise TypeError("message must be a dict")
204        if "from" in message and message["from"] != self.id():
205            raise ValueError(
206                f"'from' field value '{message['from']}' does not match this agent's id.")
207        message["from"] = self.id()
208        message["meta"] = {
209            "id": uuid.uuid4().__str__(),
210            **message.get("meta", {}),
211        }
212        message = Message(**message).dict(by_alias=True, exclude_unset=True)
213        with self._message_log_lock:
214            log("info", f"{self._id}: sending", message)
215            self._message_log.append(message)
216            self._outbound_queue.put(message)
217        return message["meta"]["id"]

Sends (out) a message from this agent.

Arguments:
  • message: The message
Returns:

The meta.id of the sent message

Raises:
  • TypeError: If the message is not a dict
  • ValueError: If the message is invalid
def request(self, message: dict, timeout: float = 3) -> object:
219    def request(self, message: dict, timeout: float = 3) -> object:
220        """
221        Synchronously sends a message then waits for and returns the return
222        value of the invoked action.
223
224        This method allows you to call an action synchronously like a function
225        and receive its return value in python. If the action raises an
226        exception an ActionError will be raised containing the error message.
227
228        Args:
229            message: The message to send
230            timeout:
231                The timeout in seconds to wait for the returned value.
232                Defaults to 3 seconds.
233
234        Returns:
235            object: The return value of the action.
236
237        Raises:
238            TimeoutError: If the timeout is reached
239            ActionError: If the action raised an exception
240            RuntimeError:
241                If called before the agent is processing incoming messages
242        """
243        if not self._is_processing:
244            raise RuntimeError(
245                "request() called while agent is not processing incoming messages. Use send() instead.")
246
247        # Send and mark the request as pending
248        with self._pending_requests_lock:
249            request_id = self.send(message)
250            self._pending_requests[request_id] = threading.Event()
251
252        # Wait for response
253        if not self._pending_requests[request_id].wait(timeout=timeout):
254            raise TimeoutError
255
256        with self._pending_requests_lock:
257            response_message = self._pending_requests.pop(request_id)
258
259        # Raise error or return value from response
260        if response_message["action"]["name"] == _ERROR_ACTION_NAME:
261            raise ActionError(response_message["action"]["args"]["error"])
262
263        if response_message["action"]["name"] == _RESPONSE_ACTION_NAME:
264            return response_message["action"]["args"]["value"]
265
266        raise RuntimeError("We should never get here")

Synchronously sends a message then waits for and returns the return value of the invoked action.

This method allows you to call an action synchronously like a function and receive its return value in python. If the action raises an exception an ActionError will be raised containing the error message.

Arguments:
  • message: The message to send
  • timeout: The timeout in seconds to wait for the returned value. Defaults to 3 seconds.
Returns:

object: The return value of the action.

Raises:
  • TimeoutError: If the timeout is reached
  • ActionError: If the action raised an exception
  • RuntimeError: If called before the agent is processing incoming messages
def respond_with(self, value):
268    def respond_with(self, value):
269        """
270        Sends a response with the given value.
271
272        Parameters:
273            value (any): The value to be sent in the response message.
274        """
275        self.send({
276            "meta": {
277                "parent_id": self.current_message()["meta"]["id"]
278            },
279            "to": self.current_message()['from'],
280            "action": {
281                "name": _RESPONSE_ACTION_NAME,
282                "args": {
283                    "value": value,
284                }
285            }
286        })

Sends a response with the given value.

Arguments:
  • value (any): The value to be sent in the response message.
def raise_with(self, error: Exception):
288    def raise_with(self, error: Exception):
289        """
290        Sends an error response.
291
292        Args:
293            error (Exception): The error to send.
294        """
295        self.send({
296            "meta": {
297                "parent_id": self.current_message()["meta"]["id"],
298            },
299            "to": self.current_message()['from'],
300            "action": {
301                "name": _ERROR_ACTION_NAME,
302                "args": {
303                    "error": f"{error.__class__.__name__}: {error}"
304                }
305            }
306        })

Sends an error response.

Arguments:
  • error (Exception): The error to send.
def current_message(self) -> agency.schema.Message:
479    def current_message(self) -> Message:
480        """
481        Returns the full incoming message which invoked the current action.
482
483        This method may be called within an action or action related callback to
484        retrieve the current message, for example to determine the sender or
485        inspect other details.
486
487        Returns:
488            The current message
489        """
490        return self.__thread_local_current_message.value

Returns the full incoming message which invoked the current action.

This method may be called within an action or action related callback to retrieve the current message, for example to determine the sender or inspect other details.

Returns:

The current message

def parent_message(self, message: agency.schema.Message = None) -> agency.schema.Message:
492    def parent_message(self, message: Message = None) -> Message:
493        """
494        Returns the message that the given message is responding to, if any.
495
496        This method may be used within the handle_action_value and
497        handle_action_error callbacks.
498
499        Args:
500            message: The message to get the parent message of. Defaults to the
501            current message.
502
503        Returns:
504            The parent message or None
505        """
506        if message is None:
507            message = self.current_message()
508        parent_id = message["meta"].get("parent_id", None)
509        if parent_id is not None:
510            return self._find_message(parent_id)

Returns the message that the given message is responding to, if any.

This method may be used within the handle_action_value and handle_action_error callbacks.

Arguments:
  • message: The message to get the parent message of. Defaults to the
  • current message.
Returns:

The parent message or None

@action
def help(self, action_name: str = None) -> dict:
512    @action
513    def help(self, action_name: str = None) -> dict:
514        """
515        Returns a list of actions on this agent.
516
517        If action_name is passed, returns a list with only that action.
518        If no action_name is passed, returns all actions.
519
520        Args:
521            action_name: (Optional) The name of an action to request help for
522
523        Returns:
524            A dictionary of actions
525        """
526        self.respond_with(self._help(action_name))

Returns a list of actions on this agent.

If action_name is passed, returns a list with only that action. If no action_name is passed, returns all actions.

Arguments:
  • action_name: (Optional) The name of an action to request help for
Returns:

A dictionary of actions

def handle_action_value(self, value):
542    def handle_action_value(self, value):
543        """
544        Receives a return value from a previous action.
545
546        This method receives return values from actions invoked by the send()
547        method. It is not called when using the request() method, which returns
548        the value directly.
549
550        To inspect the full response message carrying this value, use
551        current_message(). To inspect the message which returned the value, use
552        parent_message().
553
554        Args:
555            value:
556                The return value
557        """
558        if not hasattr(self, "_issued_handle_action_value_warning"):
559            self._issued_handle_action_value_warning = True
560            log("warning",
561                f"A value was returned from an action. Implement {self.__class__.__name__}.handle_action_value() to handle it.")

Receives a return value from a previous action.

This method receives return values from actions invoked by the send() method. It is not called when using the request() method, which returns the value directly.

To inspect the full response message carrying this value, use current_message(). To inspect the message which returned the value, use parent_message().

Arguments:
  • value: The return value
def handle_action_error(self, error: ActionError):
563    def handle_action_error(self, error: ActionError):
564        """
565        Receives an error from a previous action.
566
567        This method receives errors from actions invoked by the send() method.
568        It is not called when using the request() method, which raises an error
569        directly.
570
571        To inspect the full response message carrying this error, use
572        current_message(). To inspect the message which caused the error, use
573        parent_message().
574
575        Args:
576            error: The error
577        """
578        if not hasattr(self, "_issued_handle_action_error_warning"):
579            self._issued_handle_action_error_warning = True
580            log("warning",
581                f"An error was raised from an action. Implement {self.__class__.__name__}.handle_action_error() to handle it.")

Receives an error from a previous action.

This method receives errors from actions invoked by the send() method. It is not called when using the request() method, which raises an error directly.

To inspect the full response message carrying this error, use current_message(). To inspect the message which caused the error, use parent_message().

Arguments:
  • error: The error
def after_add(self):
583    def after_add(self):
584        """
585        Called after the agent is added to a space, but before it begins
586        processing incoming messages.
587
588        The agent may send messages during this callback using the send()
589        method, but may not use the request() method since it relies on
590        processing incoming messages.
591        """

Called after the agent is added to a space, but before it begins processing incoming messages.

The agent may send messages during this callback using the send() method, but may not use the request() method since it relies on processing incoming messages.

def before_remove(self):
593    def before_remove(self):
594        """
595        Called before the agent is removed from a space, after it has finished
596        processing incoming messages.
597
598        The agent may send final messages during this callback using the send()
599        method, but may not use the request() method since it relies on
600        processing incoming messages.
601        """

Called before the agent is removed from a space, after it has finished processing incoming messages.

The agent may send final messages during this callback using the send() method, but may not use the request() method since it relies on processing incoming messages.

def before_action(self, message: dict):
603    def before_action(self, message: dict):
604        """
605        Called before every action.
606
607        This method will only be called if the action exists and is permitted.
608
609        Args:
610            message: The received message that contains the action
611        """

Called before every action.

This method will only be called if the action exists and is permitted.

Arguments:
  • message: The received message that contains the action
def after_action(self, message: dict, return_value: str, error: str):
613    def after_action(self, message: dict, return_value: str, error: str):
614        """
615        Called after every action, regardless of whether an error occurred.
616
617        Args:
618            message: The message which invoked the action
619            return_value: The return value from the action
620            error: The error from the action if any
621        """

Called after every action, regardless of whether an error occurred.

Arguments:
  • message: The message which invoked the action
  • return_value: The return value from the action
  • error: The error from the action if any
def request_permission(self, proposed_message: dict) -> bool:
623    def request_permission(self, proposed_message: dict) -> bool:
624        """
625        Receives a proposed action message and presents it to the agent for
626        review.
627
628        Args:
629            proposed_message: The proposed action message
630
631        Returns:
632            True if access should be permitted
633        """
634        raise NotImplementedError(
635            f"You must implement {self.__class__.__name__}.request_permission() to use ACCESS_REQUESTED")

Receives a proposed action message and presents it to the agent for review.

Arguments:
  • proposed_message: The proposed action message
Returns:

True if access should be permitted