tg-bot/backend/app/services/match_store.py
2025-12-04 09:52:39 +08:00

58 lines
1.7 KiB
Python

from __future__ import annotations
import asyncio
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Deque, List, Set
from .. import config
from ..schemas.match import MatchResult
@dataclass
class StoredMatch:
    """In-memory record of a single match, convertible to a ``MatchResult``.

    The field names mirror the keyword arguments passed to ``MatchResult``
    in :meth:`as_model`.
    """

    # NOTE(review): field meanings below are inferred from their names only;
    # confirm against the code that produces StoredMatch instances.
    timestamp: datetime  # presumably when the match was observed
    source_chat: str  # presumably an identifier/title of the originating chat
    matched_words: List[str]
    contexts: List[str]
    message_preview: str

    def as_model(self) -> MatchResult:
        """Return a ``MatchResult`` populated from this record's fields.

        The list fields are handed over by reference, not copied.
        """
        payload = {
            "timestamp": self.timestamp,
            "source_chat": self.source_chat,
            "matched_words": self.matched_words,
            "contexts": self.contexts,
            "message_preview": self.message_preview,
        }
        return MatchResult(**payload)
class MatchStore:
    """Keep a bounded history of recent matches and broadcast new ones.

    The history is a ``deque`` with ``maxlen`` set, filled from the left, so
    index 0 is always the newest match and the oldest entry is dropped once
    the limit is reached. Each subscriber owns an ``asyncio.Queue`` that
    receives a ``MatchResult`` for every match added after it subscribed.
    All state changes happen under a single ``asyncio.Lock``.
    """

    def __init__(self, limit: int = config.MATCH_HISTORY_LIMIT) -> None:
        """Create an empty store.

        Args:
            limit: Maximum number of matches kept in the history.
        """
        self._history: Deque[StoredMatch] = deque(maxlen=limit)
        self._subscribers: Set[asyncio.Queue] = set()
        self._lock = asyncio.Lock()

    async def add_match(self, match: StoredMatch) -> None:
        """Record ``match`` as the newest entry and push it to every subscriber.

        Args:
            match: The match to store and broadcast.
        """
        async with self._lock:
            self._history.appendleft(match)
            # Subscriber queues are created unbounded in ``subscribe``, so
            # ``put_nowait`` cannot raise ``QueueFull``. Using it (instead of
            # ``await queue.put``) guarantees the lock is never held across a
            # suspension point, which also makes iterating the live set safe.
            for queue in self._subscribers:
                queue.put_nowait(match.as_model())

    async def get_recent(self, limit: int = 50) -> List[MatchResult]:
        """Return up to ``limit`` stored matches, newest first.

        Args:
            limit: Maximum number of entries to return. Zero or a negative
                value yields an empty list.

        Returns:
            A list of ``MatchResult`` models built from the stored matches.
        """
        # A raw negative slice bound would mean "all but the N oldest";
        # clamp so a negative limit simply returns nothing.
        count = max(limit, 0)
        async with self._lock:
            return [item.as_model() for item in list(self._history)[:count]]

    async def subscribe(self) -> asyncio.Queue:
        """Register a new subscriber and return its queue.

        Returns:
            An unbounded ``asyncio.Queue`` that will receive a
            ``MatchResult`` for each subsequently added match. Pass it to
            :meth:`unsubscribe` when done.
        """
        queue: asyncio.Queue = asyncio.Queue()
        async with self._lock:
            self._subscribers.add(queue)
        return queue

    async def unsubscribe(self, queue: asyncio.Queue) -> None:
        """Stop broadcasting to ``queue``.

        Unknown queues are ignored, so calling this twice is harmless.

        Args:
            queue: A queue previously returned by :meth:`subscribe`.
        """
        async with self._lock:
            self._subscribers.discard(queue)