agency.processor
import multiprocessing
import queue
import threading
from abc import ABC, ABCMeta
from concurrent.futures import (Executor, Future)
from typing import Dict, List, Protocol, Type

from agency.agent import Agent
from agency.logger import log
from agency.queue import Queue


class _EventProtocol(Protocol):
    """
    Structural type for the event objects handed to the processor.

    Lists the four event methods this module relies on: set, clear, is_set
    and wait.
    """
    def set(self) -> None:
        pass

    def clear(self) -> None:
        pass

    def is_set(self) -> bool:
        pass

    def wait(self, timeout: float = None) -> bool:
        pass


class Processor(ABC, metaclass=ABCMeta):
    """
    Encapsulates a running Agent instance

    The processor submits `_process_loop` to the given executor and uses the
    `started` / `stopping` events to coordinate start-up and shutdown with it.
    """
    def __init__(self,
                 agent_type: Type[Agent],
                 agent_id: str,
                 agent_args: List,
                 agent_kwargs: Dict,
                 inbound_queue: Queue,
                 outbound_queue: Queue,
                 started: _EventProtocol,
                 stopping: _EventProtocol,
                 new_message_event: _EventProtocol,
                 executor: Executor):
        """
        Store everything needed to construct and run the agent later.

        Nothing is started here; call `start()` to submit the loop.
        """
        self.agent_type: Type[Agent] = agent_type
        self.agent_id: str = agent_id
        self.agent_args: List = agent_args
        self.agent_kwargs: Dict = agent_kwargs
        self.inbound_queue: Queue = inbound_queue
        self.outbound_queue: Queue = outbound_queue
        self.started: _EventProtocol = started
        self.stopping: _EventProtocol = stopping
        self.new_message_event: _EventProtocol = new_message_event
        self.executor: Executor = executor
        # --- non-constructor properties ---
        self._future: Future = None
        self._agent: Agent = None  # Accessible if in foreground

    def start(self) -> Agent:
        """
        Submit the processing loop and wait up to 5 seconds for it to start.

        Returns the Agent instance when the loop was able to hand it back
        through the shared `agent_ref` list (only works in foreground),
        otherwise None.

        Raises the loop's own exception if it failed during start-up, or a
        generic Exception if it did not signal `started` in time.
        """
        log("debug", f"{self.agent_id}: processor starting ...")

        agent_ref: List = []
        self._future = self.executor.submit(
            _process_loop,
            self.agent_type,
            self.agent_id,
            self.agent_args,
            self.agent_kwargs,
            self.inbound_queue,
            self.outbound_queue,
            self.started,
            self.stopping,
            self.new_message_event,
            agent_ref,
        )

        if not self.started.wait(timeout=5):
            # it couldn't start, force stop it and raise an exception
            # (stop() calls Future.result(), which already re-raises a loop
            # failure; the explicit check below is a fallback)
            self.stop()
            error = self._future.exception()
            if error is not None:
                raise error
            else:
                raise Exception("Processor could not be started.")

        log("debug", f"{self.agent_id}: processor started")

        # keep and return the agent if present. only works in foreground
        self._agent = agent_ref[0] if agent_ref else None
        return self._agent

    def stop(self):
        """
        Signal the loop to stop and block until its future has finished.

        Re-raises any exception the loop raised, via `Future.result()`.
        """
        log("debug", f"{self.agent_id}: processor stopping ...")
        self.stopping.set()
        if self._future is not None:
            self._future.result()
        log("debug", f"{self.agent_id}: processor stopped")


