# -*- coding: utf-8 -*-
+import asyncio
import json
import logging
-import threading
+import sched
import time
from pika.channel import Channel
logger = logging.getLogger(__name__)
-EVENTLOOP_DELAY = 0.1
-
-
-class EventLoop(threading.Thread):
- event_list = None
-
- def __init__(self, event_list):
- super().__init__()
- self.event_list = event_list
-
- def run(self):
- while 1:
- self.event_list.run(False)
- time.sleep(EVENTLOOP_DELAY)
-
class ActionWorker(object):
exchange = ''
- # event_list = sched.scheduler(time.time, time.sleep)
+ event_list = sched.scheduler(time.time, time.sleep)
@staticmethod
def parse_body(body):
self._queue_name = queue_name
self._consumer_tag = None
+ self.loop = asyncio.get_event_loop()
+ self.idle_task = self.loop.create_task(self.tick())
+
def open_connection(self, connection):
self._connection = connection
connection.channel(on_open_callback=self.open_channel)
will invoke when a message is fully received.
"""
- logging.info('Issuing consumer related RPC commands')
+ logger.info('Issuing consumer related RPC commands')
# self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(
self.on_message, self._queue_name
)
- logging.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection, self._consumer_tag)
+ logger.info("Setup consumer for actions on ch %s on con %s with tag %s", self._channel, self._connection,
+ self._consumer_tag)
def on_message(self, ch, method, properties, body):
- logging.info('Received message # %s from %s: %s',
+ logger.info('Received message # %s from %s: %s',
method.delivery_tag, properties.app_id, body)
body = json.loads(body.decode("utf-8"))
ch.basic_ack(delivery_tag=method.delivery_tag)
def find_scheduled_action_by_mutex(self, mutex):
- # TODO...
- return
action_item = None
for item in self.event_list.queue:
if item.kwargs["action"].mutex == mutex:
"""
:type event: Action
"""
- # TODO: outsource delayed events
- raise RuntimeError("Die Bahnhofshalle ist derzeit wegen Wartungsarbeiten geschlossen.")
-
# TODO: mutex handling
logger.info("scheduling event: %s", event.serialize())
if event.mutex and self.find_scheduled_action_by_mutex(event.mutex):
logger.info("not scheduling that event (prevented by mutex)")
raise RuntimeError("not scheduling that event (prevented by mutex)")
- self.event_list.enterabs(
- event.time, 0, send_action,
- kwargs={'actionqueue': self._queue_name, 'action': event}
- )
+ self.loop.call_later(event.time - time.time(), lambda: send_action(self._queue_name, event))
+
+ @staticmethod
+ async def tick():
+ while True:
+ await asyncio.sleep(1)
+
+ def stop_eventqueue(self):
+ self.idle_task.cancel()
def unschedule_action(self, event):
"""
Remove a scheduled action
:type event: Action
"""
- # TODO...
- return
-
item = self.find_scheduled_action_by_mutex(event.mutex)
if item:
self.event_list.cancel(item)