agency.spaces.amqp_space

  1import json
  2import os
  3import queue
  4import socket
  5import threading
  6import time
  7from concurrent.futures import Future
  8from dataclasses import dataclass
  9
 10import amqp
 11import kombu
 12
 13from agency.logger import log
 14from agency.queue import Queue
 15from agency.resources import ResourceManager
 16from agency.schema import Message
 17from agency.space import Space
 18
 19_BROADCAST_KEY = "__broadcast__"
 20
 21@dataclass
 22class AMQPOptions:
 23    """A class that defines AMQP connection options"""
 24    hostname: str = 'localhost'
 25    port: int = '5672'
 26    username: str = 'guest'
 27    password: str = 'guest'
 28    virtual_host: str = '/'
 29    use_ssl: bool = False
 30    heartbeat: float = 60
 31
 32
 33class _AMQPQueue(Queue):
 34    """An AMQP based Queue using the kombu library"""
 35
 36    def __init__(self, amqp_options: AMQPOptions, exchange_name: str, routing_key: str):
 37        self.kombu_connection_options = {
 38            'hostname': amqp_options.hostname,
 39            'port': amqp_options.port,
 40            'userid': amqp_options.username,
 41            'password': amqp_options.password,
 42            'virtual_host': amqp_options.virtual_host,
 43            'ssl': amqp_options.use_ssl,
 44            'heartbeat': amqp_options.heartbeat,
 45        }
 46        self.exchange_name: str = exchange_name
 47        self.routing_key: str = routing_key
 48
 49
 50class _AMQPInboundQueue(_AMQPQueue):
 51
 52    def __init__(self, amqp_options: AMQPOptions, exchange_name: str, routing_key: str):
 53        super().__init__(amqp_options, exchange_name, routing_key)
 54        self._connection: kombu.Connection = None
 55        self._exchange: kombu.Exchange = None
 56        self._direct_queue: kombu.Queue = None
 57        self._broadcast_queue: kombu.Queue = None
 58        self._heartbeat_future: Future = None
 59        self._received_queue: queue.Queue = None
 60        self._disconnecting: threading.Event = None
 61
 62    def connect(self):
 63        log("debug", f"{self.routing_key}: connecting")
 64
 65        self._received_queue = queue.Queue()
 66
 67        def _callback(body, amqp_message):
 68            amqp_message.ack()
 69            self._received_queue.put(json.loads(body))
 70
 71        try:
 72            self._connection = kombu.Connection(
 73                **self.kombu_connection_options)
 74            self._connection.connect()
 75            self._exchange = kombu.Exchange(
 76                self.exchange_name, 'topic', durable=True)
 77            self._direct_queue = kombu.Queue(
 78                self.routing_key,
 79                exchange=self._exchange,
 80                routing_key=self.routing_key,
 81                exclusive=True)
 82            self._broadcast_queue = kombu.Queue(
 83                f"{self.routing_key}-broadcast",
 84                exchange=self._exchange,
 85                routing_key=_BROADCAST_KEY,
 86                exclusive=True)
 87            self._consumer = kombu.Consumer(
 88                self._connection,
 89                [self._direct_queue, self._broadcast_queue],
 90                callbacks=[_callback])
 91            self._consumer.consume()
 92        except amqp.exceptions.ResourceLocked:
 93            raise ValueError(f"Agent '{self.routing_key}' already exists")
 94
 95        # start heartbeat thread
 96        def _heartbeat_thread(disconnecting):
 97            log("debug", f"{self.routing_key}: heartbeat thread starting")
 98            try:
 99                while not disconnecting.is_set():
100                    try:
101                        self._connection.heartbeat_check()
102                        self._connection.drain_events(timeout=0.2)
103                        time.sleep(0.1)
104                    except socket.timeout:
105                        pass
106            except amqp.exceptions.ConnectionForced:
107                log("warning",
108                    f"{self.routing_key}: heartbeat connection force closed")
109            log("debug", f"{self.routing_key}: heartbeat thread stopped")
110        self._disconnecting = threading.Event()
111        self._disconnecting.clear()
112        self._heartbeat_future = ResourceManager(
113        ).thread_pool_executor.submit(_heartbeat_thread, self._disconnecting)
114
115        log("debug", f"{self.routing_key}: connected")
116
117    def disconnect(self):
118        log("debug", f"{self.routing_key}: disconnecting")
119        if self._disconnecting:
120            self._disconnecting.set()
121        try:
122            if self._heartbeat_future is not None:
123                self._heartbeat_future.result(timeout=5)
124        finally:
125            if self._connection:
126                self._connection.close()
127        log("debug", f"{self.routing_key}: disconnected")
128
129    def put(self, message: Message):
130        raise NotImplementedError("AMQPInboundQueue does not support put")
131
132    def get(self, block: bool = True, timeout: float = None) -> Message:
133        message = self._received_queue.get(block=block, timeout=timeout)
134        return message
135
136
class _AMQPOutboundQueue(_AMQPQueue):
    """Sending side of an agent's AMQP queue pair.

    Each put() opens its own kombu connection and producer, publishes the
    JSON-encoded message to the topic exchange, and closes them again.
    """

    def __init__(self, amqp_options: AMQPOptions, exchange_name: str, routing_key: str):
        super().__init__(amqp_options, exchange_name, routing_key)
        self._exchange: kombu.Exchange = None

    def connect(self):
        """Create the durable topic exchange object used when publishing."""
        self._exchange = kombu.Exchange(
            self.exchange_name, 'topic', durable=True)

    def put(self, message: Message):
        """Publish ``message``, routed by its 'to' field.

        A 'to' value of '*' is routed with the broadcast key; any other value
        is used as the routing key itself.
        """
        with kombu.Connection(**self.kombu_connection_options) as connection, \
                connection.Producer() as producer:
            recipient = message['to']
            target_key = _BROADCAST_KEY if recipient == '*' else recipient
            producer.publish(
                json.dumps(message),
                exchange=self._exchange,
                routing_key=target_key,
            )

    def get(self, block: bool = True, timeout: float = None) -> Message:
        """Unsupported: this queue only sends."""
        raise NotImplementedError("AMQPOutboundQueue does not support get")
