From: Thorsten Date: Sat, 23 Mar 2019 09:52:26 +0000 (+0100) Subject: use authenticated amqp connection X-Git-Url: https://git.aero2k.de/?a=commitdiff_plain;h=6771c2a2f95f9b8addfac0b5d5d934c8ea1ac452;p=urlbot-v3.git use authenticated amqp connection --- diff --git a/distbot/bot/action_worker.py b/distbot/bot/action_worker.py index 8740825..cff47e2 100644 --- a/distbot/bot/action_worker.py +++ b/distbot/bot/action_worker.py @@ -39,7 +39,7 @@ class ActionThread(threading.Thread): # TODO: same in worker, check if that is waste self.busy = False - connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) + connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) self.channel = connection.channel() def callback(self, ch, method, properties, body): diff --git a/distbot/bot/bot.py b/distbot/bot/bot.py index 46e577a..fe3b112 100644 --- a/distbot/bot/bot.py +++ b/distbot/bot/bot.py @@ -6,48 +6,23 @@ import shlex import pika import sleekxmpp -from distbot.bot import action_worker, worker as worker_mod +from distbot.bot import action_worker from distbot.common.config import conf_get from distbot.common.message import process_message, get_nick_from_message -from distbot.plugins import basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, \ - morse, meta, \ - extended, bugtracker, bots, bofh logger = logging.getLogger(__name__) logging.getLogger("pika").setLevel(logging.WARN) logging.getLogger('sleekxmpp').setLevel(logging.INFO) -WORKER_QUEUE = "work" - -PLUGIN_MODULES = { - basic: basic.ALL, - bofh: bofh.ALL, - bots: bots.ALL, - bugtracker: bugtracker.ALL, - extended: extended.ALL, - feeds: feeds.ALL, - fun: fun.ALL, - lookup: lookup.ALL, - meta: meta.ALL, - morse: morse.ALL, - muc: muc.ALL, - plugin_help: plugin_help.ALL, - queue_management: queue_management.ALL, - searx: searx.ALL, - translation: translation.ALL, - url: url.ALL, -} - class Bot(sleekxmpp.ClientXMPP): def __init__(self, jid, password, rooms, nick): super(Bot, self).__init__(jid, password) + # TODO read from config self.actionqueue = 'action_processing' self.actionthread = None - self.job_workers = [] - self.workers = [] self.rooms = rooms self.nick = nick @@ -68,57 +43,20 @@ class Bot(sleekxmpp.ClientXMPP): self.register_plugin('xep_0308') def kill_workers(self): - [t.die() for t in self.workers + self.job_workers] - [t.join() for t in self.workers + self.job_workers] if self.actionthread: self.actionthread.die() self.actionthread.join() - def initialize_workers(self): + def initialize_actionthreads(self): connection = pika.BlockingConnection( - pika.ConnectionParameters('localhost') + pika.URLParameters(conf_get("amqp_uri")) ) channel = connection.channel() - channel.exchange_declare(exchange='topic_command', exchange_type='topic') - channel.exchange_declare(exchange='topic_parse', exchange_type='topic') - channel.queue_declare(queue=WORKER_QUEUE, durable=True) channel.queue_declare(queue=self.actionqueue, durable=True) - for classes in PLUGIN_MODULES.values(): - for cls in classes: - try: - worker = cls(actionqueue=self.actionqueue) - self.job_workers.append(worker) - worker.start() - except Exception as e: - # Whoopsie. Thatz a broken thing. - raise SystemExit() - - def initialize_actionthreads(self): - self.actionthread = action_worker.ActionThread(bot=self) self.actionthread.start() - # TODO: doesnt work... - def reset_all_workerthreads(self): - from importlib import reload - - [t.die() for t in self.job_workers] - [t.join() for t in self.job_workers] - - self.actionthread.die() - self.actionthread.join() - - reload(worker_mod) - reload(action_worker) - - for module in PLUGIN_MODULES.keys(): - reload(module) - - self.initialize_workers() - self.initialize_actionthreads() - self.echo("code blue") - # TODO: doesn't stop def disconnect(self, reconnect=False, wait=None, send_close=True): logger.info("Stopping all workers...") @@ -139,7 +77,6 @@ class Bot(sleekxmpp.ClientXMPP): wait=True ) logger.info('%s: joined with code %s' % (room, ret)) - self.initialize_workers() self.initialize_actionthreads() def muc_message(self, msg): @@ -178,13 +115,6 @@ class Bot(sleekxmpp.ClientXMPP): if "hangup" in message: self.disconnect() - if "reset-workers" in message: - self.echo("No, please... !") - [t.die() for t in self.job_workers] - [t.join() for t in self.job_workers] - self.initialize_workers() - self.echo("argh...") - if "reset-worker-one" in message: self.echo("But I was your fav..ARGH") self.actionthread.die() diff --git a/distbot/bot/worker.py b/distbot/bot/worker.py index cc42b71..04aa8ae 100644 --- a/distbot/bot/worker.py +++ b/distbot/bot/worker.py @@ -53,7 +53,7 @@ class Worker(threading.Thread): except: logger.exception("Oops. Registration failed") - connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) + connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) self.channel = connection.channel() self.channel.exchange_declare(exchange='classifier', exchange_type='topic') @@ -113,7 +113,7 @@ class Worker(threading.Thread): def register_plugin(self): - connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) channel = connection.channel() channel.queue_declare(queue='plugin_registry') channel.basic_publish( diff --git a/distbot/common/action.py b/distbot/common/action.py index 13dfd1b..7ea70fd 100644 --- a/distbot/common/action.py +++ b/distbot/common/action.py @@ -3,6 +3,7 @@ import json from copy import deepcopy import pika +from distbot.common.config import conf_get class Action: @@ -55,7 +56,7 @@ class Action: def send_action(actionqueue, action): connection = pika.BlockingConnection( - pika.ConnectionParameters('localhost') + pika.URLParameters(conf_get("amqp_uri")) ) channel = connection.channel() channel.queue_declare(queue=actionqueue, durable=True) diff --git a/distbot/common/config/local_config.ini.spec b/distbot/common/config/local_config.ini.spec index c912b62..93f649e 100644 --- a/distbot/common/config/local_config.ini.spec +++ b/distbot/common/config/local_config.ini.spec @@ -12,3 +12,5 @@ detectlanguage_api_key = string # rate limiting, TODO hist_max_count = integer(default=5) hist_max_time = integer(default=10*60) + +amqp_uri = string(default="amqp://guest:guest@localhost:5672/%2F") diff --git a/distbot/common/message.py b/distbot/common/message.py index 1366d3e..6b131ab 100644 --- a/distbot/common/message.py +++ b/distbot/common/message.py @@ -5,6 +5,7 @@ import logging import shlex import pika +from distbot.common.config import conf_get from sleekxmpp import Message, Presence from sleekxmpp.jid import JID @@ -42,12 +43,12 @@ def get_nick_from_message(message_obj): def process_message(routing_key, body): - connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri"))) channel = connection.channel() channel.exchange_declare(exchange='classifier', exchange_type='topic') connection = pika.BlockingConnection( - pika.ConnectionParameters('localhost') + pika.URLParameters(conf_get("amqp_uri")) ) channel = connection.channel() diff --git a/distbot/minijobber/__init__.py b/distbot/minijobber/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/distbot/minijobber/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/distbot/minijobber/run.py b/distbot/minijobber/run.py new file mode 100644 index 0000000..219ca8e --- /dev/null +++ b/distbot/minijobber/run.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +import signal +from time import sleep + +import pika +from distbot.common.config import conf_get + +from distbot.bot import worker as worker_mod + +from distbot.plugins import basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, \ + morse, meta, \ + extended, bugtracker, bots, bofh + +WORKER_QUEUE = "work" +# TODO read from config +ACTION_QUEUE = "action_processing" +PLUGIN_MODULES = { + basic: basic.ALL, + bofh: bofh.ALL, + bots: bots.ALL, + bugtracker: bugtracker.ALL, + extended: extended.ALL, + feeds: feeds.ALL, + fun: fun.ALL, + lookup: lookup.ALL, + meta: meta.ALL, + morse: morse.ALL, + muc: muc.ALL, + plugin_help: plugin_help.ALL, + queue_management: queue_management.ALL, + searx: searx.ALL, + translation: translation.ALL, + url: url.ALL, +} +job_workers = [] + + +def initialize_workers(): + connection = pika.BlockingConnection( + pika.URLParameters(conf_get("amqp_uri")) + ) + channel = connection.channel() + channel.exchange_declare(exchange='topic_command', exchange_type='topic') + channel.exchange_declare(exchange='topic_parse', exchange_type='topic') + channel.queue_declare(queue=WORKER_QUEUE, durable=True) + channel.queue_declare(queue=ACTION_QUEUE, durable=True) + + for classes in PLUGIN_MODULES.values(): + for cls in classes: + try: + worker = cls(actionqueue=ACTION_QUEUE) + job_workers.append(worker) + worker.start() + except Exception as e: + import logging + logging.exception(e) + # Whoopsie. Thatz a broken thing. + raise SystemExit() + + +# TODO: doesnt work... +# https://lists.gt.net/python/python/139992 +def reset_workers(signum, frame): + from importlib import reload + stop_workers(signum, frame) + + reload(worker_mod) + for module in PLUGIN_MODULES.keys(): + reload(module) + + initialize_workers() + + +def stop_workers(signum, frame): + print("stopping workers") + [t.die() for t in job_workers] + print("joining workers") + [t.join() for t in job_workers] + print("stopped workers.") + job_workers.clear() + + +def dump_status(*_): + print("current threads in queue:") + for t in job_workers: + print(t) + + +def run(): + signal.signal(signal.SIGTERM, stop_workers) + # signal.signal(signal.SIGHUP, reset_workers) + signal.signal(signal.SIGALRM, dump_status) + initialize_workers() + while True: + sleep(100) + + +if __name__ == '__main__': + run()