from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Deque, List, Set

from .. import config
from ..schemas.match import MatchResult


@dataclass
class StoredMatch:
    """In-memory record of one match, convertible to a ``MatchResult``."""

    timestamp: datetime
    source_chat: str
    matched_words: List[str]
    contexts: List[str]
    message_preview: str

    def as_model(self) -> MatchResult:
        """Build a ``MatchResult`` carrying this record's field values.

        The list fields are handed over as-is (no copy is made here).
        """
        payload = {
            "timestamp": self.timestamp,
            "source_chat": self.source_chat,
            "matched_words": self.matched_words,
            "contexts": self.contexts,
            "message_preview": self.message_preview,
        }
        return MatchResult(**payload)


class MatchStore:
    """Bounded, newest-first match history with fan-out to subscriber queues.

    Every access to the history and to the subscriber set happens while
    holding a single ``asyncio.Lock``.
    """

    def __init__(self, limit: int = config.MATCH_HISTORY_LIMIT) -> None:
        """Create an empty store.

        Args:
            limit: ``maxlen`` of the history deque; once reached, adding a
                new match on the left drops the entry on the right.
        """
        self._history: Deque[StoredMatch] = deque(maxlen=limit)
        self._subscribers: Set[asyncio.Queue] = set()
        self._lock = asyncio.Lock()

    async def add_match(self, match: StoredMatch) -> None:
        """Record ``match`` as the newest entry and push it to subscribers.

        Each subscriber queue receives its own freshly built model object.
        """
        async with self._lock:
            self._history.appendleft(match)
            # Iterate over a snapshot of the subscriber set, not the set itself.
            recipients = tuple(self._subscribers)
            for recipient in recipients:
                await recipient.put(match.as_model())

    async def get_recent(self, limit: int = 50) -> List[MatchResult]:
        """Return models for the leading ``limit`` history entries.

        The history is newest-first, and ``limit`` is applied as an ordinary
        slice bound on a copy of it.
        """
        async with self._lock:
            newest_first = list(self._history)
            window = newest_first[:limit]
            models: List[MatchResult] = []
            for stored in window:
                models.append(stored.as_model())
            return models

    async def subscribe(self) -> asyncio.Queue:
        """Register and return a new queue that will receive future matches.

        The queue is created with the default ``maxsize``.
        """
        mailbox: asyncio.Queue = asyncio.Queue()
        async with self._lock:
            self._subscribers.add(mailbox)
        return mailbox

    async def unsubscribe(self, queue: asyncio.Queue) -> None:
        """Stop delivering matches to ``queue``.

        Passing a queue that is not registered is a no-op.
        """
        async with self._lock:
            self._subscribers.discard(queue)