# TODO: same in worker, check if that is waste
self.busy = False
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
+ connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
self.channel = connection.channel()
def callback(self, ch, method, properties, body):
import pika
import sleekxmpp
-from distbot.bot import action_worker, worker as worker_mod
+from distbot.bot import action_worker
from distbot.common.config import conf_get
from distbot.common.message import process_message, get_nick_from_message
-from distbot.plugins import basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, \
- morse, meta, \
- extended, bugtracker, bots, bofh
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARN)
logging.getLogger('sleekxmpp').setLevel(logging.INFO)
-WORKER_QUEUE = "work"
-
-PLUGIN_MODULES = {
- basic: basic.ALL,
- bofh: bofh.ALL,
- bots: bots.ALL,
- bugtracker: bugtracker.ALL,
- extended: extended.ALL,
- feeds: feeds.ALL,
- fun: fun.ALL,
- lookup: lookup.ALL,
- meta: meta.ALL,
- morse: morse.ALL,
- muc: muc.ALL,
- plugin_help: plugin_help.ALL,
- queue_management: queue_management.ALL,
- searx: searx.ALL,
- translation: translation.ALL,
- url: url.ALL,
-}
-
class Bot(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, rooms, nick):
super(Bot, self).__init__(jid, password)
+ # TODO read from config
self.actionqueue = 'action_processing'
self.actionthread = None
- self.job_workers = []
- self.workers = []
self.rooms = rooms
self.nick = nick
self.register_plugin('xep_0308')
def kill_workers(self):
- [t.die() for t in self.workers + self.job_workers]
- [t.join() for t in self.workers + self.job_workers]
if self.actionthread:
self.actionthread.die()
self.actionthread.join()
- def initialize_workers(self):
+ def initialize_actionthreads(self):
connection = pika.BlockingConnection(
- pika.ConnectionParameters('localhost')
+ pika.URLParameters(conf_get("amqp_uri"))
)
channel = connection.channel()
- channel.exchange_declare(exchange='topic_command', exchange_type='topic')
- channel.exchange_declare(exchange='topic_parse', exchange_type='topic')
- channel.queue_declare(queue=WORKER_QUEUE, durable=True)
channel.queue_declare(queue=self.actionqueue, durable=True)
- for classes in PLUGIN_MODULES.values():
- for cls in classes:
- try:
- worker = cls(actionqueue=self.actionqueue)
- self.job_workers.append(worker)
- worker.start()
- except Exception as e:
- # Whoopsie. Thatz a broken thing.
- raise SystemExit()
-
- def initialize_actionthreads(self):
-
self.actionthread = action_worker.ActionThread(bot=self)
self.actionthread.start()
- # TODO: doesnt work...
- def reset_all_workerthreads(self):
- from importlib import reload
-
- [t.die() for t in self.job_workers]
- [t.join() for t in self.job_workers]
-
- self.actionthread.die()
- self.actionthread.join()
-
- reload(worker_mod)
- reload(action_worker)
-
- for module in PLUGIN_MODULES.keys():
- reload(module)
-
- self.initialize_workers()
- self.initialize_actionthreads()
- self.echo("code blue")
-
# TODO: doesn't stop
def disconnect(self, reconnect=False, wait=None, send_close=True):
logger.info("Stopping all workers...")
wait=True
)
logger.info('%s: joined with code %s' % (room, ret))
- self.initialize_workers()
self.initialize_actionthreads()
def muc_message(self, msg):
if "hangup" in message:
self.disconnect()
- if "reset-workers" in message:
- self.echo("No, please... !")
- [t.die() for t in self.job_workers]
- [t.join() for t in self.job_workers]
- self.initialize_workers()
- self.echo("argh...")
-
if "reset-worker-one" in message:
self.echo("But I was your fav..ARGH")
self.actionthread.die()
except:
logger.exception("Oops. Registration failed")
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
+ connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
self.channel = connection.channel()
self.channel.exchange_declare(exchange='classifier', exchange_type='topic')
def register_plugin(self):
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
+ connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
channel = connection.channel()
channel.queue_declare(queue='plugin_registry')
channel.basic_publish(
from copy import deepcopy
import pika
+from distbot.common.config import conf_get
class Action:
def send_action(actionqueue, action):
connection = pika.BlockingConnection(
- pika.ConnectionParameters('localhost')
+ pika.URLParameters(conf_get("amqp_uri"))
)
channel = connection.channel()
channel.queue_declare(queue=actionqueue, durable=True)
# rate limiting, TODO
hist_max_count = integer(default=5)
hist_max_time = integer(default=10*60)
+
+amqp_uri = string(default="amqp://guest:guest@localhost:5672/%2F")
import shlex
import pika
+from distbot.common.config import conf_get
from sleekxmpp import Message, Presence
from sleekxmpp.jid import JID
def process_message(routing_key, body):
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
+ connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
channel = connection.channel()
channel.exchange_declare(exchange='classifier', exchange_type='topic')
connection = pika.BlockingConnection(
- pika.ConnectionParameters('localhost')
+ pika.URLParameters(conf_get("amqp_uri"))
)
channel = connection.channel()
--- /dev/null
+# -*- coding: utf-8 -*-
--- /dev/null
+# -*- coding: utf-8 -*-
+import signal
+from time import sleep
+
+import pika
+from distbot.common.config import conf_get
+
+from distbot.bot import worker as worker_mod
+
+from distbot.plugins import basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help, \
+ morse, meta, \
+ extended, bugtracker, bots, bofh
+
+WORKER_QUEUE = "work"
+# TODO read from config
+ACTION_QUEUE = "action_processing"
+PLUGIN_MODULES = {
+ basic: basic.ALL,
+ bofh: bofh.ALL,
+ bots: bots.ALL,
+ bugtracker: bugtracker.ALL,
+ extended: extended.ALL,
+ feeds: feeds.ALL,
+ fun: fun.ALL,
+ lookup: lookup.ALL,
+ meta: meta.ALL,
+ morse: morse.ALL,
+ muc: muc.ALL,
+ plugin_help: plugin_help.ALL,
+ queue_management: queue_management.ALL,
+ searx: searx.ALL,
+ translation: translation.ALL,
+ url: url.ALL,
+}
+job_workers = []
+
+
+def initialize_workers():
+ connection = pika.BlockingConnection(
+ pika.URLParameters(conf_get("amqp_uri"))
+ )
+ channel = connection.channel()
+ channel.exchange_declare(exchange='topic_command', exchange_type='topic')
+ channel.exchange_declare(exchange='topic_parse', exchange_type='topic')
+ channel.queue_declare(queue=WORKER_QUEUE, durable=True)
+ channel.queue_declare(queue=ACTION_QUEUE, durable=True)
+
+ for classes in PLUGIN_MODULES.values():
+ for cls in classes:
+ try:
+ worker = cls(actionqueue=ACTION_QUEUE)
+ job_workers.append(worker)
+ worker.start()
+ except Exception as e:
+ import logging
+ logging.exception(e)
+ # Whoopsie. Thatz a broken thing.
+ raise SystemExit()
+
+
+# TODO: doesnt work...
+# https://lists.gt.net/python/python/139992
+def reset_workers(signum, frame):
+ from importlib import reload
+ stop_workers(signum, frame)
+
+ reload(worker_mod)
+ for module in PLUGIN_MODULES.keys():
+ reload(module)
+
+ initialize_workers()
+
+
+def stop_workers(signum, frame):
+ print("stopping workers")
+ [t.die() for t in job_workers]
+ print("joining workers")
+ [t.join() for t in job_workers]
+ print("stopped workers.")
+ job_workers.clear()
+
+
+def dump_status(*_):
+ print("current threads in queue:")
+ for t in job_workers:
+ print(t)
+
+
+def run():
+ signal.signal(signal.SIGTERM, stop_workers)
+ # signal.signal(signal.SIGHUP, reset_workers)
+ signal.signal(signal.SIGALRM, dump_status)
+ initialize_workers()
+ while True:
+ sleep(100)
+
+
+if __name__ == '__main__':
+ run()