chdir: ~/urlbot-v3
- name: set configuration
- lineinfile: dest=~/urlbot/local_config.ini create=yes line="{{item.key}} = {{item.value}}" regexp="^{{item.key}}.="
+ lineinfile: dest=~/urlbot-v3/local_config.ini create=yes line="{{item.key}} = {{item.value}}" regexp="^{{item.key}}.="
with_items:
- key: "jid"
value: "{{jid}}"
import sched
import time
+import pika
from pika.channel import Channel
from distbot.common.action import Action, send_action
logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection,
self._consumer_tag)
- def on_message(self, ch, method, properties, body):
+ def on_message(self, ch, method, properties: pika.spec.BasicProperties, body):
logger.info('Received message # %s from %s: %s',
method.delivery_tag, properties.app_id, body)
body = json.loads(body.decode("utf-8"))
import logging
import threading
from collections import deque, defaultdict
+from functools import partial
+from typing import Optional
import pika
import pika.exceptions
-from functools import partial
-
from distbot.common.action import Action, send_action
from distbot.common.config import conf_get
CATCH_ALL = ["#"]
def __init__(self, actionqueue, queue="work"):
- super().__init__()
+ super().__init__(name=self.get_subclass_name())
self.queue = queue
self.actionqueue = actionqueue
if self.uses_history:
else:
self.usage = "(reaction only)"
self.used_channel = None
+ self.connection: Optional[pika.BlockingConnection] = None
+ self.channel = None
- try:
- self.register_plugin()
- except:
- logger.exception("Oops. Registration failed")
-
- connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
- self.channel = connection.channel()
+ def init_channel(self):
+ self.connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
+ self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='classifier', exchange_type='topic')
def callback(self, ch, method, properties, body):
return self.__class__.__name__
def die(self):
- logger.info("Bye from %s", self.get_subclass_name())
- self.channel.stop_consuming()
+ self.connection.add_callback_threadsafe(callback=self.channel.stop_consuming)
def run(self):
+ try:
+ self.register_plugin()
+ except:
+ logger.exception("Oops. Registration failed")
+ self.init_channel()
+
result = self.channel.queue_declare(exclusive=True, queue="")
self.queue = result.method.queue
-
for binding_key in self.binding_keys:
logger.info("Registering plugin %s for %s", self.get_subclass_name(), binding_key)
self.channel.queue_bind(
from time import sleep
import pika
-from distbot.common.config import conf_get
+from distbot.bot.worker import Worker
from distbot.bot import worker as worker_mod
-
+from distbot.common.config import conf_get
from distbot.plugins import (
basic, fun, lookup, url, feeds, muc, translation, searx, queue_management, plugin_help,
morse, meta,
extended, bugtracker, bots, bofh, didyouknow,
- debug, youtube
+ youtube
)
+
logger = logging.getLogger(__name__)
WORKER_QUEUE = "work"
youtube: youtube.ALL,
# debug: debug.ALL
}
-job_workers = []
+job_workers: list[Worker] = []
def initialize_workers():
signal.signal(signal.SIGALRM, dump_status)
initialize_workers()
try:
- while True:
- sleep(100)
+ while job_workers:
+ sleep(1)
except KeyboardInterrupt:
- import os
- os.kill(os.getpid(), signal.SIGTERM)
+ stop_workers(None, None)
logger.info("Exiting...")
exit(0)
args=(sys.stderr,)
[formatter_formatter]
-format=%(levelname).1s %(name)s:%(funcName)-15s %(message)s
+format=%(threadName)s %(levelname).1s %(name)s:%(funcName)-15s %(message)s
+