def init_channel(self):
self.connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
self.channel = self.connection.channel()
+
self.channel.exchange_declare(exchange='classifier', exchange_type='topic')
def init_queue(self):
self.connection.add_callback_threadsafe(callback=self.channel.stop_consuming)
def run(self):
- try:
- self.register_plugin()
- except:
- logger.exception("Oops. Registration failed")
self.init_channel()
+ self.register_plugin()
self.init_queue()
self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback)
raise NotImplementedError()
def register_plugin(self):
-
- connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
- channel = connection.channel()
- channel.queue_declare(queue='plugin_registry')
- channel.basic_publish(
+ self.channel.queue_declare(queue='plugin_registry')
+ self.channel.basic_publish(
exchange='',
routing_key='plugin_registry',
body=json.dumps(self.get_declaration()).encode("utf-8")