
  1import multiprocessing
  2import queue
  3import threading
  4from abc import ABC, ABCMeta
  5from concurrent.futures import (Executor, Future)
  6from typing import Dict, List, Protocol, Type
  8from agency.agent import Agent
  9from agency.logger import log
 10from agency.queue import Queue
 13class _EventProtocol(Protocol):
 14    def set(self) -> None:
 15        pass
 17    def clear(self) -> None:
 18        pass
 20    def is_set(self) -> bool:
 21        pass
 23    def wait(self, timeout: float = None) -> bool:
 24        pass
 27class Processor(ABC, metaclass=ABCMeta):
 28    """
 29    Encapsulates a running Agent instance
 30    """
 31    def __init__(self,
 32                 agent_type: Type[Agent],
 33                 agent_id: str,
 34                 agent_args: List,
 35                 agent_kwargs: Dict,
 36                 inbound_queue: Queue,
 37                 outbound_queue: Queue,
 38                 started: _EventProtocol,
 39                 stopping: _EventProtocol,
 40                 new_message_event: _EventProtocol,
 41                 executor: Executor):
 42        self.agent_type: Type[Agent] = agent_type
 43        self.agent_id: str = agent_id
 44        self.agent_args: List = agent_args
 45        self.agent_kwargs: Dict = agent_kwargs
 46        self.inbound_queue: Queue = inbound_queue
 47        self.outbound_queue: Queue = outbound_queue
 48        self.started: _EventProtocol = started
 49        self.stopping: _EventProtocol = stopping
 50        self.new_message_event: _EventProtocol = new_message_event
 51        self.executor: Executor = executor
 52        # --- non-constructor properties ---
 53        self._future: Future = None
 54        self._agent: Agent = None  # Accessible if in foreground
 56    def start(self) -> Agent:
 57        log("debug", f"{self.agent_id}: processor starting ...")
 59        agent_ref: List = []
 60        self._future = self.executor.submit(
 61            _process_loop,
 62            self.agent_type,
 63            self.agent_id,
 64            self.agent_args,
 65            self.agent_kwargs,
 66            self.inbound_queue,
 67            self.outbound_queue,
 68            self.started,
 69            self.stopping,
 70            self.new_message_event,
 71            agent_ref,
 72        )
 74        if not self.started.wait(timeout=5):
 75            # it couldn't start, force stop it and raise an exception
 76            self.stop()
 77            error = self._future.exception()
 78            if error is not None:
 79                raise error
 80            else:
 81                raise Exception("Processor could not be started.")
 83        log("debug", f"{self.agent_id}: processor started")
 85        # return the agent if present. only works in foreground
 86        if agent_ref:
 87            return agent_ref[0]
 89    def stop(self):
 90        log("debug", f"{self.agent_id}: processor stopping ...")
 91        self.stopping.set()
 92        if self._future is not None:
 93            self._future.result()
 94        log("debug", f"{self.agent_id}: processor stopped")
 97# Placed at the top-level to play nice with the multiprocessing module
 98def _process_loop(agent_type: Type[Agent],
 99                  agent_id: str,
100                  agent_args: List,
101                  agent_kwargs: Dict,
102                  inbound_queue: Queue,
103                  outbound_queue: Queue,
104                  started: _EventProtocol,
105                  stopping: _EventProtocol,
106                  new_message_event: _EventProtocol,
107                  agent_ref: List):
108    """
109    The main agent processing loop
110    """
111    # Set process or thread name
112    if isinstance(started, threading.Event):
113        threading.current_thread(
114        ).name = f"{agent_id}: processor loop thread"
115    else:
116        multiprocessing.current_process(
117        ).name = f"{agent_id}: processor loop process"
119    try:
120        log("debug", f"{agent_id}: processor loop starting")
121        agent: Agent = agent_type(agent_id, *agent_args, **agent_kwargs)
122        agent_ref.append(agent)  # set the agent reference
123        inbound_queue.connect()
124        outbound_queue.connect()
125        agent._outbound_queue = outbound_queue
126        agent.after_add()
127        agent._is_processing = True
128        started.set()
129        stopping.clear()
130        new_message_event.clear()
131        while not stopping.is_set():
132            new_message_event.wait(timeout=0.1) # TODO make configurable
133            if stopping.is_set():
134                log("debug",
135                    f"{agent_id}: processor loop stopping")
136                break
137            while True:  # drain inbound_queue
138                try:
139                    message = inbound_queue.get(block=False)
140                    log("debug",
141                        f"{agent_id}: processor loop got message", message)
142                    agent._receive(message)
143                except queue.Empty:
144                    break
145            new_message_event.clear()
146    except KeyboardInterrupt:
147        log("debug", f"{agent_id}: processor loop interrupted")
148        pass
149    except Exception as e:
150        log("error", f"{agent_id}: processor loop failed", e)
151        raise
152    finally:
153        log("debug", f"{agent_id}: processor loop cleaning up")
154        agent._is_processing = False
155        agent.before_remove()
156        inbound_queue.disconnect()
157        outbound_queue.disconnect()
158        log("debug", f"{agent_id}: processor loop stopped")
class Processor(abc.ABC):
28class Processor(ABC, metaclass=ABCMeta):
29    """
30    Encapsulates a running Agent instance
31    """
32    def __init__(self,
33                 agent_type: Type[Agent],
34                 agent_id: str,
35                 agent_args: List,
36                 agent_kwargs: Dict,
37                 inbound_queue: Queue,
38                 outbound_queue: Queue,
39                 started: _EventProtocol,
40                 stopping: _EventProtocol,
41                 new_message_event: _EventProtocol,
42                 executor: Executor):
43        self.agent_type: Type[Agent] = agent_type
44        self.agent_id: str = agent_id
45        self.agent_args: List = agent_args
46        self.agent_kwargs: Dict = agent_kwargs
47        self.inbound_queue: Queue = inbound_queue
48        self.outbound_queue: Queue = outbound_queue
49        self.started: _EventProtocol = started
50        self.stopping: _EventProtocol = stopping
51        self.new_message_event: _EventProtocol = new_message_event
52        self.executor: Executor = executor
53        # --- non-constructor properties ---
54        self._future: Future = None
55        self._agent: Agent = None  # Accessible if in foreground
57    def start(self) -> Agent:
58        log("debug", f"{self.agent_id}: processor starting ...")
60        agent_ref: List = []
61        self._future = self.executor.submit(
62            _process_loop,
63            self.agent_type,
64            self.agent_id,
65            self.agent_args,
66            self.agent_kwargs,
67            self.inbound_queue,
68            self.outbound_queue,
69            self.started,
70            self.stopping,
71            self.new_message_event,
72            agent_ref,
73        )
75        if not self.started.wait(timeout=5):
76            # it couldn't start, force stop it and raise an exception
77            self.stop()
78            error = self._future.exception()
79            if error is not None:
80                raise error
81            else:
82                raise Exception("Processor could not be started.")
84        log("debug", f"{self.agent_id}: processor started")
86        # return the agent if present. only works in foreground
87        if agent_ref:
88            return agent_ref[0]
90    def stop(self):
91        log("debug", f"{self.agent_id}: processor stopping ...")
92        self.stopping.set()
93        if self._future is not None:
94            self._future.result()
95        log("debug", f"{self.agent_id}: processor stopped")