# Placed at the top-level to play nice with the multiprocessing module
def _process_loop(agent_type: Type[Agent],
                  agent_id: str,
                  agent_args: List,
                  agent_kwargs: Dict,
                  inbound_queue: Queue,
                  outbound_queue: Queue,
                  started: _EventProtocol,
                  stopping: _EventProtocol,
                  new_message_event: _EventProtocol,
                  agent_ref: List):
    """
    The main agent processing loop

    Constructs the agent, connects both queues, signals `started`, then
    repeatedly drains `inbound_queue` into `agent._receive` until `stopping`
    is set. Cleanup (before_remove, queue disconnects) always runs.
    """
    # Set process or thread name
    if isinstance(started, threading.Event):
        threading.current_thread(
        ).name = f"{agent_id}: processor loop thread"
    else:
        multiprocessing.current_process(
        ).name = f"{agent_id}: processor loop process"

    # Bound before the try block so the cleanup below can tell whether the
    # constructor ever succeeded instead of failing with UnboundLocalError.
    agent = None
    try:
        log("debug", f"{agent_id}: processor loop starting")
        agent = agent_type(agent_id, *agent_args, **agent_kwargs)
        agent_ref.append(agent)  # set the agent reference
        inbound_queue.connect()
        outbound_queue.connect()
        agent._outbound_queue = outbound_queue
        agent.after_add()
        agent._is_processing = True
        # Reset the events BEFORE announcing readiness: once `started` is
        # set the caller may call stop() at any moment, and clearing
        # `stopping` afterwards could erase that request and hang stop().
        stopping.clear()
        new_message_event.clear()
        started.set()
        while not stopping.is_set():
            new_message_event.wait(timeout=0.1)  # TODO make configurable
            if stopping.is_set():
                log("debug",
                    f"{agent_id}: processor loop stopping")
                break
            # Clear before draining so a notification that arrives while
            # draining is kept for the next wait instead of being wiped.
            new_message_event.clear()
            while True:  # drain inbound_queue
                try:
                    message = inbound_queue.get(block=False)
                    log("debug",
                        f"{agent_id}: processor loop got message", message)
                    agent._receive(message)
                except queue.Empty:
                    break
    except KeyboardInterrupt:
        log("debug", f"{agent_id}: processor loop interrupted")
        pass
    except Exception as e:
        log("error", f"{agent_id}: processor loop failed", e)
        raise
    finally:
        log("debug", f"{agent_id}: processor loop cleaning up")
        try:
            if agent is not None:
                agent._is_processing = False
                agent.before_remove()
        finally:
            # Disconnect even if before_remove() raised.
            inbound_queue.disconnect()
            outbound_queue.disconnect()
        log("debug", f"{agent_id}: processor loop stopped")
class Processor(abc.ABC):
class Processor(ABC, metaclass=ABCMeta):
    """
    Encapsulates a running Agent instance

    The processor submits `_process_loop` to the given executor and uses the
    `started` / `stopping` events to coordinate start-up and shutdown with it.
    """
    def __init__(self,
                 agent_type: Type[Agent],
                 agent_id: str,
                 agent_args: List,
                 agent_kwargs: Dict,
                 inbound_queue: Queue,
                 outbound_queue: Queue,
                 started: _EventProtocol,
                 stopping: _EventProtocol,
                 new_message_event: _EventProtocol,
                 executor: Executor):
        """
        Store everything needed to construct and run the agent later.

        Nothing is started here; call `start()` to submit the loop.
        """
        self.agent_type: Type[Agent] = agent_type
        self.agent_id: str = agent_id
        self.agent_args: List = agent_args
        self.agent_kwargs: Dict = agent_kwargs
        self.inbound_queue: Queue = inbound_queue
        self.outbound_queue: Queue = outbound_queue
        self.started: _EventProtocol = started
        self.stopping: _EventProtocol = stopping
        self.new_message_event: _EventProtocol = new_message_event
        self.executor: Executor = executor
        # --- non-constructor properties ---
        self._future: Future = None
        self._agent: Agent = None  # Accessible if in foreground

    def start(self) -> Agent:
        """
        Submit the processing loop and wait up to 5 seconds for it to start.

        Returns the Agent instance when the loop was able to hand it back
        through the shared `agent_ref` list (only works in foreground),
        otherwise None.

        Raises the loop's own exception if it failed during start-up, or a
        generic Exception if it did not signal `started` in time.
        """
        log("debug", f"{self.agent_id}: processor starting ...")

        agent_ref: List = []
        self._future = self.executor.submit(
            _process_loop,
            self.agent_type,
            self.agent_id,
            self.agent_args,
            self.agent_kwargs,
            self.inbound_queue,
            self.outbound_queue,
            self.started,
            self.stopping,
            self.new_message_event,
            agent_ref,
        )

        if not self.started.wait(timeout=5):
            # it couldn't start, force stop it and raise an exception
            # (stop() calls Future.result(), which already re-raises a loop
            # failure; the explicit check below is a fallback)
            self.stop()
            error = self._future.exception()
            if error is not None:
                raise error
            else:
                raise Exception("Processor could not be started.")

        log("debug", f"{self.agent_id}: processor started")

        # keep and return the agent if present. only works in foreground
        self._agent = agent_ref[0] if agent_ref else None
        return self._agent

    def stop(self):
        """
        Signal the loop to stop and block until its future has finished.

        Re-raises any exception the loop raised, via `Future.result()`.
        """
        log("debug", f"{self.agent_id}: processor stopping ...")
        self.stopping.set()
        if self._future is not None:
            self._future.result()
        log("debug", f"{self.agent_id}: processor stopped")
