--- /dev/null
+import hashlib
+import json
+import time
+from dataclasses import dataclass
+from datetime import datetime
+
+import plyvel
+
+from distbot.bot.worker import Worker
+from distbot.common.action import Action
+from distbot.common.config import conf_get
+from distbot.common.message import get_words, get_nick_from_message
+
+
+@dataclass
+class Poll:
+ option_a: str
+ option_b: str
+ votes: dict[str, str]
+ ending_at: float
+
+ def add_vote(self, user, vote):
+ # TODO return copy of itself or use mutable object?
+ # TODO maybe forbid changing vote
+ if vote in self.options:
+ self.votes[user] = vote
+ return self
+
+ @property
+ def options(self):
+ return {self.option_a, self.option_b}
+
+ @property
+ def key(self):
+ return self.generate_key(self.option_a, self.option_b)
+
+ def votes_a(self) -> int:
+ return len([vote for vote in self.votes.values() if vote == self.option_a])
+
+ def votes_b(self) -> int:
+ return len([vote for vote in self.votes.values() if vote == self.option_b])
+
+ @staticmethod
+ def generate_key(option_a, option_b):
+ return hashlib.sha256("".join(sorted((option_a, option_b))).encode()).hexdigest()
+
+ def due(self, timestamp=None) -> bool:
+ if not timestamp:
+ timestamp = time.time()
+ return timestamp > self.ending_at
+
+ @staticmethod
+ def from_json(data: dict):
+ try:
+ return Poll(
+ option_a=data['option_a'],
+ option_b=data['option_b'],
+ votes=data['votes'],
+ ending_at=float(data['ending_at'])
+ )
+ except (KeyError, TypeError, ValueError) as e:
+ raise ValueError(f"Invalid poll data: {e}")
+
+ def to_json(self):
+ return json.dumps(self, default=lambda o: o.__dict__)
+
+ def status_report(self):
+ # Calculate winner and format result message
+ winner = "It's a tie!"
+ if self.votes_a() > self.votes_b():
+ winner = f"**Winner:** {self.option_a}"
+ elif self.votes_a() < self.votes_b():
+ winner = f"**Winner:** {self.option_b}"
+
+ status_message = (f"**Poll status:**\n"
+ f" * {self.option_a}: {self.votes_a()}\n"
+ f" * {self.option_b}: {self.votes_b()}"
+ ) + (f"\n{winner}" if self.due() else "")
+ return status_message
+
+
+class VotePoll(Worker):
+ """
+ leveldb supported voting plugin
+ leveldb contents:
+ - hashed key for each poll
+ - "active" key with the hashed key of the active poll
+ """
+ binding_keys = ["nick.poll.*.vs.*", "nick.vote.*", "nick.pollstatus"]
+ description = "Vote on polls (A vs B)"
+
+ vote_duration = 60 * 1
+
+ KEY_ACTIVE = "active".encode()
+
+ def __init__(self, actionqueue, queue="work"):
+ super().__init__(actionqueue, queue)
+ self.db = plyvel.DB("votepoll.db", create_if_missing=True)
+
+ def parse_body(self, msg: dict) -> Action | None:
+ sender = get_nick_from_message(msg)
+ words = get_words(msg)
+ now: float = time.time()
+
+ # check active polls for timeout, disabling any left-overs
+ active_poll = self.get_active_poll()
+ if active_poll and active_poll.due() and words[0] != "pollstatus":
+ # something went wrong - only pollstatus should ever end the poll.
+ self.close_poll(active_poll.key)
+
+ if words[0] == "vote":
+ # check if words contain any option
+ map_ab = {
+ "A": active_poll.option_a,
+ "B": active_poll.option_b,
+ }
+ vote = map_ab.get(words[1].upper(), words[1])
+
+ if vote in active_poll.options:
+ self.persist(active_poll.add_vote(sender, vote))
+ return Action(msg="Vote added.")
+ else:
+ return Action(msg=f"not a valid option: {vote} (valid: {active_poll.options})")
+ elif words[0] == "poll":
+ # parse options
+ option_a = words[1]
+ option_b = words[3]
+ # setup new poll
+ poll = Poll(option_a, option_b, {}, now + self.vote_duration)
+ # check prior results
+ if active_poll:
+ return Action(msg="There is already an active poll.")
+ else:
+ prior_poll = self.get_poll(poll.key)
+ if prior_poll and prior_poll.votes:
+ date = datetime.fromtimestamp(prior_poll.ending_at)
+ return Action(msg=f"This was already decided at {date}: {prior_poll.status_report()}")
+
+ # enable poll
+ self.start_poll(poll)
+ # setup timeout to disable poll and present results
+ bot_nick = conf_get("bot_nickname")
+ poll_message = f"**New Vote:** {sender} started a vote! Vote for A: {option_a} or B: {option_b} (reply with '{bot_nick}: vote foo' within {self.vote_duration}s)"
+ return Action(
+ msg=poll_message,
+ event=Action(time=now + self.vote_duration, command="nick.pollstatus",
+ msg=f'{sender}: Poll is over!', mutex=f'poll')
+ )
+ elif words[0] == "pollstatus":
+ return self.check_poll()
+
+ def get_active_poll(self) -> Poll:
+ active_key = self.db.get(self.KEY_ACTIVE)
+ if active_key:
+ return self.get_poll(active_key)
+
+ def get_poll(self, key: str):
+ poll_bin = self.db.get(key.encode())
+ poll = json.loads(poll_bin.decode("utf-8"))
+ return Poll.from_json(poll)
+
+ def persist(self, poll: Poll):
+ self.db.put(poll.key.encode(), poll.to_json().encode())
+
+ def start_poll(self, poll: Poll):
+ key = poll.key.encode()
+ self.db.put(key, poll.to_json().encode())
+ self.db.put(self.KEY_ACTIVE, key)
+
+ def close_poll(self, key: str):
+ active_key = self.db.get(self.KEY_ACTIVE)
+ if active_key == key.encode():
+ self.db.delete(self.KEY_ACTIVE)
+
+ def check_poll(self):
+ poll = self.get_active_poll()
+ if not poll:
+ return Action(msg="no active poll")
+ if poll.due():
+ return self.end_poll()
+ else:
+ return Action(msg=poll.status_report())
+
+ def end_poll(self) -> Action | None:
+ poll = self.get_active_poll()
+ if not poll:
+ return
+ self.close_poll(poll.key)
+ report = poll.status_report()
+ return Action(msg=report)
+
+
+ALL = [VotePoll]