from __future__ import annotations

import asyncio
import logging
from datetime import datetime
from typing import Awaitable, Callable, Dict, Optional

import regex
from telethon import TelegramClient, events

from ..schemas.scanner import ScannerStatus
from .config_repo import ConfigRepository
from .match_store import MatchStore, StoredMatch
from .pattern_service import PatternService

logger = logging.getLogger(__name__)


class ScannerManager:
    """Controls the lifecycle of the Telethon listener.

    ``start`` connects a :class:`TelegramClient`, registers a ``NewMessage``
    handler for the configured channels and spawns a background task that keeps
    the client running. ``stop`` tears both down and ``status`` reports the
    current state.
    """

    def __init__(
        self,
        repo: ConfigRepository,
        pattern_service: PatternService,
        match_store: MatchStore,
    ) -> None:
        self._repo = repo
        self._pattern_service = pattern_service
        self._match_store = match_store
        # Live client and the task driving it; both None while stopped.
        self._client: Optional[TelegramClient] = None
        self._runner: Optional[asyncio.Task] = None
        # Serializes start()/stop() so they cannot interleave.
        self._lock = asyncio.Lock()
        self._last_started: Optional[datetime] = None
        self._last_error: Optional[str] = None
        # Values reported by status() for the most recent start() attempt.
        self._current_channels: list[str] = []
        self._current_keywords: int = 0

    async def start(self) -> None:
        """Connect to Telegram and start listening; no-op if already running.

        Raises:
            RuntimeError: If credentials, channels, or the target ``channel_id``
                are not configured.

        Any exception raised while logging the client in propagates after the
        partially started client has been disconnected.
        """
        async with self._lock:
            if self._runner and not self._runner.done():
                return

            credentials = self._repo.get_active_credentials()
            if not credentials:
                raise RuntimeError("Credentials not configured.")
            channels = self._repo.read_channels()
            if not channels:
                raise RuntimeError("No channels configured.")
            if not credentials.channel_id:
                raise RuntimeError("channel_id missing from credentials.")

            patterns = self._pattern_service.load_patterns()
            self._current_channels = channels
            self._current_keywords = len(patterns)

            client = TelegramClient(
                credentials.username or "anon_session",
                credentials.api_id,
                credentials.api_hash,
            )
            try:
                await client.start(phone=credentials.phone)
            except BaseException:
                # Don't leave a half-opened connection behind when login fails.
                await self._safe_disconnect(client)
                raise

            handler = self._build_handler(patterns, credentials.channel_id)
            client.add_event_handler(handler, events.NewMessage(chats=channels))

            self._client = client
            # NOTE(review): naive UTC timestamp; kept for compatibility with
            # ScannerStatus consumers.
            self._last_started = datetime.utcnow()
            self._runner = asyncio.create_task(
                self._run(client, channels, credentials.channel_id)
            )

    async def _run(
        self, client: TelegramClient, channels: list[str], channel_id: int
    ) -> None:
        """Announce the listener in *channel_id*, then block until disconnected.

        A failure is recorded in ``_last_error`` and re-raised. Before
        re-raising, the client is disconnected so its event handler stops
        forwarding messages once the task is no longer considered running.
        """
        try:
            await client.send_message(channel_id, f"Listening to {', '.join(channels)}...")
            await client.run_until_disconnected()
        except Exception as exc:  # pragma: no cover - telemetry only
            self._last_error = str(exc)
            await self._safe_disconnect(client)
            raise

    @staticmethod
    async def _safe_disconnect(client: TelegramClient) -> None:
        """Disconnect *client*, logging instead of raising on failure."""
        try:
            await client.disconnect()
        except Exception:
            logger.warning("Failed to disconnect Telegram client", exc_info=True)

    @staticmethod
    def _find_matches(
        word_patterns: Dict[str, regex.Pattern], text: str
    ) -> tuple[dict[str, int], dict[str, str]]:
        """Count every keyword's matches in *text* and capture a few contexts.

        Returns ``(word_counts, first_match_contexts)``. ``word_counts`` maps
        each keyword with at least one match to its match count.
        ``first_match_contexts`` holds, for at most the first three matching
        keywords, the text around that keyword's first match, extended by up
        to 20 characters on each side.
        """
        word_counts: dict[str, int] = {}
        first_match_contexts: dict[str, str] = {}
        for word, pattern in word_patterns.items():
            matches = list(pattern.finditer(text))
            if not matches:
                continue
            word_counts[word] = len(matches)
            if len(first_match_contexts) < 3:
                first = matches[0]
                context_start = max(first.start() - 20, 0)
                context_end = min(first.end() + 20, len(text))
                first_match_contexts[word] = text[context_start:context_end]
        return word_counts, first_match_contexts

    def _build_handler(
        self, word_patterns: Dict[str, regex.Pattern], target_channel_id: int
    ) -> Callable[..., Awaitable[None]]:
        """Return the ``NewMessage`` callback that reports keyword matches.

        For a message matching at least one pattern, the callback:

        1. sends a summary to *target_channel_id*;
        2. forwards the original message there;
        3. records a :class:`StoredMatch` in the match store.

        Errors are recorded in ``_last_error`` and logged rather than
        propagated into Telethon's dispatcher.
        """

        async def handler(event) -> None:
            try:
                message = event.message
                message_content = (message.message or "") if message else ""

                word_counts, first_match_contexts = self._find_matches(
                    word_patterns, message_content
                )
                if not word_counts:
                    return

                matched_words_str = ", ".join(
                    PatternService.escape_markdown(
                        f"{word} ({count})" if count > 1 else word
                    )
                    for word, count in word_counts.items()
                )
                contexts = [
                    f"{PatternService.escape_markdown(word)}: {context}"
                    for word, context in first_match_contexts.items()
                ]

                # event.chat is only the cached entity and may be None; private
                # chats (users) have no .title at all.
                chat = await event.get_chat()
                source_chat = getattr(chat, "title", None) or "Unknown"

                message_text = f"Keyword Match from {source_chat}: {matched_words_str}"
                if contexts:
                    context_label = "First three different match contexts"
                    contexts_str = ";\n".join(contexts)
                    message_text += f"\n{context_label}:\n{contexts_str}"
                else:
                    message_text += "\nNo contexts available."

                await event.client.send_message(target_channel_id, message_text)
                # NOTE(review): the pauses presumably pace outgoing requests to
                # avoid Telegram flood limits; confirm before tuning.
                await asyncio.sleep(0.1)
                await event.message.forward_to(target_channel_id)
                await asyncio.sleep(0.5)

                stored = StoredMatch(
                    timestamp=datetime.utcnow(),
                    source_chat=source_chat,
                    matched_words=list(word_counts),
                    contexts=contexts,
                    message_preview=message_content[:200].replace("\n", " "),
                )
                await self._match_store.add_match(stored)
            except Exception as exc:
                # Best-effort: one bad message must not kill the listener.
                self._last_error = str(exc)
                logger.exception("Failed to process incoming message")

        return handler

    async def stop(self) -> None:
        """Disconnect the client and cancel the runner task; no-op if stopped.

        A runner that already crashed does not make ``stop`` raise. Its error
        was recorded in ``_last_error`` by ``_run``.
        """
        async with self._lock:
            runner = self._runner
            if runner is None:
                return
            if self._client is not None:
                await self._safe_disconnect(self._client)
            runner.cancel()
            try:
                await runner
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.debug("Scanner runner had failed before stop()", exc_info=True)
            finally:
                self._runner = None
                self._client = None

    async def status(self) -> ScannerStatus:
        """Return a snapshot of the scanner's state."""
        running = self._runner is not None and not self._runner.done()
        return ScannerStatus(
            running=running,
            channels=self._current_channels,
            keywords=self._current_keywords,
            last_started=self._last_started,
            last_error=self._last_error,
        )