agency.agent
1import inspect 2import re 3import threading 4import uuid 5from typing import Dict, List, Union 6 7from docstring_parser import DocstringStyle, parse 8 9from agency.logger import log 10from agency.queue import Queue 11from agency.resources import ResourceManager 12from agency.schema import Message 13 14 15def _python_to_json_type_name(python_type_name: str) -> str: 16 return { 17 'str': 'string', 18 'int': 'number', 19 'float': 'number', 20 'bool': 'boolean', 21 'list': 'array', 22 'dict': 'object' 23 }[python_type_name] 24 25 26def _generate_help(method: callable) -> dict: 27 """ 28 Generates a help object from a method's docstring and signature 29 30 Args: 31 method: the method 32 33 Returns: 34 A help object of the form: 35 36 { 37 "description": <description>, 38 "args": { 39 "arg_name": { 40 "type": <type>, 41 "description": <description> 42 }, 43 } 44 "returns": { 45 "type": <type>, 46 "description": <description> 47 } 48 } 49 """ 50 signature = inspect.signature(method) 51 parsed_docstring = parse(method.__doc__, DocstringStyle.GOOGLE) 52 53 help_object = {} 54 55 # description 56 if parsed_docstring.short_description is not None: 57 description = parsed_docstring.short_description 58 if parsed_docstring.long_description is not None: 59 description += " " + parsed_docstring.long_description 60 help_object["description"] = re.sub(r"\s+", " ", description).strip() 61 62 # args 63 help_object["args"] = {} 64 docstring_args = {arg.arg_name: arg for arg in parsed_docstring.params} 65 arg_names = list(signature.parameters.keys())[1:] # skip 'self' argument 66 for arg_name in arg_names: 67 arg_object = {} 68 69 # type 70 sig_annotation = signature.parameters[arg_name].annotation 71 if sig_annotation is not None and sig_annotation.__name__ != "_empty": 72 arg_object["type"] = _python_to_json_type_name( 73 signature.parameters[arg_name].annotation.__name__) 74 elif arg_name in docstring_args and docstring_args[arg_name].type_name is not None: 75 arg_object["type"] = _python_to_json_type_name( 76 docstring_args[arg_name].type_name) 77 78 # description 79 if arg_name in docstring_args and docstring_args[arg_name].description is not None: 80 arg_object["description"] = docstring_args[arg_name].description.strip() 81 82 help_object["args"][arg_name] = arg_object 83 84 # returns 85 if parsed_docstring.returns is not None: 86 help_object["returns"] = {} 87 88 # type 89 if signature.return_annotation is not None: 90 help_object["returns"]["type"] = _python_to_json_type_name( 91 signature.return_annotation.__name__) 92 elif parsed_docstring.returns.type_name is not None: 93 help_object["returns"]["type"] = _python_to_json_type_name( 94 parsed_docstring.returns.type_name) 95 96 # description 97 if parsed_docstring.returns.description is not None: 98 help_object["returns"]["description"] = parsed_docstring.returns.description.strip() 99 100 return help_object 101 102 103# Special action names 104_RESPONSE_ACTION_NAME = "[response]" 105_ERROR_ACTION_NAME = "[error]" 106 107 108# Access policies 109ACCESS_PERMITTED = "ACCESS_PERMITTED" 110ACCESS_DENIED = "ACCESS_DENIED" 111ACCESS_REQUESTED = "ACCESS_REQUESTED" 112 113 114def action(*args, **kwargs): 115 """ 116 Declares instance methods as actions making them accessible to other agents. 117 118 Keyword arguments: 119 name: The name of the action. Defaults to the name of the method. 120 help: The help object. Defaults to a generated object. 121 access_policy: The access policy. Defaults to ACCESS_PERMITTED. 122 """ 123 def decorator(method): 124 action_name = kwargs.get("name", method.__name__) 125 if action_name == _RESPONSE_ACTION_NAME: 126 raise ValueError(f"action name '{action_name}' is reserved") 127 method.action_properties: dict = { 128 "name": method.__name__, 129 "help": _generate_help(method), 130 "access_policy": ACCESS_PERMITTED, 131 **kwargs, 132 } 133 return method 134 135 if len(args) == 1 and callable(args[0]) and not kwargs: 136 return decorator(args[0]) # The decorator was used without parentheses 137 else: 138 return decorator # The decorator was used with parentheses 139 140 141class ActionError(Exception): 142 """Raised from the request() method if the action responds with an error""" 143 144 145class Agent(): 146 """ 147 An Actor that may represent an AI agent, software interface, or human user 148 """ 149 150 def __init__(self, id: str, receive_own_broadcasts: bool = True): 151 """ 152 Initializes an Agent. 153 154 This constructor is not meant to be called directly. It is invoked by 155 the Space class when adding an agent. 156 157 Subclasses should call super().__init__() in their constructor. 158 159 Args: 160 id: The id of the agent 161 receive_own_broadcasts: 162 Whether the agent will receive its own broadcasts. Defaults to 163 True 164 """ 165 if len(id) < 1 or len(id) > 255: 166 raise ValueError("id must be between 1 and 255 characters") 167 if re.match(r"^amq\.", id): 168 raise ValueError("id cannot start with \"amq.\"") 169 if id == "*": 170 raise ValueError("id cannot be \"*\"") 171 self._id: str = id 172 self._receive_own_broadcasts: bool = receive_own_broadcasts 173 # --- non-constructor properties set by Space/Processor --- 174 self._outbound_queue: Queue = None 175 self._is_processing: bool = False 176 # --- non-constructor properties --- 177 self._message_log: List[Message] = [] 178 self._message_log_lock: threading.Lock = threading.Lock() 179 self._pending_requests: Dict[str, Union[threading.Event, Message]] = {} 180 self._pending_requests_lock: threading.Lock = threading.Lock() 181 self.__thread_local_current_message = threading.local() 182 self.__thread_local_current_message.value: Message = None 183 184 def id(self) -> str: 185 return self._id 186 187 def send(self, message: dict) -> str: 188 """ 189 Sends (out) a message from this agent. 190 191 Args: 192 message: The message 193 194 Returns: 195 The meta.id of the sent message 196 197 Raises: 198 TypeError: If the message is not a dict 199 ValueError: If the message is invalid 200 """ 201 if not isinstance(message, dict): 202 raise TypeError("message must be a dict") 203 if "from" in message and message["from"] != self.id(): 204 raise ValueError( 205 f"'from' field value '{message['from']}' does not match this agent's id.") 206 message["from"] = self.id() 207 message["meta"] = { 208 "id": uuid.uuid4().__str__(), 209 **message.get("meta", {}), 210 } 211 message = Message(**message).dict(by_alias=True, exclude_unset=True) 212 with self._message_log_lock: 213 log("info", f"{self._id}: sending", message) 214 self._message_log.append(message) 215 self._outbound_queue.put(message) 216 return message["meta"]["id"] 217 218 def request(self, message: dict, timeout: float = 3) -> object: 219 """ 220 Synchronously sends a message then waits for and returns the return 221 value of the invoked action. 222 223 This method allows you to call an action synchronously like a function 224 and receive its return value in python. If the action raises an 225 exception an ActionError will be raised containing the error message. 226 227 Args: 228 message: The message to send 229 timeout: 230 The timeout in seconds to wait for the returned value. 231 Defaults to 3 seconds. 232 233 Returns: 234 object: The return value of the action. 235 236 Raises: 237 TimeoutError: If the timeout is reached 238 ActionError: If the action raised an exception 239 RuntimeError: 240 If called before the agent is processing incoming messages 241 """ 242 if not self._is_processing: 243 raise RuntimeError( 244 "request() called while agent is not processing incoming messages. Use send() instead.") 245 246 # Send and mark the request as pending 247 with self._pending_requests_lock: 248 request_id = self.send(message) 249 self._pending_requests[request_id] = threading.Event() 250 251 # Wait for response 252 if not self._pending_requests[request_id].wait(timeout=timeout): 253 raise TimeoutError 254 255 with self._pending_requests_lock: 256 response_message = self._pending_requests.pop(request_id) 257 258 # Raise error or return value from response 259 if response_message["action"]["name"] == _ERROR_ACTION_NAME: 260 raise ActionError(response_message["action"]["args"]["error"]) 261 262 if response_message["action"]["name"] == _RESPONSE_ACTION_NAME: 263 return response_message["action"]["args"]["value"] 264 265 raise RuntimeError("We should never get here") 266 267 def respond_with(self, value): 268 """ 269 Sends a response with the given value. 270 271 Parameters: 272 value (any): The value to be sent in the response message. 273 """ 274 self.send({ 275 "meta": { 276 "parent_id": self.current_message()["meta"]["id"] 277 }, 278 "to": self.current_message()['from'], 279 "action": { 280 "name": _RESPONSE_ACTION_NAME, 281 "args": { 282 "value": value, 283 } 284 } 285 }) 286 287 def raise_with(self, error: Exception): 288 """ 289 Sends an error response. 290 291 Args: 292 error (Exception): The error to send. 293 """ 294 self.send({ 295 "meta": { 296 "parent_id": self.current_message()["meta"]["id"], 297 }, 298 "to": self.current_message()['from'], 299 "action": { 300 "name": _ERROR_ACTION_NAME, 301 "args": { 302 "error": f"{error.__class__.__name__}: {error}" 303 } 304 } 305 }) 306 307 def _receive(self, message: dict): 308 """ 309 Receives and handles an incoming message. 310 311 Args: 312 message: The incoming message 313 """ 314 try: 315 # Ignore own broadcasts if _receive_own_broadcasts is false 316 if not self._receive_own_broadcasts \ 317 and message['from'] == self.id() \ 318 and message['to'] == '*': 319 return 320 321 log("debug", f"{self.id()}: received message", message) 322 323 # Record the received message before handling 324 with self._message_log_lock: 325 self._message_log.append(message) 326 327 # Handle incoming responses 328 # TODO: make serial/fan-out optional 329 if message["action"]["name"] in [_RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME]: 330 parent_id = message["meta"]["parent_id"] 331 if parent_id in self._pending_requests.keys(): 332 # This was a response to a request(). We use a little trick 333 # here and simply swap out the event that is waiting with 334 # the message, then set the event. The request() method will 335 # pick up the response message in the existing thread. 336 event = self._pending_requests[parent_id] 337 self._pending_requests[parent_id] = message 338 event.set() 339 else: 340 # This was a response to a send() 341 if message["action"]["name"] == _RESPONSE_ACTION_NAME: 342 handler_callback = self.handle_action_value 343 arg = message["action"]["args"]["value"] 344 elif message["action"]["name"] == _ERROR_ACTION_NAME: 345 handler_callback = self.handle_action_error 346 arg = ActionError(message["action"]["args"]["error"]) 347 else: 348 raise RuntimeError("Unknown action response") 349 350 # Spawn a thread to handle the response 351 def __process_response(arg, current_message): 352 threading.current_thread( 353 ).name = f"{self.id()}: __process_response {current_message['meta']['id']}" 354 self.__thread_local_current_message.value = current_message 355 handler_callback(arg) 356 357 ResourceManager().thread_pool_executor.submit( 358 __process_response, 359 arg, 360 message, 361 ) 362 363 # Handle all other messages 364 else: 365 # Spawn a thread to process the message. This means that messages 366 # are processed concurrently, but may be processed out of order. 367 ResourceManager().thread_pool_executor.submit( 368 self.__process, 369 message, 370 ) 371 except Exception as e: 372 log("error", f"{self.id()}: raised exception in _receive", e) 373 374 def __process(self, message: dict): 375 """ 376 Top level method within the action processing thread. 377 """ 378 threading.current_thread( 379 ).name = f"{self.id()}: __process {message['meta']['id']}" 380 self.__thread_local_current_message.value = message 381 try: 382 log("debug", f"{self.id()}: committing action", message) 383 self.__commit(message) 384 except Exception as e: 385 # Handle errors (including PermissionError) that occur while 386 # committing an action by reporting back to the sender. 387 log("warning", 388 f"{self.id()}: raised exception while committing action '{message['action']['name']}'", e) 389 self.raise_with(e) 390 391 def __commit(self, message: dict): 392 """ 393 Invokes the action method 394 395 Args: 396 message: The incoming message specifying the action 397 398 Raises: 399 AttributeError: If the action method is not found 400 PermissionError: If the action is not permitted 401 """ 402 try: 403 # Check if the action method exists 404 action_method = self.__action_method(message["action"]["name"]) 405 except KeyError: 406 # the action was not found 407 if message['to'] == '*': 408 return # broadcasts will not raise an error in this situation 409 else: 410 raise AttributeError( 411 f"\"{message['action']['name']}\" not found on \"{self.id()}\"") 412 413 # Check if the action is permitted 414 if not self.__permitted(message): 415 raise PermissionError( 416 f"\"{self.id()}.{message['action']['name']}\" not permitted") 417 418 self.before_action(message) 419 420 return_value = None 421 error = None 422 try: 423 # Invoke the action method 424 return_value = action_method(**message['action'].get('args', {})) 425 except Exception as e: 426 error = e 427 raise 428 finally: 429 self.after_action(message, return_value, error) 430 return return_value 431 432 def __permitted(self, message: dict) -> bool: 433 """ 434 Checks whether the message's action is allowed 435 """ 436 action_method = self.__action_method(message['action']['name']) 437 policy = action_method.action_properties["access_policy"] 438 if policy == ACCESS_PERMITTED: 439 return True 440 elif policy == ACCESS_DENIED: 441 return False 442 elif policy == ACCESS_REQUESTED: 443 return self.request_permission(message) 444 else: 445 raise Exception( 446 f"Invalid access policy for method: {message['action']}, got '{policy}'") 447 448 def __action_methods(self) -> dict: 449 instance_methods = inspect.getmembers(self, inspect.ismethod) 450 action_methods = { 451 method_name: method 452 for method_name, method in instance_methods 453 if hasattr(method, "action_properties") 454 } 455 return action_methods 456 457 def __action_method(self, action_name: str): 458 """ 459 Returns the method for the given action name. 460 """ 461 action_methods = self.__action_methods() 462 return action_methods[action_name] 463 464 def _find_message(self, message_id: str) -> Message: 465 """ 466 Returns a message from the log with the given ID. 467 468 Args: 469 message_id: The ID of the message 470 471 Returns: 472 The message or None if not found 473 """ 474 for message in self._message_log: 475 if message["meta"]["id"] == message_id: 476 return message 477 478 def current_message(self) -> Message: 479 """ 480 Returns the full incoming message which invoked the current action. 481 482 This method may be called within an action or action related callback to 483 retrieve the current message, for example to determine the sender or 484 inspect other details. 485 486 Returns: 487 The current message 488 """ 489 return self.__thread_local_current_message.value 490 491 def parent_message(self, message: Message = None) -> Message: 492 """ 493 Returns the message that the given message is responding to, if any. 494 495 This method may be used within the handle_action_value and 496 handle_action_error callbacks. 497 498 Args: 499 message: The message to get the parent message of. Defaults to the 500 current message. 501 502 Returns: 503 The parent message or None 504 """ 505 if message is None: 506 message = self.current_message() 507 parent_id = message["meta"].get("parent_id", None) 508 if parent_id is not None: 509 return self._find_message(parent_id) 510 511 @action 512 def help(self, action_name: str = None) -> dict: 513 """ 514 Returns a list of actions on this agent. 515 516 If action_name is passed, returns a list with only that action. 517 If no action_name is passed, returns all actions. 518 519 Args: 520 action_name: (Optional) The name of an action to request help for 521 522 Returns: 523 A dictionary of actions 524 """ 525 self.respond_with(self._help(action_name)) 526 527 def _help(self, action_name: str = None) -> dict: 528 """ 529 Generates the help information returned by the help() action 530 """ 531 special_actions = ["help", _RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME] 532 help_list = { 533 method.action_properties["name"]: method.action_properties["help"] 534 for method in self.__action_methods().values() 535 if action_name is None 536 and method.action_properties["name"] not in special_actions 537 or method.action_properties["name"] == action_name 538 } 539 return help_list 540 541 def handle_action_value(self, value): 542 """ 543 Receives a return value from a previous action. 544 545 This method receives return values from actions invoked by the send() 546 method. It is not called when using the request() method, which returns 547 the value directly. 548 549 To inspect the full response message carrying this value, use 550 current_message(). To inspect the message which returned the value, use 551 parent_message(). 552 553 Args: 554 value: 555 The return value 556 """ 557 if not hasattr(self, "_issued_handle_action_value_warning"): 558 self._issued_handle_action_value_warning = True 559 log("warning", 560 f"A value was returned from an action. Implement {self.__class__.__name__}.handle_action_value() to handle it.") 561 562 def handle_action_error(self, error: ActionError): 563 """ 564 Receives an error from a previous action. 565 566 This method receives errors from actions invoked by the send() method. 567 It is not called when using the request() method, which raises an error 568 directly. 569 570 To inspect the full response message carrying this error, use 571 current_message(). To inspect the message which caused the error, use 572 parent_message(). 573 574 Args: 575 error: The error 576 """ 577 if not hasattr(self, "_issued_handle_action_error_warning"): 578 self._issued_handle_action_error_warning = True 579 log("warning", 580 f"An error was raised from an action. Implement {self.__class__.__name__}.handle_action_error() to handle it.") 581 582 def after_add(self): 583 """ 584 Called after the agent is added to a space, but before it begins 585 processing incoming messages. 586 587 The agent may send messages during this callback using the send() 588 method, but may not use the request() method since it relies on 589 processing incoming messages. 590 """ 591 592 def before_remove(self): 593 """ 594 Called before the agent is removed from a space, after it has finished 595 processing incoming messages. 596 597 The agent may send final messages during this callback using the send() 598 method, but may not use the request() method since it relies on 599 processing incoming messages. 600 """ 601 602 def before_action(self, message: dict): 603 """ 604 Called before every action. 605 606 This method will only be called if the action exists and is permitted. 607 608 Args: 609 message: The received message that contains the action 610 """ 611 612 def after_action(self, message: dict, return_value: str, error: str): 613 """ 614 Called after every action, regardless of whether an error occurred. 615 616 Args: 617 message: The message which invoked the action 618 return_value: The return value from the action 619 error: The error from the action if any 620 """ 621 622 def request_permission(self, proposed_message: dict) -> bool: 623 """ 624 Receives a proposed action message and presents it to the agent for 625 review. 626 627 Args: 628 proposed_message: The proposed action message 629 630 Returns: 631 True if access should be permitted 632 """ 633 raise NotImplementedError( 634 f"You must implement {self.__class__.__name__}.request_permission() to use ACCESS_REQUESTED")
115def action(*args, **kwargs): 116 """ 117 Declares instance methods as actions making them accessible to other agents. 118 119 Keyword arguments: 120 name: The name of the action. Defaults to the name of the method. 121 help: The help object. Defaults to a generated object. 122 access_policy: The access policy. Defaults to ACCESS_PERMITTED. 123 """ 124 def decorator(method): 125 action_name = kwargs.get("name", method.__name__) 126 if action_name == _RESPONSE_ACTION_NAME: 127 raise ValueError(f"action name '{action_name}' is reserved") 128 method.action_properties: dict = { 129 "name": method.__name__, 130 "help": _generate_help(method), 131 "access_policy": ACCESS_PERMITTED, 132 **kwargs, 133 } 134 return method 135 136 if len(args) == 1 and callable(args[0]) and not kwargs: 137 return decorator(args[0]) # The decorator was used without parentheses 138 else: 139 return decorator # The decorator was used with parentheses
142class ActionError(Exception): 143 """Raised from the request() method if the action responds with an error"""
Raised from the request() method if the action responds with an error
146class Agent(): 147 """ 148 An Actor that may represent an AI agent, software interface, or human user 149 """ 150 151 def __init__(self, id: str, receive_own_broadcasts: bool = True): 152 """ 153 Initializes an Agent. 154 155 This constructor is not meant to be called directly. It is invoked by 156 the Space class when adding an agent. 157 158 Subclasses should call super().__init__() in their constructor. 159 160 Args: 161 id: The id of the agent 162 receive_own_broadcasts: 163 Whether the agent will receive its own broadcasts. Defaults to 164 True 165 """ 166 if len(id) < 1 or len(id) > 255: 167 raise ValueError("id must be between 1 and 255 characters") 168 if re.match(r"^amq\.", id): 169 raise ValueError("id cannot start with \"amq.\"") 170 if id == "*": 171 raise ValueError("id cannot be \"*\"") 172 self._id: str = id 173 self._receive_own_broadcasts: bool = receive_own_broadcasts 174 # --- non-constructor properties set by Space/Processor --- 175 self._outbound_queue: Queue = None 176 self._is_processing: bool = False 177 # --- non-constructor properties --- 178 self._message_log: List[Message] = [] 179 self._message_log_lock: threading.Lock = threading.Lock() 180 self._pending_requests: Dict[str, Union[threading.Event, Message]] = {} 181 self._pending_requests_lock: threading.Lock = threading.Lock() 182 self.__thread_local_current_message = threading.local() 183 self.__thread_local_current_message.value: Message = None 184 185 def id(self) -> str: 186 return self._id 187 188 def send(self, message: dict) -> str: 189 """ 190 Sends (out) a message from this agent. 191 192 Args: 193 message: The message 194 195 Returns: 196 The meta.id of the sent message 197 198 Raises: 199 TypeError: If the message is not a dict 200 ValueError: If the message is invalid 201 """ 202 if not isinstance(message, dict): 203 raise TypeError("message must be a dict") 204 if "from" in message and message["from"] != self.id(): 205 raise ValueError( 206 f"'from' field value '{message['from']}' does not match this agent's id.") 207 message["from"] = self.id() 208 message["meta"] = { 209 "id": uuid.uuid4().__str__(), 210 **message.get("meta", {}), 211 } 212 message = Message(**message).dict(by_alias=True, exclude_unset=True) 213 with self._message_log_lock: 214 log("info", f"{self._id}: sending", message) 215 self._message_log.append(message) 216 self._outbound_queue.put(message) 217 return message["meta"]["id"] 218 219 def request(self, message: dict, timeout: float = 3) -> object: 220 """ 221 Synchronously sends a message then waits for and returns the return 222 value of the invoked action. 223 224 This method allows you to call an action synchronously like a function 225 and receive its return value in python. If the action raises an 226 exception an ActionError will be raised containing the error message. 227 228 Args: 229 message: The message to send 230 timeout: 231 The timeout in seconds to wait for the returned value. 232 Defaults to 3 seconds. 233 234 Returns: 235 object: The return value of the action. 236 237 Raises: 238 TimeoutError: If the timeout is reached 239 ActionError: If the action raised an exception 240 RuntimeError: 241 If called before the agent is processing incoming messages 242 """ 243 if not self._is_processing: 244 raise RuntimeError( 245 "request() called while agent is not processing incoming messages. Use send() instead.") 246 247 # Send and mark the request as pending 248 with self._pending_requests_lock: 249 request_id = self.send(message) 250 self._pending_requests[request_id] = threading.Event() 251 252 # Wait for response 253 if not self._pending_requests[request_id].wait(timeout=timeout): 254 raise TimeoutError 255 256 with self._pending_requests_lock: 257 response_message = self._pending_requests.pop(request_id) 258 259 # Raise error or return value from response 260 if response_message["action"]["name"] == _ERROR_ACTION_NAME: 261 raise ActionError(response_message["action"]["args"]["error"]) 262 263 if response_message["action"]["name"] == _RESPONSE_ACTION_NAME: 264 return response_message["action"]["args"]["value"] 265 266 raise RuntimeError("We should never get here") 267 268 def respond_with(self, value): 269 """ 270 Sends a response with the given value. 271 272 Parameters: 273 value (any): The value to be sent in the response message. 274 """ 275 self.send({ 276 "meta": { 277 "parent_id": self.current_message()["meta"]["id"] 278 }, 279 "to": self.current_message()['from'], 280 "action": { 281 "name": _RESPONSE_ACTION_NAME, 282 "args": { 283 "value": value, 284 } 285 } 286 }) 287 288 def raise_with(self, error: Exception): 289 """ 290 Sends an error response. 291 292 Args: 293 error (Exception): The error to send. 294 """ 295 self.send({ 296 "meta": { 297 "parent_id": self.current_message()["meta"]["id"], 298 }, 299 "to": self.current_message()['from'], 300 "action": { 301 "name": _ERROR_ACTION_NAME, 302 "args": { 303 "error": f"{error.__class__.__name__}: {error}" 304 } 305 } 306 }) 307 308 def _receive(self, message: dict): 309 """ 310 Receives and handles an incoming message. 311 312 Args: 313 message: The incoming message 314 """ 315 try: 316 # Ignore own broadcasts if _receive_own_broadcasts is false 317 if not self._receive_own_broadcasts \ 318 and message['from'] == self.id() \ 319 and message['to'] == '*': 320 return 321 322 log("debug", f"{self.id()}: received message", message) 323 324 # Record the received message before handling 325 with self._message_log_lock: 326 self._message_log.append(message) 327 328 # Handle incoming responses 329 # TODO: make serial/fan-out optional 330 if message["action"]["name"] in [_RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME]: 331 parent_id = message["meta"]["parent_id"] 332 if parent_id in self._pending_requests.keys(): 333 # This was a response to a request(). We use a little trick 334 # here and simply swap out the event that is waiting with 335 # the message, then set the event. The request() method will 336 # pick up the response message in the existing thread. 337 event = self._pending_requests[parent_id] 338 self._pending_requests[parent_id] = message 339 event.set() 340 else: 341 # This was a response to a send() 342 if message["action"]["name"] == _RESPONSE_ACTION_NAME: 343 handler_callback = self.handle_action_value 344 arg = message["action"]["args"]["value"] 345 elif message["action"]["name"] == _ERROR_ACTION_NAME: 346 handler_callback = self.handle_action_error 347 arg = ActionError(message["action"]["args"]["error"]) 348 else: 349 raise RuntimeError("Unknown action response") 350 351 # Spawn a thread to handle the response 352 def __process_response(arg, current_message): 353 threading.current_thread( 354 ).name = f"{self.id()}: __process_response {current_message['meta']['id']}" 355 self.__thread_local_current_message.value = current_message 356 handler_callback(arg) 357 358 ResourceManager().thread_pool_executor.submit( 359 __process_response, 360 arg, 361 message, 362 ) 363 364 # Handle all other messages 365 else: 366 # Spawn a thread to process the message. This means that messages 367 # are processed concurrently, but may be processed out of order. 368 ResourceManager().thread_pool_executor.submit( 369 self.__process, 370 message, 371 ) 372 except Exception as e: 373 log("error", f"{self.id()}: raised exception in _receive", e) 374 375 def __process(self, message: dict): 376 """ 377 Top level method within the action processing thread. 378 """ 379 threading.current_thread( 380 ).name = f"{self.id()}: __process {message['meta']['id']}" 381 self.__thread_local_current_message.value = message 382 try: 383 log("debug", f"{self.id()}: committing action", message) 384 self.__commit(message) 385 except Exception as e: 386 # Handle errors (including PermissionError) that occur while 387 # committing an action by reporting back to the sender. 388 log("warning", 389 f"{self.id()}: raised exception while committing action '{message['action']['name']}'", e) 390 self.raise_with(e) 391 392 def __commit(self, message: dict): 393 """ 394 Invokes the action method 395 396 Args: 397 message: The incoming message specifying the action 398 399 Raises: 400 AttributeError: If the action method is not found 401 PermissionError: If the action is not permitted 402 """ 403 try: 404 # Check if the action method exists 405 action_method = self.__action_method(message["action"]["name"]) 406 except KeyError: 407 # the action was not found 408 if message['to'] == '*': 409 return # broadcasts will not raise an error in this situation 410 else: 411 raise AttributeError( 412 f"\"{message['action']['name']}\" not found on \"{self.id()}\"") 413 414 # Check if the action is permitted 415 if not self.__permitted(message): 416 raise PermissionError( 417 f"\"{self.id()}.{message['action']['name']}\" not permitted") 418 419 self.before_action(message) 420 421 return_value = None 422 error = None 423 try: 424 # Invoke the action method 425 return_value = action_method(**message['action'].get('args', {})) 426 except Exception as e: 427 error = e 428 raise 429 finally: 430 self.after_action(message, return_value, error) 431 return return_value 432 433 def __permitted(self, message: dict) -> bool: 434 """ 435 Checks whether the message's action is allowed 436 """ 437 action_method = self.__action_method(message['action']['name']) 438 policy = action_method.action_properties["access_policy"] 439 if policy == ACCESS_PERMITTED: 440 return True 441 elif policy == ACCESS_DENIED: 442 return False 443 elif policy == ACCESS_REQUESTED: 444 return self.request_permission(message) 445 else: 446 raise Exception( 447 f"Invalid access policy for method: {message['action']}, got '{policy}'") 448 449 def __action_methods(self) -> dict: 450 instance_methods = inspect.getmembers(self, inspect.ismethod) 451 action_methods = { 452 method_name: method 453 for method_name, method in instance_methods 454 if hasattr(method, "action_properties") 455 } 456 return action_methods 457 458 def __action_method(self, action_name: str): 459 """ 460 Returns the method for the given action name. 461 """ 462 action_methods = self.__action_methods() 463 return action_methods[action_name] 464 465 def _find_message(self, message_id: str) -> Message: 466 """ 467 Returns a message from the log with the given ID. 468 469 Args: 470 message_id: The ID of the message 471 472 Returns: 473 The message or None if not found 474 """ 475 for message in self._message_log: 476 if message["meta"]["id"] == message_id: 477 return message 478 479 def current_message(self) -> Message: 480 """ 481 Returns the full incoming message which invoked the current action. 482 483 This method may be called within an action or action related callback to 484 retrieve the current message, for example to determine the sender or 485 inspect other details. 486 487 Returns: 488 The current message 489 """ 490 return self.__thread_local_current_message.value 491 492 def parent_message(self, message: Message = None) -> Message: 493 """ 494 Returns the message that the given message is responding to, if any. 495 496 This method may be used within the handle_action_value and 497 handle_action_error callbacks. 498 499 Args: 500 message: The message to get the parent message of. Defaults to the 501 current message. 502 503 Returns: 504 The parent message or None 505 """ 506 if message is None: 507 message = self.current_message() 508 parent_id = message["meta"].get("parent_id", None) 509 if parent_id is not None: 510 return self._find_message(parent_id) 511 512 @action 513 def help(self, action_name: str = None) -> dict: 514 """ 515 Returns a list of actions on this agent. 516 517 If action_name is passed, returns a list with only that action. 518 If no action_name is passed, returns all actions. 519 520 Args: 521 action_name: (Optional) The name of an action to request help for 522 523 Returns: 524 A dictionary of actions 525 """ 526 self.respond_with(self._help(action_name)) 527 528 def _help(self, action_name: str = None) -> dict: 529 """ 530 Generates the help information returned by the help() action 531 """ 532 special_actions = ["help", _RESPONSE_ACTION_NAME, _ERROR_ACTION_NAME] 533 help_list = { 534 method.action_properties["name"]: method.action_properties["help"] 535 for method in self.__action_methods().values() 536 if action_name is None 537 and method.action_properties["name"] not in special_actions 538 or method.action_properties["name"] == action_name 539 } 540 return help_list 541 542 def handle_action_value(self, value): 543 """ 544 Receives a return value from a previous action. 545 546 This method receives return values from actions invoked by the send() 547 method. It is not called when using the request() method, which returns 548 the value directly. 549 550 To inspect the full response message carrying this value, use 551 current_message(). To inspect the message which returned the value, use 552 parent_message(). 553 554 Args: 555 value: 556 The return value 557 """ 558 if not hasattr(self, "_issued_handle_action_value_warning"): 559 self._issued_handle_action_value_warning = True 560 log("warning", 561 f"A value was returned from an action. Implement {self.__class__.__name__}.handle_action_value() to handle it.") 562 563 def handle_action_error(self, error: ActionError): 564 """ 565 Receives an error from a previous action. 566 567 This method receives errors from actions invoked by the send() method. 568 It is not called when using the request() method, which raises an error 569 directly. 570 571 To inspect the full response message carrying this error, use 572 current_message(). To inspect the message which caused the error, use 573 parent_message(). 574 575 Args: 576 error: The error 577 """ 578 if not hasattr(self, "_issued_handle_action_error_warning"): 579 self._issued_handle_action_error_warning = True 580 log("warning", 581 f"An error was raised from an action. Implement {self.__class__.__name__}.handle_action_error() to handle it.") 582 583 def after_add(self): 584 """ 585 Called after the agent is added to a space, but before it begins 586 processing incoming messages. 587 588 The agent may send messages during this callback using the send() 589 method, but may not use the request() method since it relies on 590 processing incoming messages. 591 """ 592 593 def before_remove(self): 594 """ 595 Called before the agent is removed from a space, after it has finished 596 processing incoming messages. 597 598 The agent may send final messages during this callback using the send() 599 method, but may not use the request() method since it relies on 600 processing incoming messages. 601 """ 602 603 def before_action(self, message: dict): 604 """ 605 Called before every action. 606 607 This method will only be called if the action exists and is permitted. 608 609 Args: 610 message: The received message that contains the action 611 """ 612 613 def after_action(self, message: dict, return_value: str, error: str): 614 """ 615 Called after every action, regardless of whether an error occurred. 616 617 Args: 618 message: The message which invoked the action 619 return_value: The return value from the action 620 error: The error from the action if any 621 """ 622 623 def request_permission(self, proposed_message: dict) -> bool: 624 """ 625 Receives a proposed action message and presents it to the agent for 626 review. 627 628 Args: 629 proposed_message: The proposed action message 630 631 Returns: 632 True if access should be permitted 633 """ 634 raise NotImplementedError( 635 f"You must implement {self.__class__.__name__}.request_permission() to use ACCESS_REQUESTED")
An Actor that may represent an AI agent, software interface, or human user
151 def __init__(self, id: str, receive_own_broadcasts: bool = True): 152 """ 153 Initializes an Agent. 154 155 This constructor is not meant to be called directly. It is invoked by 156 the Space class when adding an agent. 157 158 Subclasses should call super().__init__() in their constructor. 159 160 Args: 161 id: The id of the agent 162 receive_own_broadcasts: 163 Whether the agent will receive its own broadcasts. Defaults to 164 True 165 """ 166 if len(id) < 1 or len(id) > 255: 167 raise ValueError("id must be between 1 and 255 characters") 168 if re.match(r"^amq\.", id): 169 raise ValueError("id cannot start with \"amq.\"") 170 if id == "*": 171 raise ValueError("id cannot be \"*\"") 172 self._id: str = id 173 self._receive_own_broadcasts: bool = receive_own_broadcasts 174 # --- non-constructor properties set by Space/Processor --- 175 self._outbound_queue: Queue = None 176 self._is_processing: bool = False 177 # --- non-constructor properties --- 178 self._message_log: List[Message] = [] 179 self._message_log_lock: threading.Lock = threading.Lock() 180 self._pending_requests: Dict[str, Union[threading.Event, Message]] = {} 181 self._pending_requests_lock: threading.Lock = threading.Lock() 182 self.__thread_local_current_message = threading.local() 183 self.__thread_local_current_message.value: Message = None
Initializes an Agent.
This constructor is not meant to be called directly. It is invoked by the Space class when adding an agent.
Subclasses should call super().__init__() in their constructor.
Arguments:
- id: The id of the agent
- receive_own_broadcasts: Whether the agent will receive its own broadcasts. Defaults to True
188 def send(self, message: dict) -> str: 189 """ 190 Sends (out) a message from this agent. 191 192 Args: 193 message: The message 194 195 Returns: 196 The meta.id of the sent message 197 198 Raises: 199 TypeError: If the message is not a dict 200 ValueError: If the message is invalid 201 """ 202 if not isinstance(message, dict): 203 raise TypeError("message must be a dict") 204 if "from" in message and message["from"] != self.id(): 205 raise ValueError( 206 f"'from' field value '{message['from']}' does not match this agent's id.") 207 message["from"] = self.id() 208 message["meta"] = { 209 "id": uuid.uuid4().__str__(), 210 **message.get("meta", {}), 211 } 212 message = Message(**message).dict(by_alias=True, exclude_unset=True) 213 with self._message_log_lock: 214 log("info", f"{self._id}: sending", message) 215 self._message_log.append(message) 216 self._outbound_queue.put(message) 217 return message["meta"]["id"]
219 def request(self, message: dict, timeout: float = 3) -> object: 220 """ 221 Synchronously sends a message then waits for and returns the return 222 value of the invoked action. 223 224 This method allows you to call an action synchronously like a function 225 and receive its return value in python. If the action raises an 226 exception an ActionError will be raised containing the error message. 227 228 Args: 229 message: The message to send 230 timeout: 231 The timeout in seconds to wait for the returned value. 232 Defaults to 3 seconds. 233 234 Returns: 235 object: The return value of the action. 236 237 Raises: 238 TimeoutError: If the timeout is reached 239 ActionError: If the action raised an exception 240 RuntimeError: 241 If called before the agent is processing incoming messages 242 """ 243 if not self._is_processing: 244 raise RuntimeError( 245 "request() called while agent is not processing incoming messages. Use send() instead.") 246 247 # Send and mark the request as pending 248 with self._pending_requests_lock: 249 request_id = self.send(message) 250 self._pending_requests[request_id] = threading.Event() 251 252 # Wait for response 253 if not self._pending_requests[request_id].wait(timeout=timeout): 254 raise TimeoutError 255 256 with self._pending_requests_lock: 257 response_message = self._pending_requests.pop(request_id) 258 259 # Raise error or return value from response 260 if response_message["action"]["name"] == _ERROR_ACTION_NAME: 261 raise ActionError(response_message["action"]["args"]["error"]) 262 263 if response_message["action"]["name"] == _RESPONSE_ACTION_NAME: 264 return response_message["action"]["args"]["value"] 265 266 raise RuntimeError("We should never get here")
Synchronously sends a message then waits for and returns the return value of the invoked action.
This method allows you to call an action synchronously like a function and receive its return value in python. If the action raises an exception an ActionError will be raised containing the error message.
Arguments:
- message: The message to send
- timeout: The timeout in seconds to wait for the returned value. Defaults to 3 seconds.
Returns:
object: The return value of the action.
Raises:
- TimeoutError: If the timeout is reached
- ActionError: If the action raised an exception
- RuntimeError: If called before the agent is processing incoming messages
268 def respond_with(self, value): 269 """ 270 Sends a response with the given value. 271 272 Parameters: 273 value (any): The value to be sent in the response message. 274 """ 275 self.send({ 276 "meta": { 277 "parent_id": self.current_message()["meta"]["id"] 278 }, 279 "to": self.current_message()['from'], 280 "action": { 281 "name": _RESPONSE_ACTION_NAME, 282 "args": { 283 "value": value, 284 } 285 } 286 })
288 def raise_with(self, error: Exception): 289 """ 290 Sends an error response. 291 292 Args: 293 error (Exception): The error to send. 294 """ 295 self.send({ 296 "meta": { 297 "parent_id": self.current_message()["meta"]["id"], 298 }, 299 "to": self.current_message()['from'], 300 "action": { 301 "name": _ERROR_ACTION_NAME, 302 "args": { 303 "error": f"{error.__class__.__name__}: {error}" 304 } 305 } 306 })
479 def current_message(self) -> Message: 480 """ 481 Returns the full incoming message which invoked the current action. 482 483 This method may be called within an action or action related callback to 484 retrieve the current message, for example to determine the sender or 485 inspect other details. 486 487 Returns: 488 The current message 489 """ 490 return self.__thread_local_current_message.value
492 def parent_message(self, message: Message = None) -> Message: 493 """ 494 Returns the message that the given message is responding to, if any. 495 496 This method may be used within the handle_action_value and 497 handle_action_error callbacks. 498 499 Args: 500 message: The message to get the parent message of. Defaults to the 501 current message. 502 503 Returns: 504 The parent message or None 505 """ 506 if message is None: 507 message = self.current_message() 508 parent_id = message["meta"].get("parent_id", None) 509 if parent_id is not None: 510 return self._find_message(parent_id)
512 @action 513 def help(self, action_name: str = None) -> dict: 514 """ 515 Returns a list of actions on this agent. 516 517 If action_name is passed, returns a list with only that action. 518 If no action_name is passed, returns all actions. 519 520 Args: 521 action_name: (Optional) The name of an action to request help for 522 523 Returns: 524 A dictionary of actions 525 """ 526 self.respond_with(self._help(action_name))
542 def handle_action_value(self, value): 543 """ 544 Receives a return value from a previous action. 545 546 This method receives return values from actions invoked by the send() 547 method. It is not called when using the request() method, which returns 548 the value directly. 549 550 To inspect the full response message carrying this value, use 551 current_message(). To inspect the message which returned the value, use 552 parent_message(). 553 554 Args: 555 value: 556 The return value 557 """ 558 if not hasattr(self, "_issued_handle_action_value_warning"): 559 self._issued_handle_action_value_warning = True 560 log("warning", 561 f"A value was returned from an action. Implement {self.__class__.__name__}.handle_action_value() to handle it.")
Receives a return value from a previous action.
This method receives return values from actions invoked by the send() method. It is not called when using the request() method, which returns the value directly.
To inspect the full response message carrying this value, use current_message(). To inspect the message which returned the value, use parent_message().
Arguments:
- value: The return value
563 def handle_action_error(self, error: ActionError): 564 """ 565 Receives an error from a previous action. 566 567 This method receives errors from actions invoked by the send() method. 568 It is not called when using the request() method, which raises an error 569 directly. 570 571 To inspect the full response message carrying this error, use 572 current_message(). To inspect the message which caused the error, use 573 parent_message(). 574 575 Args: 576 error: The error 577 """ 578 if not hasattr(self, "_issued_handle_action_error_warning"): 579 self._issued_handle_action_error_warning = True 580 log("warning", 581 f"An error was raised from an action. Implement {self.__class__.__name__}.handle_action_error() to handle it.")
Receives an error from a previous action.
This method receives errors from actions invoked by the send() method. It is not called when using the request() method, which raises an error directly.
To inspect the full response message carrying this error, use current_message(). To inspect the message which caused the error, use parent_message().
Arguments:
- error: The error
583 def after_add(self): 584 """ 585 Called after the agent is added to a space, but before it begins 586 processing incoming messages. 587 588 The agent may send messages during this callback using the send() 589 method, but may not use the request() method since it relies on 590 processing incoming messages. 591 """
Called after the agent is added to a space, but before it begins processing incoming messages.
The agent may send messages during this callback using the send() method, but may not use the request() method since it relies on processing incoming messages.
593 def before_remove(self): 594 """ 595 Called before the agent is removed from a space, after it has finished 596 processing incoming messages. 597 598 The agent may send final messages during this callback using the send() 599 method, but may not use the request() method since it relies on 600 processing incoming messages. 601 """
Called before the agent is removed from a space, after it has finished processing incoming messages.
The agent may send final messages during this callback using the send() method, but may not use the request() method since it relies on processing incoming messages.
613 def after_action(self, message: dict, return_value: str, error: str): 614 """ 615 Called after every action, regardless of whether an error occurred. 616 617 Args: 618 message: The message which invoked the action 619 return_value: The return value from the action 620 error: The error from the action if any 621 """
623 def request_permission(self, proposed_message: dict) -> bool: 624 """ 625 Receives a proposed action message and presents it to the agent for 626 review. 627 628 Args: 629 proposed_message: The proposed action message 630 631 Returns: 632 True if access should be permitted 633 """ 634 raise NotImplementedError( 635 f"You must implement {self.__class__.__name__}.request_permission() to use ACCESS_REQUESTED")