agency.processor

  1import multiprocessing
  2import queue
  3import threading
  4from abc import ABC, ABCMeta
  5from concurrent.futures import (Executor, Future)
  6from typing import Dict, List, Protocol, Type
  7
  8from agency.agent import Agent
  9from agency.logger import log
 10from agency.queue import Queue
 11
 12
 13class _EventProtocol(Protocol):
 14    def set(self) -> None:
 15        pass
 16
 17    def clear(self) -> None:
 18        pass
 19
 20    def is_set(self) -> bool:
 21        pass
 22
 23    def wait(self, timeout: float = None) -> bool:
 24        pass
 25
 26
 27class Processor(ABC, metaclass=ABCMeta):
 28    """
 29    Encapsulates a running Agent instance
 30    """
 31    def __init__(self,
 32                 agent_type: Type[Agent],
 33                 agent_id: str,
 34                 agent_args: List,
 35                 agent_kwargs: Dict,
 36                 inbound_queue: Queue,
 37                 outbound_queue: Queue,
 38                 started: _EventProtocol,
 39                 stopping: _EventProtocol,
 40                 new_message_event: _EventProtocol,
 41                 executor: Executor):
 42        self.agent_type: Type[Agent] = agent_type
 43        self.agent_id: str = agent_id
 44        self.agent_args: List = agent_args
 45        self.agent_kwargs: Dict = agent_kwargs
 46        self.inbound_queue: Queue = inbound_queue
 47        self.outbound_queue: Queue = outbound_queue
 48        self.started: _EventProtocol = started
 49        self.stopping: _EventProtocol = stopping
 50        self.new_message_event: _EventProtocol = new_message_event
 51        self.executor: Executor = executor
 52        # --- non-constructor properties ---
 53        self._future: Future = None
 54        self._agent: Agent = None  # Accessible if in foreground
 55
 56    def start(self) -> Agent:
 57        log("debug", f"{self.agent_id}: processor starting ...")
 58
 59        agent_ref: List = []
 60        self._future = self.executor.submit(
 61            _process_loop,
 62            self.agent_type,
 63            self.agent_id,
 64            self.agent_args,
 65            self.agent_kwargs,
 66            self.inbound_queue,
 67            self.outbound_queue,
 68            self.started,
 69            self.stopping,
 70            self.new_message_event,
 71            agent_ref,
 72        )
 73
 74        if not self.started.wait(timeout=5):
 75            # it couldn't start, force stop it and raise an exception
 76            self.stop()
 77            error = self._future.exception()
 78            if error is not None:
 79                raise error
 80            else:
 81                raise Exception("Processor could not be started.")
 82
 83        log("debug", f"{self.agent_id}: processor started")
 84
 85        # return the agent if present. only works in foreground
 86        if agent_ref:
 87            return agent_ref[0]
 88
 89    def stop(self):
 90        log("debug", f"{self.agent_id}: processor stopping ...")
 91        self.stopping.set()
 92        if self._future is not None:
 93            self._future.result()
 94        log("debug", f"{self.agent_id}: processor stopped")
 95
 96
 97# Placed at the top-level to play nice with the multiprocessing module
 98def _process_loop(agent_type: Type[Agent],
 99                  agent_id: str,
100                  agent_args: List,
101                  agent_kwargs: Dict,
102                  inbound_queue: Queue,
103                  outbound_queue: Queue,
104                  started: _EventProtocol,
105                  stopping: _EventProtocol,
106                  new_message_event: _EventProtocol,
107                  agent_ref: List):
108    """
109    The main agent processing loop
110    """
111    # Set process or thread name
112    if isinstance(started, threading.Event):
113        threading.current_thread(
114        ).name = f"{agent_id}: processor loop thread"
115    else:
116        multiprocessing.current_process(
117        ).name = f"{agent_id}: processor loop process"
118
119    try:
120        log("debug", f"{agent_id}: processor loop starting")
121        agent: Agent = agent_type(agent_id, *agent_args, **agent_kwargs)
122        agent_ref.append(agent)  # set the agent reference
123        inbound_queue.connect()
124        outbound_queue.connect()
125        agent._outbound_queue = outbound_queue
126        agent.after_add()
127        agent._is_processing = True
128        started.set()
129        stopping.clear()
130        new_message_event.clear()
131        while not stopping.is_set():
132            new_message_event.wait(timeout=0.1) # TODO make configurable
133            if stopping.is_set():
134                log("debug",
135                    f"{agent_id}: processor loop stopping")
136                break
137            while True:  # drain inbound_queue
138                try:
139                    message = inbound_queue.get(block=False)
140                    log("debug",
141                        f"{agent_id}: processor loop got message", message)
142                    agent._receive(message)
143                except queue.Empty:
144                    break
145            new_message_event.clear()
146    except KeyboardInterrupt:
147        log("debug", f"{agent_id}: processor loop interrupted")
148        pass
149    except Exception as e:
150        log("error", f"{agent_id}: processor loop failed", e)
151        raise
152    finally:
153        log("debug", f"{agent_id}: processor loop cleaning up")
154        agent._is_processing = False
155        agent.before_remove()
156        inbound_queue.disconnect()
157        outbound_queue.disconnect()
158        log("debug", f"{agent_id}: processor loop stopped")
class Processor(abc.ABC):
28class Processor(ABC, metaclass=ABCMeta):
29    """
30    Encapsulates a running Agent instance
31    """
32    def __init__(self,
33                 agent_type: Type[Agent],
34                 agent_id: str,
35                 agent_args: List,
36                 agent_kwargs: Dict,
37                 inbound_queue: Queue,
38                 outbound_queue: Queue,
39                 started: _EventProtocol,
40                 stopping: _EventProtocol,
41                 new_message_event: _EventProtocol,
42                 executor: Executor):
43        self.agent_type: Type[Agent] = agent_type
44        self.agent_id: str = agent_id
45        self.agent_args: List = agent_args
46        self.agent_kwargs: Dict = agent_kwargs
47        self.inbound_queue: Queue = inbound_queue
48        self.outbound_queue: Queue = outbound_queue
49        self.started: _EventProtocol = started
50        self.stopping: _EventProtocol = stopping
51        self.new_message_event: _EventProtocol = new_message_event
52        self.executor: Executor = executor
53        # --- non-constructor properties ---
54        self._future: Future = None
55        self._agent: Agent = None  # Accessible if in foreground
56
57    def start(self) -> Agent:
58        log("debug", f"{self.agent_id}: processor starting ...")
59
60        agent_ref: List = []
61        self._future = self.executor.submit(
62            _process_loop,
63            self.agent_type,
64            self.agent_id,
65            self.agent_args,
66            self.agent_kwargs,
67            self.inbound_queue,
68            self.outbound_queue,
69            self.started,
70            self.stopping,
71            self.new_message_event,
72            agent_ref,
73        )
74
75        if not self.started.wait(timeout=5):
76            # it couldn't start, force stop it and raise an exception
77            self.stop()
78            error = self._future.exception()
79            if error is not None:
80                raise error
81            else:
82                raise Exception("Processor could not be started.")
83
84        log("debug", f"{self.agent_id}: processor started")
85
86        # return the agent if present. only works in foreground
87        if agent_ref:
88            return agent_ref[0]
89
90    def stop(self):
91        log("debug", f"{self.agent_id}: processor stopping ...")
92        self.stopping.set()
93        if self._future is not None:
94            self._future.result()
95        log("debug", f"{self.agent_id}: processor stopped")

