]> git.aero2k.de Git - urlbot-v3.git/commitdiff
use authenticated amqp connection
authorThorsten <mail@aero2k.de>
Sat, 23 Mar 2019 09:52:26 +0000 (10:52 +0100)
committerThorsten <mail@aero2k.de>
Sat, 23 Mar 2019 09:52:26 +0000 (10:52 +0100)
distbot/bot/action_worker.py
distbot/bot/bot.py
distbot/bot/worker.py
distbot/common/action.py
distbot/common/config/local_config.ini.spec
distbot/common/message.py
distbot/minijobber/__init__.py [new file with mode: 0644]
distbot/minijobber/run.py [new file with mode: 0644]

index 8740825ce025dd6ec469c733e995877fe499ae87..cff47e25ef302b30be9ff373fd5baa789e1bda73 100644 (file)
@@ -39,7 +39,7 @@ class ActionThread(threading.Thread):
         # TODO: same in worker, check if that is waste
         self.busy = False
 
-        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
+        connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
         self.channel = connection.channel()
 
     def callback(self, ch, method, properties, body):
index 46e577ac68b821f11a2ce9947e21e6982ec94384..fe3b11297a149e3000dd94485ffe1a9eae12a527 100644 (file)
@@ -6,48 +6,23 @@ import shlex
 import pika
 import sleekxmpp
 
-from distbot.bot import action_worker, worker as worker_mod
+from distbot.bot import action_worker
 from distbot.common.config import conf_get
 from distbot.common.message import process_message, get_nick_from_message
-from distbot.plugins import basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, \
-    morse, meta, \
-    extended, bugtracker, bots, bofh
 
 logger = logging.getLogger(__name__)
 
 logging.getLogger("pika").setLevel(logging.WARN)
 logging.getLogger('sleekxmpp').setLevel(logging.INFO)
 
-WORKER_QUEUE = "work"
-
-PLUGIN_MODULES = {
-    basic: basic.ALL,
-    bofh: bofh.ALL,
-    bots: bots.ALL,
-    bugtracker: bugtracker.ALL,
-    extended: extended.ALL,
-    feeds: feeds.ALL,
-    fun: fun.ALL,
-    lookup: lookup.ALL,
-    meta: meta.ALL,
-    morse: morse.ALL,
-    muc: muc.ALL,
-    plugin_help: plugin_help.ALL,
-    queue_management: queue_management.ALL,
-    searx: searx.ALL,
-    translation: translation.ALL,
-    url: url.ALL,
-}
-
 
 class Bot(sleekxmpp.ClientXMPP):
     def __init__(self, jid, password, rooms, nick):
         super(Bot, self).__init__(jid, password)
 
+        # TODO read from config
         self.actionqueue = 'action_processing'
         self.actionthread = None
-        self.job_workers = []
-        self.workers = []
 
         self.rooms = rooms
         self.nick = nick
@@ -68,57 +43,20 @@ class Bot(sleekxmpp.ClientXMPP):
         self.register_plugin('xep_0308')
 
     def kill_workers(self):
-        [t.die() for t in self.workers + self.job_workers]
-        [t.join() for t in self.workers + self.job_workers]
         if self.actionthread:
             self.actionthread.die()
             self.actionthread.join()
 
-    def initialize_workers(self):
+    def initialize_actionthreads(self):
         connection = pika.BlockingConnection(
-            pika.ConnectionParameters('localhost')
+            pika.URLParameters(conf_get("amqp_uri"))
         )
         channel = connection.channel()
-        channel.exchange_declare(exchange='topic_command', exchange_type='topic')
-        channel.exchange_declare(exchange='topic_parse', exchange_type='topic')
-        channel.queue_declare(queue=WORKER_QUEUE, durable=True)
         channel.queue_declare(queue=self.actionqueue, durable=True)
 
-        for classes in PLUGIN_MODULES.values():
-            for cls in classes:
-                try:
-                    worker = cls(actionqueue=self.actionqueue)
-                    self.job_workers.append(worker)
-                    worker.start()
-                except Exception as e:
-                    # Whoopsie. Thatz a broken thing.
-                    raise SystemExit()
-
-    def initialize_actionthreads(self):
-
         self.actionthread = action_worker.ActionThread(bot=self)
         self.actionthread.start()
 
-    # TODO: doesnt work...
-    def reset_all_workerthreads(self):
-        from importlib import reload
-
-        [t.die() for t in self.job_workers]
-        [t.join() for t in self.job_workers]
-
-        self.actionthread.die()
-        self.actionthread.join()
-
-        reload(worker_mod)
-        reload(action_worker)
-
-        for module in PLUGIN_MODULES.keys():
-            reload(module)
-
-        self.initialize_workers()
-        self.initialize_actionthreads()
-        self.echo("code blue")
-
     # TODO: doesn't stop
     def disconnect(self, reconnect=False, wait=None, send_close=True):
         logger.info("Stopping all workers...")
@@ -139,7 +77,6 @@ class Bot(sleekxmpp.ClientXMPP):
                 wait=True
             )
             logger.info('%s: joined with code %s' % (room, ret))
-        self.initialize_workers()
         self.initialize_actionthreads()
 
     def muc_message(self, msg):
@@ -178,13 +115,6 @@ class Bot(sleekxmpp.ClientXMPP):
             if "hangup" in message:
                 self.disconnect()
 
-            if "reset-workers" in message:
-                self.echo("No, please... !")
-                [t.die() for t in self.job_workers]
-                [t.join() for t in self.job_workers]
-                self.initialize_workers()
-                self.echo("argh...")
-
             if "reset-worker-one" in message:
                 self.echo("But I was your fav..ARGH")
                 self.actionthread.die()
