]> git.aero2k.de Git - urlbot-v3.git/commitdiff
fix threading bug
authorThorsten <mail@aero2k.de>
Sat, 24 Jun 2023 14:59:51 +0000 (16:59 +0200)
committerThorsten <mail@aero2k.de>
Sat, 24 Jun 2023 15:02:09 +0000 (17:02 +0200)
deploy/roles/urlbot/tasks/main.yml
distbot/bot/action_worker.py
distbot/bot/worker.py
distbot/minijobber/run.py
logging.ini

index b7d291d0d9689d2704d1f41cf80ae3af0d6ee235..ec981802026285ee05fcd49eceaf236c8eb9c0fd 100644 (file)
@@ -40,7 +40,7 @@
     chdir: ~/urlbot-v3
 
 - name: set configuration
-  lineinfile: dest=~/urlbot/local_config.ini create=yes line="{{item.key}} = {{item.value}}" regexp="^{{item.key}}.="
+  lineinfile: dest=~/urlbot-v3/local_config.ini create=yes line="{{item.key}} = {{item.value}}" regexp="^{{item.key}}.="
   with_items:
     - key: "jid"
       value: "{{jid}}"
index 8ea11d197c9bef3a460fb39df8228962dc4b0944..e93be1fc7a9d65a932b97bf858b2164ff0cbf6ab 100644 (file)
@@ -5,6 +5,7 @@ import logging
 import sched
 import time
 
+import pika
 from pika.channel import Channel
 
 from distbot.common.action import Action, send_action
@@ -70,7 +71,7 @@ class ActionWorker(object):
         logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection,
                      self._consumer_tag)
 
-    def on_message(self, ch, method, properties, body):
+    def on_message(self, ch, method, properties: pika.spec.BasicProperties, body):
         logger.info('Received message # %s from %s: %s',
                      method.delivery_tag, properties.app_id, body)
         body = json.loads(body.decode("utf-8"))
index 92fcc77a23c7e6f474cbe12a39970afbf6812764..4ce5053f4c2115d9f4cb7b5ac8401716555fe776 100644 (file)
@@ -3,12 +3,12 @@ import json
 import logging
 import threading
 from collections import deque, defaultdict
+from functools import partial
+from typing import Optional
 
 import pika
 import pika.exceptions
 
-from functools import partial
-
 from distbot.common.action import Action, send_action
 from distbot.common.config import conf_get
 
@@ -30,7 +30,7 @@ class Worker(threading.Thread):
     CATCH_ALL = ["#"]
 
     def __init__(self, actionqueue, queue="work"):
-        super().__init__()
+        super().__init__(name=self.get_subclass_name())
         self.queue = queue
         self.actionqueue = actionqueue
         if self.uses_history:
@@ -49,14 +49,12 @@ class Worker(threading.Thread):
             else:
                 self.usage = "(reaction only)"
         self.used_channel = None
+        self.connection: Optional[pika.BlockingConnection] = None
+        self.channel = None
 
-        try:
-            self.register_plugin()
-        except:
-            logger.exception("Oops. Registration failed")
-
-        connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
-        self.channel = connection.channel()
+    def init_channel(self):
+        self.connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
+        self.channel = self.connection.channel()
         self.channel.exchange_declare(exchange='classifier', exchange_type='topic')
 
     def callback(self, ch, method, properties, body):
@@ -97,13 +95,17 @@ class Worker(threading.Thread):
         return self.__class__.__name__
 
     def die(self):
-        logger.info("Bye from %s", self.get_subclass_name())
-        self.channel.stop_consuming()
+        self.connection.add_callback_threadsafe(callback=self.channel.stop_consuming)
 
     def run(self):
+        try:
+            self.register_plugin()
+        except:
+            logger.exception("Oops. Registration failed")
+        self.init_channel()
+
         result = self.channel.queue_declare(exclusive=True, queue="")
         self.queue = result.method.queue
-
         for binding_key in self.binding_keys:
             logger.info("Registering plugin %s for %s", self.get_subclass_name(), binding_key)
             self.channel.queue_bind(
index f3ecccb3e79bbbe4e051ff7d96618dd49e28f9a8..86ef4ab6d93ef2712c5e6abfd3bb240fbbacac25 100644 (file)
@@ -4,16 +4,17 @@ import signal
 from time import sleep
 
 import pika
-from distbot.common.config import conf_get
 
+from distbot.bot.worker import Worker
 from distbot.bot import worker as worker_mod
-
+from distbot.common.config import conf_get
 from distbot.plugins import (
     basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help,
     morse, meta,
     extended, bugtracker, bots, bofh, didyouknow,
-    debug, youtube
+    youtube
 )
+
 logger = logging.getLogger(__name__)
 
 WORKER_QUEUE = "work"
@@ -40,7 +41,7 @@ PLUGIN_MODULES = {
     youtube: youtube.ALL,
     # debug: debug.ALL
 }
-job_workers = []
+job_workers: list[Worker] = []
 
 
 def initialize_workers():
@@ -100,11 +101,10 @@ def run():
     signal.signal(signal.SIGALRM, dump_status)
     initialize_workers()
     try:
-        while True:
-            sleep(100)
+        while job_workers:
+            sleep(1)
     except KeyboardInterrupt:
-        import os
-        os.kill(os.getpid(), signal.SIGTERM)
+        stop_workers(None, None)
         logger.info("Exiting...")
         exit(0)
 
index fe43f1a62592561a89d6cf972656f805f6e29769..e5d1a0e76770d3fabe8366d251130d3ae40d864b 100644 (file)
@@ -28,4 +28,5 @@ formatter=formatter
 args=(sys.stderr,)
 
 [formatter_formatter]
-format=%(levelname).1s %(name)s:%(funcName)-15s %(message)s
+format=%(threadName)s %(levelname).1s %(name)s:%(funcName)-15s %(message)s
+