From beb24492e27a5533885093f8040f1b89a233eec0 Mon Sep 17 00:00:00 2001 From: Thorsten Date: Tue, 4 Jun 2024 20:15:26 +0200 Subject: [PATCH] fix teatimer/async loop --- src/distbot/bot/action_worker.py | 15 ++++++--------- src/distbot/common/action.py | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/distbot/bot/action_worker.py b/src/distbot/bot/action_worker.py index d441850..7fa624a 100644 --- a/src/distbot/bot/action_worker.py +++ b/src/distbot/bot/action_worker.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import asyncio +import functools import json import logging import time @@ -89,22 +90,18 @@ class ActionWorker(object): def find_scheduled_action_by_mutex(self, mutex): action_item = None for item in self.event_list: - if item._args and item._args[0].mutex == mutex: + if item._args and item._args[0].mutex == mutex and item.when() >= self.loop.time(): action_item = item break return action_item - def schedule_action(self, event): - """ - :type event: Action - """ - # TODO: mutex handling - logger.info("scheduling event: %s", event.serialize()) - if event.mutex and self.find_scheduled_action_by_mutex(event.mutex): + def schedule_action(self, action: Action): + logger.info("scheduling event: %s", action.serialize()) + if action.mutex and self.find_scheduled_action_by_mutex(action.mutex): logger.info("not scheduling that event (prevented by mutex)") raise RuntimeError("not scheduling that event (prevented by mutex)") - handle: TimerHandle = self.loop.call_later(event.time - time.time(), lambda ev: send_action(self._queue_name, ev), event) + handle: TimerHandle = self.loop.call_later(action.time - time.time(), functools.partial(send_action, self._queue_name), action) self.event_list.append(handle) @staticmethod diff --git a/src/distbot/common/action.py b/src/distbot/common/action.py index cb4af52..5b5f727 100644 --- a/src/distbot/common/action.py +++ b/src/distbot/common/action.py @@ -68,7 +68,7 @@ class Action: return False -def send_action(actionqueue, action): +def send_action(actionqueue, action: Action): connection = pika.BlockingConnection( pika.URLParameters(conf_get("amqp_uri")) ) -- 2.39.2