logger.info('Issuing consumer related RPC commands')
# self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(
- self.on_message, self._queue_name
+ queue=self._queue_name,
+ on_message_callback=self.on_message
)
logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection,
self._consumer_tag)
import shlex
import pika
-from pika.adapters import AsyncioConnection
+from pika.adapters.asyncio_connection import AsyncioConnection
import slixmpp
from distbot.bot import action_worker
self.channel.stop_consuming()
def run(self):
- result = self.channel.queue_declare(exclusive=True)
+ result = self.channel.queue_declare(exclusive=True, queue="")
self.queue = result.method.queue
for binding_key in self.binding_keys:
routing_key=binding_key
)
- self.channel.basic_consume(self.callback, queue=self.queue)
+ self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback)
self.channel.start_consuming()
def parse_body(self, msg):
channel.basic_publish(
exchange='',
routing_key='plugin_registry',
- body=json.dumps(self.get_declaration())
+ body=json.dumps(self.get_declaration()).encode("utf-8")
)
def get_declaration(self):
plugin_storage = {}
reverse_lookup = {}
- def __init__(self, actionqueue, queue="work"):
+ def __init__(self, actionqueue, queue=""):
super(Plugins, self).__init__(actionqueue, queue)
self.channel.exchange_declare(exchange='plugin_exc', exchange_type='fanout')
def run(self):
- result = self.channel.queue_declare(exclusive=True)
+ result = self.channel.queue_declare(exclusive=True, queue=self.queue)
plugin_queue = result.method.queue
self.channel.queue_bind(exchange='plugin_exc', queue=plugin_queue)
- self.channel.basic_consume(self.callback_plugins, queue='plugin_registry')
+ self.channel.basic_consume(queue='plugin_registry', on_message_callback=self.callback_plugins)
super(Plugins, self).run()
def callback_plugins(self, ch, method, properties, body):
setup(
name="distbot",
- version=0.1,
+ version="0.1",
description="the distributed bot",
- install_requires=["pika==0.12.0", 'configobj', 'requests'],
+ install_requires=["pika", 'configobj', 'requests'],
test_requires=["pytest"],
extras_require={
'test': ["pytest"],