From: Thorsten Date: Sat, 24 Jun 2023 14:59:51 +0000 (+0200) Subject: fix threading bug X-Git-Url: https://git.aero2k.de/?a=commitdiff_plain;h=75c3da3a276ee5b6cbfbf3df412fa32d0d83e2aa;p=urlbot-v3.git fix threading bug --- diff --git a/deploy/roles/urlbot/tasks/main.yml b/deploy/roles/urlbot/tasks/main.yml index b7d291d..ec98180 100644 --- a/deploy/roles/urlbot/tasks/main.yml +++ b/deploy/roles/urlbot/tasks/main.yml @@ -40,7 +40,7 @@ chdir: ~/urlbot-v3 - name: set configuration - lineinfile: dest=~/urlbot/local_config.ini create=yes line="{{item.key}} = {{item.value}}" regexp="^{{item.key}}.=" + lineinfile: dest=~/urlbot-v3/local_config.ini create=yes line="{{item.key}} = {{item.value}}" regexp="^{{item.key}}.=" with_items: - key: "jid" value: "{{jid}}" diff --git a/distbot/bot/action_worker.py b/distbot/bot/action_worker.py index 8ea11d1..e93be1f 100644 --- a/distbot/bot/action_worker.py +++ b/distbot/bot/action_worker.py @@ -5,6 +5,7 @@ import logging import sched import time +import pika from pika.channel import Channel from distbot.common.action import Action, send_action @@ -70,7 +71,7 @@ class ActionWorker(object): logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection, self._consumer_tag) - def on_message(self, ch, method, properties, body): + def on_message(self, ch, method, properties: pika.spec.BasicProperties, body): logger.info('Received message # %s from %s: %s', method.delivery_tag, properties.app_id, body) body = json.loads(body.decode("utf-8")) diff --git a/distbot/bot/worker.py b/distbot/bot/worker.py index 92fcc77..4ce5053 100644 --- a/distbot/bot/worker.py +++ b/distbot/bot/worker.py @@ -3,12 +3,12 @@ import json import logging import threading from collections import deque, defaultdict +from functools import partial +from typing import Optional import pika import pika.exceptions -from functools import partial - from distbot.common.action import Action, send_action from distbot.common.config import conf_get @@ -30,7 +30,7 @@ class Worker(threading.Thread): CATCH_ALL = ["#"] def __init__(self, actionqueue, queue="work"): - super().__init__() + super().__init__(name=self.get_subclass_name()) self.queue = queue self.actionqueue = actionqueue if self.uses_history: @@ -49,14 +49,12 @@ class Worker(threading.Thread): else: self.usage = "(reaction only)" self.used_channel = None + self.connection: Optional[pika.BlockingConnection] = None + self.channel = None - try: - self.register_plugin() - except: - logger.exception("Oops. Registration failed") - - connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) - self.channel = connection.channel() + def init_channel(self): + self.connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) + self.channel = self.connection.channel() self.channel.exchange_declare(exchange='classifier', exchange_type='topic') def callback(self, ch, method, properties, body): @@ -97,13 +95,17 @@ class Worker(threading.Thread): return self.__class__.__name__ def die(self): - logger.info("Bye from %s", self.get_subclass_name()) - self.channel.stop_consuming() + self.connection.add_callback_threadsafe(callback=self.channel.stop_consuming) def run(self): + try: + self.register_plugin() + except: + logger.exception("Oops. Registration failed") + self.init_channel() + result = self.channel.queue_declare(exclusive=True, queue="") self.queue = result.method.queue - for binding_key in self.binding_keys: logger.info("Registering plugin %s for %s", self.get_subclass_name(), binding_key) self.channel.queue_bind( diff --git a/distbot/minijobber/run.py b/distbot/minijobber/run.py index f3ecccb..86ef4ab 100644 --- a/distbot/minijobber/run.py +++ b/distbot/minijobber/run.py @@ -4,16 +4,17 @@ import signal from time import sleep import pika -from distbot.common.config import conf_get +from distbot.bot.worker import Worker from distbot.bot import worker as worker_mod - +from distbot.common.config import conf_get from distbot.plugins import ( basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, morse, meta, extended, bugtracker, bots, bofh, didyouknow, - debug, youtube + youtube ) + logger = logging.getLogger(__name__) WORKER_QUEUE = "work" @@ -40,7 +41,7 @@ PLUGIN_MODULES = { youtube: youtube.ALL, # debug: debug.ALL } -job_workers = [] +job_workers: list[Worker] = [] def initialize_workers(): @@ -100,11 +101,10 @@ def run(): signal.signal(signal.SIGALRM, dump_status) initialize_workers() try: - while True: - sleep(100) + while job_workers: + sleep(1) except KeyboardInterrupt: - import os - os.kill(os.getpid(), signal.SIGTERM) + stop_workers(None, None) logger.info("Exiting...") exit(0) diff --git a/logging.ini b/logging.ini index fe43f1a..e5d1a0e 100644 --- a/logging.ini +++ b/logging.ini @@ -28,4 +28,5 @@ formatter=formatter args=(sys.stderr,) [formatter_formatter] -format=%(levelname).1s %(name)s:%(funcName)-15s %(message)s +format=%(threadName)s %(levelname).1s %(name)s:%(funcName)-15s %(message)s +