Encapsulates a running Agent instance

Processor( agent_type: Type[agency.agent.Agent], agent_id: str, agent_args: List, agent_kwargs: Dict, inbound_queue: agency.queue.Queue, outbound_queue: agency.queue.Queue, started: agency.processor._EventProtocol, stopping: agency.processor._EventProtocol, new_message_event: agency.processor._EventProtocol, executor: concurrent.futures._base.Executor)
32    def __init__(self,
33                 agent_type: Type[Agent],
34                 agent_id: str,
35                 agent_args: List,
36                 agent_kwargs: Dict,
37                 inbound_queue: Queue,
38                 outbound_queue: Queue,
39                 started: _EventProtocol,
40                 stopping: _EventProtocol,
41                 new_message_event: _EventProtocol,
42                 executor: Executor):
43        self.agent_type: Type[Agent] = agent_type
44        self.agent_id: str = agent_id
45        self.agent_args: List = agent_args
46        self.agent_kwargs: Dict = agent_kwargs
47        self.inbound_queue: Queue = inbound_queue
48        self.outbound_queue: Queue = outbound_queue
49        self.started: _EventProtocol = started
50        self.stopping: _EventProtocol = stopping
51        self.new_message_event: _EventProtocol = new_message_event
52        self.executor: Executor = executor
53        # --- non-constructor properties ---
54        self._future: Future = None
55        self._agent: Agent = None  # Accessible if in foreground
agent_type: Type[agency.agent.Agent]
agent_id: str
agent_args: List
agent_kwargs: Dict
inbound_queue: agency.queue.Queue
outbound_queue: agency.queue.Queue
started: agency.processor._EventProtocol
stopping: agency.processor._EventProtocol
new_message_event: agency.processor._EventProtocol
executor: concurrent.futures._base.Executor
def start(self) -> agency.agent.Agent:
57    def start(self) -> Agent:
58        log("debug", f"{self.agent_id}: processor starting ...")
60        agent_ref: List = []
61        self._future = self.executor.submit(
62            _process_loop,
63            self.agent_type,
64            self.agent_id,
65            self.agent_args,
66            self.agent_kwargs,
67            self.inbound_queue,
68            self.outbound_queue,
69            self.started,
70            self.stopping,
71            self.new_message_event,
72            agent_ref,
73        )
75        if not self.started.wait(timeout=5):
76            # it couldn't start, force stop it and raise an exception
77            self.stop()
78            error = self._future.exception()
79            if error is not None:
80                raise error
81            else:
82                raise Exception("Processor could not be started.")
84        log("debug", f"{self.agent_id}: processor started")
86        # return the agent if present. only works in foreground
87        if agent_ref:
88            return agent_ref[0]
def stop(self):
90    def stop(self):
91        log("debug", f"{self.agent_id}: processor stopping ...")
92        self.stopping.set()
93        if self._future is not None:
94            self._future.result()
95        log("debug", f"{self.agent_id}: processor stopped")