Encapsulates a running Agent instance
Processor( agent_type: Type[agency.agent.Agent], agent_id: str, agent_args: List, agent_kwargs: Dict, inbound_queue: agency.queue.Queue, outbound_queue: agency.queue.Queue, started: agency.processor._EventProtocol, stopping: agency.processor._EventProtocol, new_message_event: agency.processor._EventProtocol, executor: concurrent.futures._base.Executor)
def __init__(self,
             agent_type: Type[Agent],
             agent_id: str,
             agent_args: List,
             agent_kwargs: Dict,
             inbound_queue: Queue,
             outbound_queue: Queue,
             started: _EventProtocol,
             stopping: _EventProtocol,
             new_message_event: _EventProtocol,
             executor: Executor):
    """
    Record the agent recipe, queues, events and executor on the instance.

    Every argument is stored under an attribute of the same name.
    """
    # --- runtime state, filled in outside the constructor ---
    self._agent: Agent = None  # Accessible if in foreground
    self._future: Future = None

    # --- how to build the agent ---
    self.agent_type: Type[Agent] = agent_type
    self.agent_id: str = agent_id
    self.agent_args: List = agent_args
    self.agent_kwargs: Dict = agent_kwargs

    # --- message transport ---
    self.inbound_queue: Queue = inbound_queue
    self.outbound_queue: Queue = outbound_queue

    # --- coordination with the processing loop ---
    self.started: _EventProtocol = started
    self.stopping: _EventProtocol = stopping
    self.new_message_event: _EventProtocol = new_message_event
    self.executor: Executor = executor
agent_type: Type[agency.agent.Agent]
inbound_queue: agency.queue.Queue
outbound_queue: agency.queue.Queue
def start(self) -> Agent:
    """
    Submit the processing loop and wait up to 5 seconds for it to start.

    Returns the Agent instance when the loop was able to hand it back
    through the shared `agent_ref` list (only works in foreground),
    otherwise None. The same value is stored on `self._agent`.

    Raises the loop's own exception if it failed during start-up, or a
    generic Exception if it did not signal `started` in time.
    """
    log("debug", f"{self.agent_id}: processor starting ...")

    agent_ref: List = []
    self._future = self.executor.submit(
        _process_loop,
        self.agent_type,
        self.agent_id,
        self.agent_args,
        self.agent_kwargs,
        self.inbound_queue,
        self.outbound_queue,
        self.started,
        self.stopping,
        self.new_message_event,
        agent_ref,
    )

    if not self.started.wait(timeout=5):
        # it couldn't start, force stop it and raise an exception
        # (stop() calls Future.result(), which already re-raises a loop
        # failure; the explicit check below is a fallback)
        self.stop()
        error = self._future.exception()
        if error is not None:
            raise error
        else:
            raise Exception("Processor could not be started.")

    log("debug", f"{self.agent_id}: processor started")

    # keep and return the agent if present. only works in foreground
    self._agent = agent_ref[0] if agent_ref else None
    return self._agent