165
166
class AMQPSpace(Space):
    """
    A Space that delivers messages over AMQP.

    Useful when agents need to be distributed across multiple hosts.
    """

    def __init__(self, amqp_options: AMQPOptions = None, exchange_name: str = "agency"):
        super().__init__()
        # Fall back to environment-derived options when none are supplied.
        self.amqp_options = (
            self.__default_amqp_options()
            if amqp_options is None
            else amqp_options
        )
        self.exchange_name = exchange_name

    def __default_amqp_options(self) -> AMQPOptions:
        """
        Build an AMQPOptions object from the AMQP_HOST, AMQP_PORT,
        AMQP_USERNAME, AMQP_PASSWORD and AMQP_VHOST environment variables,
        using the usual local-broker values for any that are unset.
        """
        # TODO add support for AMQP_URL
        env = os.environ.get
        return AMQPOptions(
            hostname=env('AMQP_HOST', 'localhost'),
            port=int(env('AMQP_PORT', 5672)),
            username=env('AMQP_USERNAME', 'guest'),
            password=env('AMQP_PASSWORD', 'guest'),
            virtual_host=env('AMQP_VHOST', '/'),
            use_ssl=False,
            heartbeat=60,
        )

    def _create_inbound_queue(self, agent_id) -> Queue:
        """Return the receiving queue for ``agent_id``."""
        return _AMQPInboundQueue(self.amqp_options, self.exchange_name, agent_id)

    def _create_outbound_queue(self, agent_id) -> Queue:
        """Return the sending queue for ``agent_id``."""
        return _AMQPOutboundQueue(self.amqp_options, self.exchange_name, agent_id)
@dataclass
class AMQPOptions:
22@dataclass
23class AMQPOptions:
24    """A class that defines AMQP connection options"""
25    hostname: str = 'localhost'
26    port: int = '5672'
27    username: str = 'guest'
28    password: str = 'guest'
29    virtual_host: str = '/'
30    use_ssl: bool = False
31    heartbeat: float = 60

A class that defines AMQP connection options

AMQPOptions( hostname: str = 'localhost', port: int = '5672', username: str = 'guest', password: str = 'guest', virtual_host: str = '/', use_ssl: bool = False, heartbeat: float = 60)
hostname: str = 'localhost'
port: int = '5672'
username: str = 'guest'
password: str = 'guest'
virtual_host: str = '/'
use_ssl: bool = False
heartbeat: float = 60
class AMQPSpace(agency.space.Space):
168class AMQPSpace(Space):
169    """
170    A Space that uses AMQP for message delivery.
171
172    This Space type is useful for distributing agents across multiple hosts.
173    """
174
175    def __init__(self, amqp_options: AMQPOptions = None, exchange_name: str = "agency"):
176        super().__init__()
177        if amqp_options is None:
178            amqp_options = self.__default_amqp_options()
179        self.amqp_options = amqp_options
180        self.exchange_name = exchange_name
181
182    def __default_amqp_options(self) -> AMQPOptions:
183        """
184        Returns a default AMQPOptions object configurable from environment
185        variables.
186        """
187        # TODO add support for AMQP_URL
188        return AMQPOptions(
189            hostname=os.environ.get('AMQP_HOST', 'localhost'),
190            port=int(os.environ.get('AMQP_PORT', 5672)),
191            username=os.environ.get('AMQP_USERNAME', 'guest'),
192            password=os.environ.get('AMQP_PASSWORD', 'guest'),
193            virtual_host=os.environ.get('AMQP_VHOST', '/'),
194            use_ssl=False,
195            heartbeat=60,
196        )
197
198    def _create_inbound_queue(self, agent_id) -> Queue:
199        return _AMQPInboundQueue(
200            amqp_options=self.amqp_options,
201            exchange_name=self.exchange_name,
202            routing_key=agent_id,
203        )
204
205    def _create_outbound_queue(self, agent_id) -> Queue:
206        return _AMQPOutboundQueue(
207            amqp_options=self.amqp_options,
208            exchange_name=self.exchange_name,
209            routing_key=agent_id,
210        )

A Space that uses AMQP for message delivery.

This Space type is useful for distributing agents across multiple hosts.

AMQPSpace( amqp_options: AMQPOptions = None, exchange_name: str = 'agency')
175    def __init__(self, amqp_options: AMQPOptions = None, exchange_name: str = "agency"):
176        super().__init__()
177        if amqp_options is None:
178            amqp_options = self.__default_amqp_options()
179        self.amqp_options = amqp_options
180        self.exchange_name = exchange_name
amqp_options
exchange_name