# -*- coding: utf-8 -*-
import asyncio
+import functools
import json
import logging
import time
def find_scheduled_action_by_mutex(self, mutex):
action_item = None
for item in self.event_list:
- if item._args and item._args[0].mutex == mutex:
+ if item._args and item._args[0].mutex == mutex and item.when() >= self.loop.time():
action_item = item
break
return action_item
- def schedule_action(self, event):
- """
- :type event: Action
- """
- # TODO: mutex handling
- logger.info("scheduling event: %s", event.serialize())
- if event.mutex and self.find_scheduled_action_by_mutex(event.mutex):
+ def schedule_action(self, action: Action):
+ logger.info("scheduling event: %s", action.serialize())
+ if action.mutex and self.find_scheduled_action_by_mutex(action.mutex):
logger.info("not scheduling that event (prevented by mutex)")
raise RuntimeError("not scheduling that event (prevented by mutex)")
- handle: TimerHandle = self.loop.call_later(event.time - time.time(), lambda ev: send_action(self._queue_name, ev), event)
+ handle: TimerHandle = self.loop.call_later(action.time - time.time(), functools.partial(send_action, self._queue_name), action)
self.event_list.append(handle)
@staticmethod
return False
-def send_action(actionqueue, action):
+def send_action(actionqueue, action: Action):
connection = pika.BlockingConnection(
pika.URLParameters(conf_get("amqp_uri"))
)