agency.spaces.amqp_space
import json
import os
import queue
import socket
import threading
import time
from concurrent.futures import Future
from dataclasses import dataclass

import amqp
import kombu

from agency.logger import log
from agency.queue import Queue
from agency.resources import ResourceManager
from agency.schema import Message
from agency.space import Space

# Routing key that every inbound broadcast queue is bound to; messages
# addressed to '*' are published under this key.
_BROADCAST_KEY = "__broadcast__"


@dataclass
class AMQPOptions:
    """A class that defines AMQP connection options"""
    hostname: str = 'localhost'
    # An integer, in agreement with the annotation and with the int() parsing
    # of AMQP_PORT done by AMQPSpace.
    port: int = 5672
    username: str = 'guest'
    password: str = 'guest'
    virtual_host: str = '/'
    use_ssl: bool = False
    heartbeat: float = 60


class _AMQPQueue(Queue):
    """An AMQP based Queue using the kombu library"""

    def __init__(self, amqp_options: AMQPOptions, exchange_name: str, routing_key: str):
        """
        Store the connection settings and addressing for this queue.

        Args:
            amqp_options: Connection options translated into the keyword
                arguments later unpacked into kombu.Connection.
            exchange_name: Name of the exchange to use.
            routing_key: Routing key identifying this queue's agent.
        """
        self.kombu_connection_options = {
            'hostname': amqp_options.hostname,
            'port': amqp_options.port,
            'userid': amqp_options.username,
            'password': amqp_options.password,
            'virtual_host': amqp_options.virtual_host,
            'ssl': amqp_options.use_ssl,
            'heartbeat': amqp_options.heartbeat,
        }
        self.exchange_name: str = exchange_name
        self.routing_key: str = routing_key


class _AMQPInboundQueue(_AMQPQueue):
    """
    Receiving side of an agent's AMQP connection.

    Consumes from two exclusive queues (one bound to the agent's own routing
    key, one bound to the broadcast key) and buffers decoded messages in a
    local queue.Queue that get() reads from.
    """

    def __init__(self, amqp_options: AMQPOptions, exchange_name: str, routing_key: str):
        super().__init__(amqp_options, exchange_name, routing_key)
        self._connection: kombu.Connection = None
        self._exchange: kombu.Exchange = None
        self._direct_queue: kombu.Queue = None
        self._broadcast_queue: kombu.Queue = None
        self._consumer: kombu.Consumer = None
        self._heartbeat_future: Future = None
        self._received_queue: queue.Queue = None
        self._disconnecting: threading.Event = None

    def _close_connection(self):
        """Close the kombu connection, if there is one, and forget it."""
        if self._connection:
            self._connection.close()
        self._connection = None

    def connect(self):
        """
        Open the connection, start consuming and start the heartbeat thread.

        Raises:
            ValueError: If the broker reports the queue as locked
                (amqp ResourceLocked), reported as the agent already existing.
        """
        log("debug", f"{self.routing_key}: connecting")

        self._received_queue = queue.Queue()

        def _callback(body, amqp_message):
            amqp_message.ack()
            self._received_queue.put(json.loads(body))

        try:
            self._connection = kombu.Connection(
                **self.kombu_connection_options)
            self._connection.connect()
            self._exchange = kombu.Exchange(
                self.exchange_name, 'topic', durable=True)
            self._direct_queue = kombu.Queue(
                self.routing_key,
                exchange=self._exchange,
                routing_key=self.routing_key,
                exclusive=True)
            self._broadcast_queue = kombu.Queue(
                f"{self.routing_key}-broadcast",
                exchange=self._exchange,
                routing_key=_BROADCAST_KEY,
                exclusive=True)
            self._consumer = kombu.Consumer(
                self._connection,
                [self._direct_queue, self._broadcast_queue],
                callbacks=[_callback])
            self._consumer.consume()
        except amqp.exceptions.ResourceLocked as err:
            # Do not leave a half-initialized connection open behind the error.
            self._close_connection()
            raise ValueError(
                f"Agent '{self.routing_key}' already exists") from err
        except Exception:
            # Any other setup failure: release the connection, then propagate.
            self._close_connection()
            raise

        # start heartbeat thread
        def _heartbeat_thread(disconnecting):
            log("debug", f"{self.routing_key}: heartbeat thread starting")
            try:
                while not disconnecting.is_set():
                    try:
                        self._connection.heartbeat_check()
                        self._connection.drain_events(timeout=0.2)
                        time.sleep(0.1)
                    except socket.timeout:
                        # Nothing arrived within the drain timeout; loop again.
                        pass
            except amqp.exceptions.ConnectionForced:
                log("warning",
                    f"{self.routing_key}: heartbeat connection force closed")
            log("debug", f"{self.routing_key}: heartbeat thread stopped")

        # A new Event starts unset, so no explicit clear() is needed.
        self._disconnecting = threading.Event()
        self._heartbeat_future = ResourceManager(
        ).thread_pool_executor.submit(_heartbeat_thread, self._disconnecting)

        log("debug", f"{self.routing_key}: connected")

    def disconnect(self):
        """
        Signal the heartbeat thread to stop, wait for it, and close.

        The wait is bounded to 5 seconds; the connection is closed even if
        waiting on the heartbeat future raises.
        """
        log("debug", f"{self.routing_key}: disconnecting")
        if self._disconnecting is not None:
            self._disconnecting.set()
        try:
            if self._heartbeat_future is not None:
                self._heartbeat_future.result(timeout=5)
        finally:
            self._close_connection()
        log("debug", f"{self.routing_key}: disconnected")

    def put(self, message: Message):
        raise NotImplementedError("AMQPInboundQueue does not support put")

    def get(self, block: bool = True, timeout: float = None) -> Message:
        """Return the next buffered message (see queue.Queue.get)."""
        message = self._received_queue.get(block=block, timeout=timeout)
        return message


