]> git.aero2k.de Git - urlbot-v3.git/commitdiff
fix teatimer/async loop
authorThorsten <mail@aero2k.de>
Tue, 4 Jun 2024 18:15:26 +0000 (20:15 +0200)
committerThorsten <mail@aero2k.de>
Tue, 4 Jun 2024 18:15:26 +0000 (20:15 +0200)
src/distbot/bot/action_worker.py
src/distbot/common/action.py

index d441850b4b71c03cdd4f146db6bb350fe3692bf5..7fa624a447a3f84f53fd4ef02a0946bc9c41d3b0 100644 (file)
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import asyncio
+import functools
 import json
 import logging
 import time
@@ -89,22 +90,18 @@ class ActionWorker(object):
     def find_scheduled_action_by_mutex(self, mutex):
         action_item = None
         for item in self.event_list:
-            if item._args and item._args[0].mutex == mutex:
+            if item._args and item._args[0].mutex == mutex and item.when() >= self.loop.time():
                 action_item = item
                 break
         return action_item
 
-    def schedule_action(self, event):
-        """
-        :type event: Action
-        """
-        # TODO: mutex handling
-        logger.info("scheduling event: %s", event.serialize())
-        if event.mutex and self.find_scheduled_action_by_mutex(event.mutex):
+    def schedule_action(self, action: Action):
+        logger.info("scheduling event: %s", action.serialize())
+        if action.mutex and self.find_scheduled_action_by_mutex(action.mutex):
             logger.info("not scheduling that event (prevented by mutex)")
             raise RuntimeError("not scheduling that event (prevented by mutex)")
 
-        handle: TimerHandle = self.loop.call_later(event.time - time.time(), lambda ev: send_action(self._queue_name, ev), event)
+        handle: TimerHandle = self.loop.call_later(action.time - time.time(), functools.partial(send_action, self._queue_name), action)
         self.event_list.append(handle)
 
     @staticmethod
index cb4af523ab4db62f04b32d7fdc7aab9006148eb1..5b5f7276bce3eb5a3acfac5284e9be4aa6c1eb9a 100644 (file)
@@ -68,7 +68,7 @@ class Action:
             return False
 
 
-def send_action(actionqueue, action):
+def send_action(actionqueue, action: Action):
     connection = pika.BlockingConnection(
         pika.URLParameters(conf_get("amqp_uri"))
     )