From 707b30f0698e7ef763d98b50cc830cba1fcc7e3b Mon Sep 17 00:00:00 2001 From: Thorsten Date: Sat, 27 Jun 2020 19:08:53 +0200 Subject: [PATCH] remove all threads from xmpp part slix is based on asyncio, and asyncio isn't friends with threads. --- distbot/bot/action_worker.py | 100 ++++++++++++++++++++++------------- distbot/bot/bot.py | 38 +++++-------- 2 files changed, 78 insertions(+), 60 deletions(-) diff --git a/distbot/bot/action_worker.py b/distbot/bot/action_worker.py index ade4283..0d96371 100644 --- a/distbot/bot/action_worker.py +++ b/distbot/bot/action_worker.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- import json import logging -import sched import threading import time +from typing import Optional -import pika +from pika.adapters import AsyncioConnection +from pika.channel import Channel from distbot.common.action import Action, send_action from distbot.common.config import conf_set, conf_get @@ -29,22 +30,57 @@ class EventLoop(threading.Thread): time.sleep(EVENTLOOP_DELAY) -class ActionThread(threading.Thread): - event_list = sched.scheduler(time.time, time.sleep) +class ActionWorker(object): + exchange = '' - def __init__(self, bot): - super().__init__() - self.event_thread = EventLoop(self.event_list) - self.bot = bot - # TODO: same in worker, check if that is waste - self.busy = False + # event_list = sched.scheduler(time.time, time.sleep) + + @staticmethod + def parse_body(body): + action = Action.deserialize(body) + return action - connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) - self.channel = connection.channel() + def __init__(self, bot, queue_name): + self._bot = bot + self._connection: Optional[AsyncioConnection] = None + self._channel: Optional[Channel] = None + self._queue_name = queue_name + self._consumer_tag = None + + def open_connection(self, connection): + self._connection = connection + connection.channel(on_open_callback=self.open_channel) + + def open_channel(self, channel: Channel): + # self.add_on_channel_close_callback() + # self.setup_exchange(self.EXCHANGE) + self._channel = channel + channel.add_on_close_callback(lambda *_: self._bot.disconnect()) + channel.queue_declare(queue=self._queue_name, durable=True, callback=lambda _: self.start_consuming()) + channel.add_on_cancel_callback(lambda _: channel.close()) + + def start_consuming(self): + """This method sets up the consumer by first calling + add_on_cancel_callback so that the object is notified if RabbitMQ + cancels the consumer. It then issues the Basic.Consume RPC command + which returns the consumer tag that is used to uniquely identify the + consumer with RabbitMQ. We keep the value to use it when we want to + cancel consuming. The on_message method is passed in as a callback pika + will invoke when a message is fully received. - def callback(self, ch, method, properties, body): + """ + logging.info('Issuing consumer related RPC commands') + # self.add_on_cancel_callback() + self._consumer_tag = self._channel.basic_consume( + self.on_message, self._queue_name + ) + logging.info( + f"Setup consumer for actions on ch {self._channel} on con {self._connection} with tag {self._consumer_tag}") + + def on_message(self, ch, method, properties, body): + logging.info('Received message # %s from %s: %s', + method.delivery_tag, properties.app_id, body) body = json.loads(body.decode("utf-8")) - self.busy = True action = self.parse_body(body) try: @@ -52,29 +88,12 @@ class ActionThread(threading.Thread): except: logger.exception("Could not run action: %s", body) - self.busy = False logger.debug("Done") ch.basic_ack(delivery_tag=method.delivery_tag) - @staticmethod - def parse_body(body): - action = Action.deserialize(body) - return action - - def run(self): - logger.debug("Processing actions in queue %s", self.bot.actionqueue) - self.event_thread.start() - - self.channel.queue_declare(queue=self.bot.actionqueue, durable=True) - self.channel.basic_consume(self.callback, queue=self.bot.actionqueue) - self.channel.start_consuming() - - def die(self): - logger.debug("Quitting...") - self.channel.stop_consuming() - logger.debug("action_worker gone") - def find_scheduled_action_by_mutex(self, mutex): + # TODO... + return action_item = None for item in self.event_list.queue: if item.kwargs["action"].mutex == mutex: @@ -86,6 +105,9 @@ class ActionThread(threading.Thread): """ :type event: Action """ + # TODO: outsource delayed events + raise RuntimeError("Die Bahnhofshalle ist derzeit wegen Wartungsarbeiten geschlossen.") + # TODO: mutex handling logger.info("scheduling event: %s", event.serialize()) if event.mutex and self.find_scheduled_action_by_mutex(event.mutex): @@ -94,7 +116,7 @@ class ActionThread(threading.Thread): self.event_list.enterabs( event.time, 0, send_action, - kwargs={'actionqueue': self.bot.actionqueue, 'action': event} + kwargs={'actionqueue': self._queue_name, 'action': event} ) def unschedule_action(self, event): @@ -102,6 +124,9 @@ class ActionThread(threading.Thread): Remove a scheduled action :type event: Action """ + # TODO... + return + item = self.find_scheduled_action_by_mutex(event.mutex) if item: self.event_list.cancel(item) @@ -135,11 +160,11 @@ class ActionThread(threading.Thread): if action.msg: # and rate_limit(RATE_CHAT | plugin.ratelimit_class): time.sleep(delay) - self.bot.echo(action.msg, action.recipient) + self._bot.echo(action.msg, action.recipient) # TODO test that if action.priv_msg: # and rate_limit(RATE_CHAT | plugin.ratelimit_class): - self.bot.echo(action['priv_msg'], action.sender) + self._bot.echo(action.priv_msg, action.sender) if action.presence: conf_set('presence', action.presence) @@ -153,3 +178,6 @@ class ActionThread(threading.Thread): request_counter = int(conf_get('request_counter')) conf_set('request_counter', request_counter + 1) + + def die(self): + self._connection.close() diff --git a/distbot/bot/bot.py b/distbot/bot/bot.py index cf1af70..8d1c42e 100644 --- a/distbot/bot/bot.py +++ b/distbot/bot/bot.py @@ -5,8 +5,9 @@ import re import shlex import pika -import slixmpp +from pika.adapters import AsyncioConnection +import slixmpp from distbot.bot import action_worker from distbot.common.config import conf_get from distbot.common.message import process_message, get_nick_from_message @@ -23,14 +24,11 @@ class Bot(slixmpp.ClientXMPP): super(Bot, self).__init__(jid, password) # TODO read from config - self.actionqueue = 'action_processing' - self.actionthread = None + self.action_queue = 'action_processing' + self.action_worker = action_worker.ActionWorker(bot=self, queue_name=self.action_queue) self.rooms = rooms self.nick = nick - - # from slixmpp.plugins.xep_0045 import XEP_0045 - # self.add_event_handler('handle_groupchat_presence', XEP_0045.handle_groupchat_presence) self.add_event_handler('session_start', self.session_start) self.add_event_handler('groupchat_message', self.muc_message) self.add_event_handler('message', self.message_handler) @@ -38,8 +36,6 @@ class Bot(slixmpp.ClientXMPP): self.add_event_handler('muc::%s::got_online' % room, self.muc_online) self._initialize_plugins() - # import ssl - # self.ssl_version = ssl.PROTOCOL_TLSv1_2 def _initialize_plugins(self): self.register_plugin('xep_0045') @@ -47,21 +43,15 @@ class Bot(slixmpp.ClientXMPP): self.register_plugin('xep_0308') def kill_workers(self): - if self.actionthread: - self.actionthread.die() - self.actionthread.join() + if self.action_worker: + self.action_worker.die() - def initialize_actionthreads(self): - connection = pika.BlockingConnection( - pika.URLParameters(conf_get("amqp_uri")) + def initialize_actionworker(self): + connection = AsyncioConnection( + pika.URLParameters(conf_get("amqp_uri")), + on_open_callback=self.action_worker.open_connection ) - channel = connection.channel() - channel.queue_declare(queue=self.actionqueue, durable=True) - - self.actionthread = action_worker.ActionThread(bot=self) - self.actionthread.start() - # TODO: doesn't stop def disconnect(self, reconnect=False, wait=None, send_close=True): logger.info("Stopping all workers...") self.kill_workers() @@ -77,7 +67,7 @@ class Bot(slixmpp.ClientXMPP): logger.info('%s: joining' % room) ret = self.plugin['xep_0045'].join_muc(room, self.nick, wait=True) logger.info('%s: joined with code %s' % (room, ret)) - self.initialize_actionthreads() + self.initialize_actionworker() def muc_message(self, msg): if msg['mucnick'] == self.nick or 'groupchat' != msg['type']: @@ -140,11 +130,11 @@ class Bot(slixmpp.ClientXMPP): if "hangup" in message: self.disconnect() + # TODO: probably quite useless if "reset-worker-one" in message: self.echo("But I was your fav..ARGH") - self.actionthread.die() - self.actionthread.join() - self.initialize_actionthreads() + self.action_worker.die() + self.initialize_actionworker() self.echo("Worker One available for master.") nick_offset, routing_key = self.get_amqp_routing_key(self.nick, msg) -- 2.39.2