]> git.aero2k.de Git - urlbot-v3.git/commitdiff
remove all threads from xmpp part
authorThorsten <mail@aero2k.de>
Sat, 27 Jun 2020 17:08:53 +0000 (19:08 +0200)
committerThorsten <mail@aero2k.de>
Sat, 27 Jun 2020 17:08:53 +0000 (19:08 +0200)
slix is based on asyncio, and asyncio isn't friends with threads.

distbot/bot/action_worker.py
distbot/bot/bot.py

index ade42838def65cea8bac4967bd48b092e311f42b..0d96371c5c61e0209445019f9f8274866d41be70 100644 (file)
@@ -1,11 +1,12 @@
 # -*- coding: utf-8 -*-
 import json
 import logging
-import sched
 import threading
 import time
+from typing import Optional
 
-import pika
+from pika.adapters import AsyncioConnection
+from pika.channel import Channel
 
 from distbot.common.action import Action, send_action
 from distbot.common.config import conf_set, conf_get
@@ -29,22 +30,57 @@ class EventLoop(threading.Thread):
             time.sleep(EVENTLOOP_DELAY)
 
 
-class ActionThread(threading.Thread):
-    event_list = sched.scheduler(time.time, time.sleep)
+class ActionWorker(object):
+    exchange = ''
 
-    def __init__(self, bot):
-        super().__init__()
-        self.event_thread = EventLoop(self.event_list)
-        self.bot = bot
-        # TODO: same in worker, check if that is waste
-        self.busy = False
+    # event_list = sched.scheduler(time.time, time.sleep)
+
+    @staticmethod
+    def parse_body(body):
+        action = Action.deserialize(body)
+        return action
 
-        connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
-        self.channel = connection.channel()
+    def __init__(self, bot, queue_name):
+        self._bot = bot
+        self._connection: Optional[AsyncioConnection] = None
+        self._channel: Optional[Channel] = None
+        self._queue_name = queue_name
+        self._consumer_tag = None
+
+    def open_connection(self, connection):
+        self._connection = connection
+        connection.channel(on_open_callback=self.open_channel)
+
+    def open_channel(self, channel: Channel):
+        # self.add_on_channel_close_callback()
+        # self.setup_exchange(self.EXCHANGE)
+        self._channel = channel
+        channel.add_on_close_callback(lambda *_: self._bot.disconnect())
+        channel.queue_declare(queue=self._queue_name, durable=True, callback=lambda _: self.start_consuming())
+        channel.add_on_cancel_callback(lambda _: channel.close())
+
+    def start_consuming(self):
+        """This method sets up the consumer by first calling
+        add_on_cancel_callback so that the object is notified if RabbitMQ
+        cancels the consumer. It then issues the Basic.Consume RPC command
+        which returns the consumer tag that is used to uniquely identify the
+        consumer with RabbitMQ. We keep the value to use it when we want to
+        cancel consuming. The on_message method is passed in as a callback pika
+        will invoke when a message is fully received.
 
-    def callback(self, ch, method, properties, body):
+        """
+        logging.info('Issuing consumer related RPC commands')
+        # self.add_on_cancel_callback()
+        self._consumer_tag = self._channel.basic_consume(
+            self.on_message, self._queue_name
+        )
+        logging.info(
+            f"Setup consumer for actions on ch {self._channel} on con {self._connection} with tag {self._consumer_tag}")
+
+    def on_message(self, ch, method, properties, body):
+        logging.info('Received message # %s from %s: %s',
+                     method.delivery_tag, properties.app_id, body)
         body = json.loads(body.decode("utf-8"))
-        self.busy = True
 
         action = self.parse_body(body)
         try:
@@ -52,29 +88,12 @@ class ActionThread(threading.Thread):
         except:
             logger.exception("Could not run action: %s", body)
 
-        self.busy = False
         logger.debug("Done")
         ch.basic_ack(delivery_tag=method.delivery_tag)
 
-    @staticmethod
-    def parse_body(body):
-        action = Action.deserialize(body)
-        return action
-
-    def run(self):
-        logger.debug("Processing actions in queue %s", self.bot.actionqueue)
-        self.event_thread.start()
-
-        self.channel.queue_declare(queue=self.bot.actionqueue, durable=True)
-        self.channel.basic_consume(self.callback, queue=self.bot.actionqueue)
-        self.channel.start_consuming()
-
-    def die(self):
-        logger.debug("Quitting...")
-        self.channel.stop_consuming()
-        logger.debug("action_worker gone")
-
     def find_scheduled_action_by_mutex(self, mutex):
+        # TODO...
+        return
         action_item = None
         for item in self.event_list.queue:
             if item.kwargs["action"].mutex == mutex:
@@ -86,6 +105,9 @@ class ActionThread(threading.Thread):
         """
         :type event: Action
         """
+        # TODO: outsource delayed events
+        raise RuntimeError("Die Bahnhofshalle ist derzeit wegen Wartungsarbeiten geschlossen.")
+
         # TODO: mutex handling
         logger.info("scheduling event: %s", event.serialize())
         if event.mutex and self.find_scheduled_action_by_mutex(event.mutex):
