]> git.aero2k.de Git - urlbot-v3.git/commitdiff
reinvent teatimer
authorThorsten <mail@aero2k.de>
Sun, 28 Jun 2020 15:20:39 +0000 (17:20 +0200)
committerThorsten <mail@aero2k.de>
Sun, 28 Jun 2020 15:23:10 +0000 (17:23 +0200)
distbot/bot/action_worker.py

index fcf1bc64421176bc95acef8655dffdd8e985d78a..207ae0b0ba527ea21d1c4570a86d362caf431d85 100644 (file)
@@ -1,7 +1,8 @@
 # -*- coding: utf-8 -*-
+import asyncio
 import json
 import logging
-import threading
+import sched
 import time
 
 from pika.channel import Channel
@@ -12,26 +13,11 @@ from distbot.common.message import process_message
 
 logger = logging.getLogger(__name__)
 
-EVENTLOOP_DELAY = 0.1
-
-
-class EventLoop(threading.Thread):
-    event_list = None
-
-    def __init__(self, event_list):
-        super().__init__()
-        self.event_list = event_list
-
-    def run(self):
-        while 1:
-            self.event_list.run(False)
-            time.sleep(EVENTLOOP_DELAY)
-
 
 class ActionWorker(object):
     exchange = ''
 
-    event_list = sched.scheduler(time.time, time.sleep)
+    event_list = sched.scheduler(time.time, time.sleep)
 
     @staticmethod
     def parse_body(body):
@@ -45,6 +31,9 @@ class ActionWorker(object):
         self._queue_name = queue_name
         self._consumer_tag = None
 
+        self.loop = asyncio.get_event_loop()
+        self.idle_task = self.loop.create_task(self.tick())
+
     def open_connection(self, connection):
         self._connection = connection
         connection.channel(on_open_callback=self.open_channel)
@@ -67,15 +56,16 @@ class ActionWorker(object):
         will invoke when a message is fully received.
 
         """
-        logging.info('Issuing consumer related RPC commands')
+        logger.info('Issuing consumer related RPC commands')
         # self.add_on_cancel_callback()
         self._consumer_tag = self._channel.basic_consume(
             self.on_message, self._queue_name
         )
-        logging.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection, self._consumer_tag)
+        logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection,
+                     self._consumer_tag)
 
     def on_message(self, ch, method, properties, body):
-        logging.info('Received message # %s from %s: %s',
+        logger.info('Received message # %s from %s: %s',
                      method.delivery_tag, properties.app_id, body)
         body = json.loads(body.decode("utf-8"))
 
@@ -89,8 +79,6 @@ class ActionWorker(object):
         ch.basic_ack(delivery_tag=method.delivery_tag)
 
     def find_scheduled_action_by_mutex(self, mutex):
-        # TODO...
-        return
         action_item = None
         for item in self.event_list.queue:
             if item.kwargs["action"].mutex == mutex:
@@ -102,28 +90,27 @@ class ActionWorker(object):
         """
         :type event: Action
         """
-        # TODO: outsource delayed events
-        raise RuntimeError("Die Bahnhofshalle ist derzeit wegen Wartungsarbeiten geschlossen.")
-
         # TODO: mutex handling
         logger.info("scheduling event: %s", event.serialize())
         if event.mutex and self.find_scheduled_action_by_mutex(event.mutex):
             logger.info("not scheduling that event (prevented by mutex)")
             raise RuntimeError("not scheduling that event (prevented by mutex)")
 
-        self.event_list.enterabs(
-            event.time, 0, send_action,
-            kwargs={'actionqueue': self._queue_name, 'action': event}
-        )
+        self.loop.call_later(event.time - time.time(), lambda: send_action(self._queue_name, event))
+
+    @staticmethod
+    async def tick():
+        while True:
+            await asyncio.sleep(1)
+
+    def stop_eventqueue(self):
+        self.idle_task.cancel()
 
     def unschedule_action(self, event):
         """
         Remove a scheduled action
         :type event: Action
         """
-        # TODO...
-        return
-
         item = self.find_scheduled_action_by_mutex(event.mutex)
         if item:
             self.event_list.cancel(item)