class _AMQPOutboundQueue(_AMQPQueue):
    """Sending side: publishes JSON-encoded messages to the exchange."""

    def __init__(self, amqp_options: AMQPOptions, exchange_name: str, routing_key: str):
        super().__init__(amqp_options, exchange_name, routing_key)
        self._exchange: kombu.Exchange = None

    def connect(self):
        """Create the exchange object used by put()."""
        self._exchange = kombu.Exchange(
            self.exchange_name, 'topic', durable=True)

    def put(self, message: Message):
        """
        Publish a message, using a new connection for the call.

        Messages whose 'to' field is '*' are published under the broadcast
        key; all others are routed by their 'to' value.
        """
        recipient = message['to']
        routing_key = _BROADCAST_KEY if recipient == '*' else recipient
        with kombu.Connection(**self.kombu_connection_options) as connection:
            with connection.Producer() as producer:
                producer.publish(
                    json.dumps(message),
                    exchange=self._exchange,
                    routing_key=routing_key,
                )

    def get(self, block: bool = True, timeout: float = None) -> Message:
        raise NotImplementedError("AMQPOutboundQueue does not support get")


class AMQPSpace(Space):
    """
    A Space that uses AMQP for message delivery.

    This Space type is useful for distributing agents across multiple hosts.
    """

    def __init__(self, amqp_options: AMQPOptions = None, exchange_name: str = "agency"):
        """
        Args:
            amqp_options: Connection options. When None, defaults are read
                from environment variables.
            exchange_name: Name of the exchange passed to created queues.
        """
        super().__init__()
        if amqp_options is None:
            amqp_options = self.__default_amqp_options()
        self.amqp_options = amqp_options
        self.exchange_name = exchange_name

    def __default_amqp_options(self) -> AMQPOptions:
        """
        Returns a default AMQPOptions object configurable from environment
        variables.
        """
        # TODO add support for AMQP_URL
        return AMQPOptions(
            hostname=os.environ.get('AMQP_HOST', 'localhost'),
            port=int(os.environ.get('AMQP_PORT', 5672)),
            username=os.environ.get('AMQP_USERNAME', 'guest'),
            password=os.environ.get('AMQP_PASSWORD', 'guest'),
            virtual_host=os.environ.get('AMQP_VHOST', '/'),
            use_ssl=False,
            heartbeat=60,
        )

    def _create_inbound_queue(self, agent_id) -> Queue:
        return _AMQPInboundQueue(
            amqp_options=self.amqp_options,
            exchange_name=self.exchange_name,
            routing_key=agent_id,
        )

    def _create_outbound_queue(self, agent_id) -> Queue:
        return _AMQPOutboundQueue(
            amqp_options=self.amqp_options,
            exchange_name=self.exchange_name,
            routing_key=agent_id,
        )
@dataclass
class AMQPOptions:
    """A class that defines AMQP connection options"""
    hostname: str = 'localhost'
    # An integer, in agreement with the annotation; the previous default was
    # the string '5672'.
    port: int = 5672
    username: str = 'guest'
    password: str = 'guest'
    virtual_host: str = '/'
    use_ssl: bool = False
    heartbeat: float = 60
A class that defines AMQP connection options
class AMQPSpace(Space):
    """
    A Space whose messages are delivered over AMQP.

    Suitable for running agents that are distributed across multiple hosts.
    """

    def __init__(self, amqp_options: AMQPOptions = None, exchange_name: str = "agency"):
        """
        Args:
            amqp_options: Connection options; when None they are built from
                environment variables.
            exchange_name: Exchange name handed to every queue created here.
        """
        super().__init__()
        self.amqp_options = (
            self.__default_amqp_options()
            if amqp_options is None
            else amqp_options
        )
        self.exchange_name = exchange_name

    def __default_amqp_options(self) -> AMQPOptions:
        """
        Build an AMQPOptions from the AMQP_HOST, AMQP_PORT, AMQP_USERNAME,
        AMQP_PASSWORD and AMQP_VHOST environment variables, falling back to
        local guest defaults.
        """
        # TODO add support for AMQP_URL
        read_env = os.environ.get
        host = read_env('AMQP_HOST', 'localhost')
        port = int(read_env('AMQP_PORT', 5672))
        user = read_env('AMQP_USERNAME', 'guest')
        secret = read_env('AMQP_PASSWORD', 'guest')
        vhost = read_env('AMQP_VHOST', '/')
        return AMQPOptions(
            hostname=host,
            port=port,
            username=user,
            password=secret,
            virtual_host=vhost,
            use_ssl=False,
            heartbeat=60,
        )

    def __queue_arguments(self, agent_id) -> dict:
        """Keyword arguments shared by the inbound and outbound queues."""
        return {
            'amqp_options': self.amqp_options,
            'exchange_name': self.exchange_name,
            'routing_key': agent_id,
        }

    def _create_inbound_queue(self, agent_id) -> Queue:
        return _AMQPInboundQueue(**self.__queue_arguments(agent_id))

    def _create_outbound_queue(self, agent_id) -> Queue:
        return _AMQPOutboundQueue(**self.__queue_arguments(agent_id))
A Space that uses AMQP for message delivery.
This Space type is useful for distributing agents across multiple hosts.
AMQPSpace(amqp_options: AMQPOptions = None, exchange_name: str = 'agency')