Encapsulates a running Agent instance

Processor( agent_type: Type[agency.agent.Agent], agent_id: str, agent_args: List, agent_kwargs: Dict, inbound_queue: agency.queue.Queue, outbound_queue: agency.queue.Queue, started: agency.processor._EventProtocol, stopping: agency.processor._EventProtocol, new_message_event: agency.processor._EventProtocol, executor: concurrent.futures._base.Executor)
32    def __init__(self,
33                 agent_type: Type[Agent],
34                 agent_id: str,
35                 agent_args: List,
36                 agent_kwargs: Dict,
37                 inbound_queue: Queue,
38                 outbound_queue: Queue,
39                 started: _EventProtocol,
40                 stopping: _EventProtocol,
41                 new_message_event: _EventProtocol,
42                 executor: Executor):
43        self.agent_type: Type[Agent] = agent_type
44        self.agent_id: str = agent_id
45        self.agent_args: List = agent_args
46        self.agent_kwargs: Dict = agent_kwargs
47        self.inbound_queue: Queue = inbound_queue
48        self.outbound_queue: Queue = outbound_queue
49        self.started: _EventProtocol = started
50        self.stopping: _EventProtocol = stopping
51        self.new_message_event: _EventProtocol = new_message_event
52        self.executor: Executor = executor
53        # --- non-constructor properties ---
54        self._future: Future = None
55        self._agent: Agent = None  # Accessible if in foreground
agent_type: Type[agency.agent.Agent]
agent_id: str
agent_args: List
agent_kwargs: Dict
inbound_queue: agency.queue.Queue
outbound_queue: agency.queue.Queue
started: agency.processor._EventProtocol
stopping: agency.processor._EventProtocol
new_message_event: agency.processor._EventProtocol
executor: concurrent.futures._base.Executor
def start(self) -> agency.agent.Agent:
57    def start(self) -> Agent:
58        log("debug", f"{self.agent_id}: processor starting ...")
59
60        agent_ref: List = []
61        self._future = self.executor.submit(
62            _process_loop,
63            self.agent_type,
64            self.agent_id,
65            self.agent_args,
66            self.agent_kwargs,
67            self.inbound_queue,
68            self.outbound_queue,
69            self.started,
70            self.stopping,
71            self.new_message_event,
72            agent_ref,
73        )
74
75        if not self.started.wait(timeout=5):
76            # it couldn't start, force stop it and raise an exception
77            self.stop()
78            error = self._future.exception()
79            if error is not None:
80                raise error
81            else:
82                raise Exception("Processor could not be started.")
83
84        log("debug", f"{self.agent_id}: processor started")
85
86        # return the agent if present. only works in foreground
87        if agent_ref:
88            return agent_ref[0]
def stop(self):
90    def stop(self):
91        log("debug", f"{self.agent_id}: processor stopping ...")
92        self.stopping.set()
93        if self._future is not None:
94            self._future.result()
95        log("debug", f"{self.agent_id}: processor stopped")