import hashlib
import json
+import logging
import time
from dataclasses import dataclass
from datetime import datetime
from distbot.common.config import conf_get
from distbot.common.message import get_words, get_nick_from_message
+logger = logging.getLogger(__name__)
+
@dataclass
class Poll:
- hashed key for each poll
- "active" key with the hashed key of the active poll
"""
- binding_keys = ["nick.poll.*.vs.*", "nick.vote.*", "nick.pollstatus"]
+ binding_keys = [
+ "nick.poll.*.vs.*", "nick.poll.*.vs.*.*",
+ "nick.vote.*",
+ "nick.pollstatus",
+ "nick.endpoll",
+ "nick.droppoll.*.vs.*",
+ ]
description = "Vote on polls (A vs B)"
+ usage = (
+ "Create a new poll using poll 'ice cream' vs cake, optionally followed by voting time in seconds (max 1h), "
+ "then respond with vote a/b or vote cake to place a vote. Use pollstatus for a report on current votes.")
vote_duration = 60 * 1
+ max_vote_duration = 60 * 60 * 12
KEY_ACTIVE = "active".encode()
# parse options
option_a = words[1]
option_b = words[3]
+ vote_duration = self.vote_duration
+ if len(words) == 5:
+ try:
+ vote_duration = max(30, min(60 * 60 * 3, int(words[4])))
+ except ValueError as e:
+ logger.exception("Failed parsing intended duration", exc_info=e)
# setup new poll
- poll = Poll(option_a, option_b, {}, now + self.vote_duration)
+ poll = Poll(option_a, option_b, {}, now + vote_duration)
# check prior results
if active_poll:
return Action(msg="There is already an active poll.")
self.start_poll(poll)
# setup timeout to disable poll and present results
bot_nick = conf_get("bot_nickname")
- poll_message = f"**New Vote:** {sender} started a vote! Vote for A: {option_a} or B: {option_b} (reply with '{bot_nick}: vote foo' within {self.vote_duration}s)"
+ poll_message = f"**New Vote:** {sender} started a vote! Vote for A: {option_a} or B: {option_b} (reply with '{bot_nick}: vote foo' within {vote_duration}s)"
return Action(
msg=poll_message,
- event=Action(time=now + self.vote_duration, command="nick.pollstatus",
+ event=Action(time=now + vote_duration, command="nick.pollstatus",
msg=f'{sender}: Poll is over!', mutex=f'poll')
)
elif words[0] == "pollstatus":
return self.check_poll()
+ elif words[0] == "endpoll":
+ return self.end_poll()
+ elif words[0] == "droppoll":
+ sudoers = conf_get('sudoers') or []
+ poll_key = Poll.generate_key(words[1], words[3])
+ if sender in sudoers:
+ return self.drop_poll(poll_key)
def get_active_poll(self) -> Poll:
active_key = self.db.get(self.KEY_ACTIVE)
def check_poll(self):
poll = self.get_active_poll()
if not poll:
- return Action(msg="no active poll")
+ return Action(msg="No active poll.")
if poll.due():
return self.end_poll()
else:
def end_poll(self) -> Action | None:
poll = self.get_active_poll()
if not poll:
- return
+ return Action(msg="No active poll.", event=Action(stop_event=True, mutex="poll"))
self.close_poll(poll.key)
report = poll.status_report()
- return Action(msg=report)
+ return Action(msg=report, event=Action(stop_event=True, mutex="poll"))
+
+ def drop_poll(self, key: str | bytes):
+ binkey = key if isinstance(key, bytes) else key.encode()
+ poll = self.get_poll(binkey)
+ if poll:
+ self.db.delete(binkey)
+ return Action(msg="Poll deleted.")
+ else:
+ return Action(msg="libpam_alcohol.so failed? No such poll.")
ALL = [VotePoll]