# -*- coding: utf-8 -*-
import json
import logging
-import sched
import threading
import time
+from typing import Optional
-import pika
+from pika.adapters import AsyncioConnection
+from pika.channel import Channel
from distbot.common.action import Action, send_action
from distbot.common.config import conf_set, conf_get
time.sleep(EVENTLOOP_DELAY)
-class ActionThread(threading.Thread):
- event_list = sched.scheduler(time.time, time.sleep)
+class ActionWorker(object):
+ exchange = ''
- def __init__(self, bot):
- super().__init__()
- self.event_thread = EventLoop(self.event_list)
- self.bot = bot
- # TODO: same in worker, check if that is waste
- self.busy = False
+ # event_list = sched.scheduler(time.time, time.sleep)
+
+ @staticmethod
+ def parse_body(body):
+ action = Action.deserialize(body)
+ return action
- connection = pika.BlockingConnection(pika.URLParameters(conf_get("amqp_uri")))
- self.channel = connection.channel()
+ def __init__(self, bot, queue_name):
+ self._bot = bot
+ self._connection: Optional[AsyncioConnection] = None
+ self._channel: Optional[Channel] = None
+ self._queue_name = queue_name
+ self._consumer_tag = None
+
+ def open_connection(self, connection):
+ self._connection = connection
+ connection.channel(on_open_callback=self.open_channel)
+
+ def open_channel(self, channel: Channel):
+ # self.add_on_channel_close_callback()
+ # self.setup_exchange(self.EXCHANGE)
+ self._channel = channel
+ channel.add_on_close_callback(lambda *_: self._bot.disconnect())
+ channel.queue_declare(queue=self._queue_name, durable=True, callback=lambda _: self.start_consuming())
+ channel.add_on_cancel_callback(lambda _: channel.close())
+
+ def start_consuming(self):
+ """This method sets up the consumer by first calling
+ add_on_cancel_callback so that the object is notified if RabbitMQ
+ cancels the consumer. It then issues the Basic.Consume RPC command
+ which returns the consumer tag that is used to uniquely identify the
+ consumer with RabbitMQ. We keep the value to use it when we want to
+ cancel consuming. The on_message method is passed in as a callback pika
+ will invoke when a message is fully received.
- def callback(self, ch, method, properties, body):
+ """
+ logging.info('Issuing consumer related RPC commands')
+ # self.add_on_cancel_callback()
+ self._consumer_tag = self._channel.basic_consume(
+ self.on_message, self._queue_name
+ )
+ logging.info(
+ f"Setup consumer for actions on ch {self._channel} on con {self._connection} with tag {self._consumer_tag}")
+
+ def on_message(self, ch, method, properties, body):
+ logging.info('Received message # %s from %s: %s',
+ method.delivery_tag, properties.app_id, body)
body = json.loads(body.decode("utf-8"))
- self.busy = True
action = self.parse_body(body)
try:
except:
logger.exception("Could not run action: %s", body)
- self.busy = False
logger.debug("Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
- @staticmethod
- def parse_body(body):
- action = Action.deserialize(body)
- return action
-
- def run(self):
- logger.debug("Processing actions in queue %s", self.bot.actionqueue)
- self.event_thread.start()
-
- self.channel.queue_declare(queue=self.bot.actionqueue, durable=True)
- self.channel.basic_consume(self.callback, queue=self.bot.actionqueue)
- self.channel.start_consuming()
-
- def die(self):
- logger.debug("Quitting...")
- self.channel.stop_consuming()
- logger.debug("action_worker gone")
-
def find_scheduled_action_by_mutex(self, mutex):
+ # TODO...
+ return
action_item = None
for item in self.event_list.queue:
if item.kwargs["action"].mutex == mutex:
"""
:type event: Action
"""
+ # TODO: outsource delayed events
+ raise RuntimeError("Die Bahnhofshalle ist derzeit wegen Wartungsarbeiten geschlossen.")
+
# TODO: mutex handling
logger.info("scheduling event: %s", event.serialize())
if event.mutex and self.find_scheduled_action_by_mutex(event.mutex):
self.event_list.enterabs(
event.time, 0, send_action,
- kwargs={'actionqueue': self.bot.actionqueue, 'action': event}
+ kwargs={'actionqueue': self._queue_name, 'action': event}
)
def unschedule_action(self, event):
Remove a scheduled action
:type event: Action
"""
+ # TODO...
+ return
+
item = self.find_scheduled_action_by_mutex(event.mutex)
if item:
self.event_list.cancel(item)
if action.msg: # and rate_limit(RATE_CHAT | plugin.ratelimit_class):
time.sleep(delay)
- self.bot.echo(action.msg, action.recipient)
+ self._bot.echo(action.msg, action.recipient)
# TODO test that
if action.priv_msg: # and rate_limit(RATE_CHAT | plugin.ratelimit_class):
- self.bot.echo(action['priv_msg'], action.sender)
+ self._bot.echo(action.priv_msg, action.sender)
if action.presence:
conf_set('presence', action.presence)
request_counter = int(conf_get('request_counter'))
conf_set('request_counter', request_counter + 1)
+
+ def die(self):
+ self._connection.close()
import shlex
import pika
-import slixmpp
+from pika.adapters import AsyncioConnection
+import slixmpp
from distbot.bot import action_worker
from distbot.common.config import conf_get
from distbot.common.message import process_message, get_nick_from_message
super(Bot, self).__init__(jid, password)
# TODO read from config
- self.actionqueue = 'action_processing'
- self.actionthread = None
+ self.action_queue = 'action_processing'
+ self.action_worker = action_worker.ActionWorker(bot=self, queue_name=self.action_queue)
self.rooms = rooms
self.nick = nick
-
- # from slixmpp.plugins.xep_0045 import XEP_0045
- # self.add_event_handler('handle_groupchat_presence', XEP_0045.handle_groupchat_presence)
self.add_event_handler('session_start', self.session_start)
self.add_event_handler('groupchat_message', self.muc_message)
self.add_event_handler('message', self.message_handler)
self.add_event_handler('muc::%s::got_online' % room, self.muc_online)
self._initialize_plugins()
- # import ssl
- # self.ssl_version = ssl.PROTOCOL_TLSv1_2
def _initialize_plugins(self):
self.register_plugin('xep_0045')
self.register_plugin('xep_0308')
def kill_workers(self):
- if self.actionthread:
- self.actionthread.die()
- self.actionthread.join()
+ if self.action_worker:
+ self.action_worker.die()
- def initialize_actionthreads(self):
- connection = pika.BlockingConnection(
- pika.URLParameters(conf_get("amqp_uri"))
+ def initialize_actionworker(self):
+ connection = AsyncioConnection(
+ pika.URLParameters(conf_get("amqp_uri")),
+ on_open_callback=self.action_worker.open_connection
)
- channel = connection.channel()
- channel.queue_declare(queue=self.actionqueue, durable=True)
-
- self.actionthread = action_worker.ActionThread(bot=self)
- self.actionthread.start()
- # TODO: doesn't stop
def disconnect(self, reconnect=False, wait=None, send_close=True):
logger.info("Stopping all workers...")
self.kill_workers()
logger.info('%s: joining' % room)
ret = self.plugin['xep_0045'].join_muc(room, self.nick, wait=True)
logger.info('%s: joined with code %s' % (room, ret))
- self.initialize_actionthreads()
+ self.initialize_actionworker()
def muc_message(self, msg):
if msg['mucnick'] == self.nick or 'groupchat' != msg['type']:
if "hangup" in message:
self.disconnect()
+ # TODO: probably quite useless
if "reset-worker-one" in message:
self.echo("But I was your fav..ARGH")
- self.actionthread.die()
- self.actionthread.join()
- self.initialize_actionthreads()
+ self.action_worker.die()
+ self.initialize_actionworker()
self.echo("Worker One available for master.")
nick_offset, routing_key = self.get_amqp_routing_key(self.nick, msg)