From: Thorsten Date: Sun, 28 Jun 2020 15:20:39 +0000 (+0200) Subject: reinvent teatimer X-Git-Url: https://git.aero2k.de/?a=commitdiff_plain;h=3da97e465892b68a645abdb67e50b294185fd101;p=urlbot-v3.git reinvent teatimer --- diff --git a/distbot/bot/action_worker.py b/distbot/bot/action_worker.py index fcf1bc6..207ae0b 100644 --- a/distbot/bot/action_worker.py +++ b/distbot/bot/action_worker.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- +import asyncio import json import logging -import threading +import sched import time from pika.channel import Channel @@ -12,26 +13,11 @@ from distbot.common.message import process_message logger = logging.getLogger(__name__) -EVENTLOOP_DELAY = 0.1 - - -class EventLoop(threading.Thread): - event_list = None - - def __init__(self, event_list): - super().__init__() - self.event_list = event_list - - def run(self): - while 1: - self.event_list.run(False) - time.sleep(EVENTLOOP_DELAY) - class ActionWorker(object): exchange = '' - # event_list = sched.scheduler(time.time, time.sleep) + event_list = sched.scheduler(time.time, time.sleep) @staticmethod def parse_body(body): @@ -45,6 +31,9 @@ class ActionWorker(object): self._queue_name = queue_name self._consumer_tag = None + self.loop = asyncio.get_event_loop() + self.idle_task = self.loop.create_task(self.tick()) + def open_connection(self, connection): self._connection = connection connection.channel(on_open_callback=self.open_channel) @@ -67,15 +56,16 @@ class ActionWorker(object): will invoke when a message is fully received. """ - logging.info('Issuing consumer related RPC commands') + logger.info('Issuing consumer related RPC commands') # self.add_on_cancel_callback() self._consumer_tag = self._channel.basic_consume( self.on_message, self._queue_name ) - logging.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection, self._consumer_tag) + logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection, + self._consumer_tag) def on_message(self, ch, method, properties, body): - logging.info('Received message # %s from %s: %s', + logger.info('Received message # %s from %s: %s', method.delivery_tag, properties.app_id, body) body = json.loads(body.decode("utf-8")) @@ -89,8 +79,6 @@ class ActionWorker(object): ch.basic_ack(delivery_tag=method.delivery_tag) def find_scheduled_action_by_mutex(self, mutex): - # TODO... - return action_item = None for item in self.event_list.queue: if item.kwargs["action"].mutex == mutex: @@ -102,28 +90,27 @@ class ActionWorker(object): """ :type event: Action """ - # TODO: outsource delayed events - raise RuntimeError("Die Bahnhofshalle ist derzeit wegen Wartungsarbeiten geschlossen.") - # TODO: mutex handling logger.info("scheduling event: %s", event.serialize()) if event.mutex and self.find_scheduled_action_by_mutex(event.mutex): logger.info("not scheduling that event (prevented by mutex)") raise RuntimeError("not scheduling that event (prevented by mutex)") - self.event_list.enterabs( - event.time, 0, send_action, - kwargs={'actionqueue': self._queue_name, 'action': event} - ) + self.loop.call_later(event.time - time.time(), lambda: send_action(self._queue_name, event)) + + @staticmethod + async def tick(): + while True: + await asyncio.sleep(1) + + def stop_eventqueue(self): + self.idle_task.cancel() def unschedule_action(self, event): """ Remove a scheduled action :type event: Action """ - # TODO... - return - item = self.find_scheduled_action_by_mutex(event.mutex) if item: self.event_list.cancel(item)