index cc42b71fb581d3517cadc157767b13eafcb4e1db..04aa8ae48d0ffb164d1332314b7c606b72b192e0 100644 (file)
@@ -53,7 +53,7 @@ class Worker(threading.Thread):
         except:
             logger.exception("Oops. Registration failed")
 
-        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
+        connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
         self.channel = connection.channel()
         self.channel.exchange_declare(exchange='classifier', exchange_type='topic')
 
@@ -113,7 +113,7 @@ class Worker(threading.Thread):
 
     def register_plugin(self):
 
-        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
+        connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
         channel = connection.channel()
         channel.queue_declare(queue='plugin_registry')
         channel.basic_publish(
index 13dfd1b09654af7a6c612c4aea6e164f48a2d07c..7ea70fd1b7906e773c6ae4a64d6823ce70415055 100644 (file)
@@ -3,6 +3,7 @@ import json
 from copy import deepcopy
 
 import pika
+from distbot.common.config import conf_get
 
 
 class Action:
@@ -55,7 +56,7 @@ class Action:
 
 def send_action(actionqueue, action):
     connection = pika.BlockingConnection(
-        pika.ConnectionParameters('localhost')
+        pika.URLParameters(conf_get("amqp_uri"))
     )
     channel = connection.channel()
     channel.queue_declare(queue=actionqueue, durable=True)
index c912b627c47c69bc26ac0bc92ed841c55e661a65..93f649e73b80e56680f72aefc6a9399709753fb1 100644 (file)
@@ -12,3 +12,5 @@ detectlanguage_api_key = string
 # rate limiting, TODO
 hist_max_count = integer(default=5)
 hist_max_time = integer(default=10*60)
+
+amqp_uri = string(default="amqp://guest:guest@localhost:5672/%2F")
index 1366d3e0fb5eb71322b8e3efd9042cc168e0884d..6b131ab6ef311d79309cfd8e8e2de2bbef391c14 100644 (file)
@@ -5,6 +5,7 @@ import logging
 import shlex
 
 import pika
+from distbot.common.config import conf_get
 from sleekxmpp import Message, Presence
 from sleekxmpp.jid import JID
 
@@ -42,12 +43,12 @@ def get_nick_from_message(message_obj):
 
 
 def process_message(routing_key, body):
-    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
+    connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
     channel = connection.channel()
     channel.exchange_declare(exchange='classifier', exchange_type='topic')
 
     connection = pika.BlockingConnection(
-        pika.ConnectionParameters('localhost')
+        pika.URLParameters(conf_get("amqp_uri"))
     )
     channel = connection.channel()
 
diff --git a/distbot/minijobber/__init__.py b/distbot/minijobber/__init__.py
new file mode 100644 (file)
index 0000000..40a96af
--- /dev/null
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
diff --git a/distbot/minijobber/run.py b/distbot/minijobber/run.py
new file mode 100644 (file)
index 0000000..219ca8e
--- /dev/null
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+import signal
+from time import sleep
+
+import pika
+from distbot.common.config import conf_get
+
+from distbot.bot import worker as worker_mod
+
+from distbot.plugins import basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, \
+    morse, meta, \
+    extended, bugtracker, bots, bofh
+
+WORKER_QUEUE = "work"
+# TODO read from config
+ACTION_QUEUE = "action_processing"
+PLUGIN_MODULES = {
+    basic: basic.ALL,
+    bofh: bofh.ALL,
+    bots: bots.ALL,
+    bugtracker: bugtracker.ALL,
+    extended: extended.ALL,
+    feeds: feeds.ALL,
+    fun: fun.ALL,
+    lookup: lookup.ALL,
+    meta: meta.ALL,
+    morse: morse.ALL,
+    muc: muc.ALL,
+    plugin_help: plugin_help.ALL,
+    queue_management: queue_management.ALL,
+    searx: searx.ALL,
+    translation: translation.ALL,
+    url: url.ALL,
+}
+job_workers = []
+
+
+def initialize_workers():
+    connection = pika.BlockingConnection(
+        pika.URLParameters(conf_get("amqp_uri"))
+    )
+    channel = connection.channel()
+    channel.exchange_declare(exchange='topic_command', exchange_type='topic')
+    channel.exchange_declare(exchange='topic_parse', exchange_type='topic')
+    channel.queue_declare(queue=WORKER_QUEUE, durable=True)
+    channel.queue_declare(queue=ACTION_QUEUE, durable=True)
+
+    for classes in PLUGIN_MODULES.values():
+        for cls in classes:
+            try:
+                worker = cls(actionqueue=ACTION_QUEUE)
+                job_workers.append(worker)
+                worker.start()
+            except Exception as e:
+                import logging
+                logging.exception(e)
+                # Whoopsie. Thatz a broken thing.
+                raise SystemExit()
+
+
+# TODO: doesnt work...
+# https://lists.gt.net/python/python/139992
+def reset_workers(signum, frame):
+    from importlib import reload
+    stop_workers(signum, frame)
+
+    reload(worker_mod)
+    for module in PLUGIN_MODULES.keys():
+        reload(module)
+
+    initialize_workers()
+
+
+def stop_workers(signum, frame):
+    print("stopping workers")
+    [t.die() for t in job_workers]
+    print("joining workers")
+    [t.join() for t in job_workers]
+    print("stopped workers.")
+    job_workers.clear()
+
+
+def dump_status(*_):
+    print("current threads in queue:")
+    for t in job_workers:
+        print(t)
+
+
+def run():
+    signal.signal(signal.SIGTERM, stop_workers)
+    # signal.signal(signal.SIGHUP, reset_workers)
+    signal.signal(signal.SIGALRM, dump_status)
+    initialize_workers()
+    while True:
+        sleep(100)
+
+
+if __name__ == '__main__':
+    run()