self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='classifier', exchange_type='topic')
+ def init_queue(self):
+ result = self.channel.queue_declare(exclusive=True, queue="")
+ self.queue = result.method.queue
+
+ for binding_key in self.binding_keys:
+ logger.info("Registering plugin %s for %s to queue %s", self.get_subclass_name(), binding_key, self.queue)
+ self.channel.queue_bind(
+ exchange='classifier',
+ queue=self.queue,
+ routing_key=binding_key
+ )
+
def callback(self, ch, method, properties, body):
logger.debug("Reacting on %s in %s", str(method.routing_key), self.get_subclass_name())
body = json.loads(body.decode("utf-8"))
except:
logger.exception("Oops. Registration failed")
self.init_channel()
-
- result = self.channel.queue_declare(exclusive=True, queue="")
- self.queue = result.method.queue
- for binding_key in self.binding_keys:
- logger.info("Registering plugin %s for %s", self.get_subclass_name(), binding_key)
- self.channel.queue_bind(
- exchange='classifier',
- queue=self.queue,
- routing_key=binding_key
- )
+ self.init_queue()
self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback)
self.channel.start_consuming()
plugin_storage = {}
reverse_lookup = {}
- def init_channel(self):
- super().init_channel()
- self.channel.exchange_declare(exchange='plugin_exc', exchange_type='fanout')
- result = self.channel.queue_declare(exclusive=True, queue=self.queue)
- plugin_queue = result.method.queue
- self.channel.queue_bind(exchange='plugin_exc', queue=plugin_queue)
+ def init_queue(self):
+ super().init_queue()
self.channel.basic_consume(queue='plugin_registry', on_message_callback=self.callback_plugins)
def callback_plugins(self, ch, method, properties, body):
body = json.loads(body.decode("utf-8"))
- logger.debug("received plugin in registry")
+ logger.debug("received plugin in registry: %s", body.get("name", "ERROR"))
self.plugin_storage[body["name"].lower()] = body
ch.basic_ack(delivery_tag=method.delivery_tag)