@@ -94,7 +116,7 @@ class ActionThread(threading.Thread):
 
         self.event_list.enterabs(
             event.time, 0, send_action,
-            kwargs={'actionqueue': self.bot.actionqueue, 'action': event}
+            kwargs={'actionqueue': self._queue_name, 'action': event}
         )
 
     def unschedule_action(self, event):
@@ -102,6 +124,9 @@ class ActionThread(threading.Thread):
         Remove a scheduled action
         :type event: Action
         """
+        # TODO...
+        return
+
         item = self.find_scheduled_action_by_mutex(event.mutex)
         if item:
             self.event_list.cancel(item)
@@ -135,11 +160,11 @@ class ActionThread(threading.Thread):
 
         if action.msg:  # and rate_limit(RATE_CHAT | plugin.ratelimit_class):
             time.sleep(delay)
-            self.bot.echo(action.msg, action.recipient)
+            self._bot.echo(action.msg, action.recipient)
 
         # TODO test that
         if action.priv_msg:  # and rate_limit(RATE_CHAT | plugin.ratelimit_class):
-            self.bot.echo(action['priv_msg'], action.sender)
+            self._bot.echo(action.priv_msg, action.sender)
 
         if action.presence:
             conf_set('presence', action.presence)
@@ -153,3 +178,6 @@ class ActionThread(threading.Thread):
 
         request_counter = int(conf_get('request_counter'))
         conf_set('request_counter', request_counter + 1)
+
+    def die(self):
+        self._connection.close()
index cf1af7042ef5e223ea291c6456b2d8849c9c0d0c..8d1c42eb9cc2994db311378cf8bdc6454ed7a34d 100644 (file)
@@ -5,8 +5,9 @@ import re
 import shlex
 
 import pika
-import slixmpp
+from pika.adapters import AsyncioConnection
 
+import slixmpp
 from distbot.bot import action_worker
 from distbot.common.config import conf_get
 from distbot.common.message import process_message, get_nick_from_message
@@ -23,14 +24,11 @@ class Bot(slixmpp.ClientXMPP):
         super(Bot, self).__init__(jid, password)
 
         # TODO read from config
-        self.actionqueue = 'action_processing'
-        self.actionthread = None
+        self.action_queue = 'action_processing'
+        self.action_worker = action_worker.ActionWorker(bot=self, queue_name=self.action_queue)
 
         self.rooms = rooms
         self.nick = nick
-
-        # from slixmpp.plugins.xep_0045 import XEP_0045
-        # self.add_event_handler('handle_groupchat_presence', XEP_0045.handle_groupchat_presence)
         self.add_event_handler('session_start', self.session_start)
         self.add_event_handler('groupchat_message', self.muc_message)
         self.add_event_handler('message', self.message_handler)
@@ -38,8 +36,6 @@ class Bot(slixmpp.ClientXMPP):
             self.add_event_handler('muc::%s::got_online' % room, self.muc_online)
 
         self._initialize_plugins()
-        # import ssl
-        # self.ssl_version = ssl.PROTOCOL_TLSv1_2
 
     def _initialize_plugins(self):
         self.register_plugin('xep_0045')
@@ -47,21 +43,15 @@ class Bot(slixmpp.ClientXMPP):
         self.register_plugin('xep_0308')
 
     def kill_workers(self):
-        if self.actionthread:
-            self.actionthread.die()
-            self.actionthread.join()
+        if self.action_worker:
+            self.action_worker.die()
 
-    def initialize_actionthreads(self):
-        connection = pika.BlockingConnection(
-            pika.URLParameters(conf_get("amqp_uri"))
+    def initialize_actionworker(self):
+        connection = AsyncioConnection(
+            pika.URLParameters(conf_get("amqp_uri")),
+            on_open_callback=self.action_worker.open_connection
         )
-        channel = connection.channel()
-        channel.queue_declare(queue=self.actionqueue, durable=True)
-
-        self.actionthread = action_worker.ActionThread(bot=self)
-        self.actionthread.start()
 
-    # TODO: doesn't stop
     def disconnect(self, reconnect=False, wait=None, send_close=True):
         logger.info("Stopping all workers...")
         self.kill_workers()
@@ -77,7 +67,7 @@ class Bot(slixmpp.ClientXMPP):
             logger.info('%s: joining' % room)
             ret = self.plugin['xep_0045'].join_muc(room, self.nick, wait=True)
             logger.info('%s: joined with code %s' % (room, ret))
-        self.initialize_actionthreads()
+        self.initialize_actionworker()
 
     def muc_message(self, msg):
         if msg['mucnick'] == self.nick or 'groupchat' != msg['type']:
@@ -140,11 +130,11 @@ class Bot(slixmpp.ClientXMPP):
             if "hangup" in message:
                 self.disconnect()
 
+            # TODO: probably quite useless
             if "reset-worker-one" in message:
                 self.echo("But I was your fav..ARGH")
-                self.actionthread.die()
-                self.actionthread.join()
-                self.initialize_actionthreads()
+                self.action_worker.die()
+                self.initialize_actionworker()
                 self.echo("Worker One available for master.")
 
             nick_offset, routing_key = self.get_amqp_routing_key(self.nick, msg)