From: Thorsten Date: Sat, 12 Nov 2022 14:34:03 +0000 (+0100) Subject: fixes for pika incompatible upgrade X-Git-Url: https://git.aero2k.de/?a=commitdiff_plain;h=dbff3f403a2f971587c1dd49aa90e6d741511b5a;p=urlbot-v3.git fixes for pika incompatible upgrade --- diff --git a/distbot/bot/action_worker.py b/distbot/bot/action_worker.py index a57d47c..8ea11d1 100644 --- a/distbot/bot/action_worker.py +++ b/distbot/bot/action_worker.py @@ -64,7 +64,8 @@ class ActionWorker(object): logger.info('Issuing consumer related RPC commands') # self.add_on_cancel_callback() self._consumer_tag = self._channel.basic_consume( - self.on_message, self._queue_name + queue=self._queue_name, + on_message_callback=self.on_message ) logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection, self._consumer_tag) diff --git a/distbot/bot/bot.py b/distbot/bot/bot.py index ab31191..bfc7401 100644 --- a/distbot/bot/bot.py +++ b/distbot/bot/bot.py @@ -5,7 +5,7 @@ import re import shlex import pika -from pika.adapters import AsyncioConnection +from pika.adapters.asyncio_connection import AsyncioConnection import slixmpp from distbot.bot import action_worker diff --git a/distbot/bot/worker.py b/distbot/bot/worker.py index 7aad677..ccd1f41 100644 --- a/distbot/bot/worker.py +++ b/distbot/bot/worker.py @@ -101,7 +101,7 @@ class Worker(threading.Thread): self.channel.stop_consuming() def run(self): - result = self.channel.queue_declare(exclusive=True) + result = self.channel.queue_declare(exclusive=True, queue="") self.queue = result.method.queue for binding_key in self.binding_keys: @@ -112,7 +112,7 @@ class Worker(threading.Thread): routing_key=binding_key ) - self.channel.basic_consume(self.callback, queue=self.queue) + self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback) self.channel.start_consuming() def parse_body(self, msg): @@ -126,7 +126,7 @@ class Worker(threading.Thread): channel.basic_publish( exchange='', routing_key='plugin_registry', - body=json.dumps(self.get_declaration()) + body=json.dumps(self.get_declaration()).encode("utf-8") ) def get_declaration(self): diff --git a/distbot/plugins/plugin_help.py b/distbot/plugins/plugin_help.py index ff0e26a..fd5aad1 100644 --- a/distbot/plugins/plugin_help.py +++ b/distbot/plugins/plugin_help.py @@ -28,15 +28,15 @@ class Plugins(Worker): plugin_storage = {} reverse_lookup = {} - def __init__(self, actionqueue, queue="work"): + def __init__(self, actionqueue, queue=""): super(Plugins, self).__init__(actionqueue, queue) self.channel.exchange_declare(exchange='plugin_exc', exchange_type='fanout') def run(self): - result = self.channel.queue_declare(exclusive=True) + result = self.channel.queue_declare(exclusive=True, queue=self.queue) plugin_queue = result.method.queue self.channel.queue_bind(exchange='plugin_exc', queue=plugin_queue) - self.channel.basic_consume(self.callback_plugins, queue='plugin_registry') + self.channel.basic_consume(queue='plugin_registry', on_message_callback=self.callback_plugins) super(Plugins, self).run() def callback_plugins(self, ch, method, properties, body): diff --git a/setup.py b/setup.py index ae96beb..d0cea7e 100644 --- a/setup.py +++ b/setup.py @@ -2,9 +2,9 @@ from setuptools import setup, find_packages setup( name="distbot", - version=0.1, + version="0.1", description="the distributed bot", - install_requires=["pika==0.12.0", 'configobj', 'requests'], + install_requires=["pika", 'configobj', 'requests'], test_requires=["pytest"], extras_require={ 'test': ["pytest"],