tg-bot/backend/app/services/scanner_manager.py
2025-12-04 09:52:39 +08:00

164 lines
6.4 KiB
Python

from __future__ import annotations
import asyncio
from datetime import datetime
from typing import Dict, Optional
import regex
from telethon import TelegramClient, events
from ..schemas.scanner import ScannerStatus
from .config_repo import ConfigRepository
from .match_store import MatchStore, StoredMatch
from .pattern_service import PatternService
class ScannerManager:
    """Controls the lifecycle of the Telethon listener.

    A background task (``_runner``) keeps a connected ``TelegramClient``
    alive. Every ``NewMessage`` event from the configured channels is matched
    against the keyword patterns loaded from ``PatternService``. On a hit, an
    alert and the forwarded original message are posted to the target
    channel, and the match is recorded in ``MatchStore``.
    """

    def __init__(
        self,
        repo: ConfigRepository,
        pattern_service: PatternService,
        match_store: MatchStore,
    ) -> None:
        self._repo = repo
        self._pattern_service = pattern_service
        self._match_store = match_store
        self._client: Optional[TelegramClient] = None
        self._runner: Optional[asyncio.Task] = None
        # Serialises start()/stop() so they cannot interleave.
        self._lock = asyncio.Lock()
        self._last_started: Optional[datetime] = None
        self._last_error: Optional[str] = None
        self._current_channels: list[str] = []
        self._current_keywords: int = 0

    async def start(self) -> None:
        """Connect to Telegram and begin listening in the background.

        Does nothing if a listener is already running. Any connection left
        behind by a previously crashed listener is released first.

        Raises:
            RuntimeError: if credentials, channels or the target
                ``channel_id`` are not configured.
            Exception: whatever ``TelegramClient.start`` raises. The
                half-started client is disconnected before the error propagates.
        """
        async with self._lock:
            if self._runner and not self._runner.done():
                return
            # A runner that died earlier may have left its client connected.
            await self._safe_disconnect(self._client)
            self._client = None
            self._runner = None

            credentials = self._repo.get_active_credentials()
            if not credentials:
                raise RuntimeError("Credentials not configured.")
            channels = self._repo.read_channels()
            if not channels:
                raise RuntimeError("No channels configured.")
            if not credentials.channel_id:
                raise RuntimeError("channel_id missing from credentials.")
            patterns = self._pattern_service.load_patterns()
            self._current_channels = channels
            self._current_keywords = len(patterns)

            client = TelegramClient(
                credentials.username or "anon_session",
                credentials.api_id,
                credentials.api_hash,
            )
            try:
                await client.start(phone=credentials.phone)
                handler = self._build_handler(patterns, credentials.channel_id)
                client.add_event_handler(handler, events.NewMessage(chats=channels))
            except BaseException:
                # Never leak a half-initialised connection.
                await self._safe_disconnect(client)
                raise

            self._client = client
            self._last_started = datetime.utcnow()
            self._runner = asyncio.create_task(
                self._run(client, channels, credentials.channel_id)
            )

    async def _run(
        self, client: TelegramClient, channels: list[str], channel_id: int
    ) -> None:
        """Announce the listener, then block until the client disconnects.

        Any failure is recorded in ``_last_error`` and the client is
        disconnected before the exception is re-raised, so the task ends in an
        error state without holding the connection open.
        """
        try:
            await client.send_message(channel_id, f"Listening to {', '.join(channels)}...")
            await client.run_until_disconnected()
        except Exception as exc:  # pragma: no cover - telemetry only
            self._last_error = str(exc)
            await self._safe_disconnect(client)
            raise

    @staticmethod
    async def _safe_disconnect(client: Optional[TelegramClient]) -> None:
        """Best-effort disconnect of *client*; ``None`` and errors are ignored.

        This is cleanup code. A connection that is already broken must not
        prevent the caller from resetting its own state.
        """
        if client is None:
            return
        try:
            await client.disconnect()
        except Exception:
            pass

    @staticmethod
    def _collect_matches(
        word_patterns: Dict[str, regex.Pattern],
        text: str,
        max_contexts: int = 3,
        radius: int = 20,
    ) -> tuple[dict[str, int], dict[str, str]]:
        """Count matches per keyword and capture context for the first few.

        Args:
            word_patterns: keyword -> compiled pattern (anything with ``finditer``).
            text: message text to scan.
            max_contexts: how many distinct keywords get a context snippet.
            radius: characters kept on each side of a keyword's first match.

        Returns:
            ``(word_counts, contexts)``. ``word_counts`` maps every keyword with
            at least one match to its match count, in ``word_patterns`` order.
            ``contexts`` maps the first *max_contexts* matched keywords to the
            text surrounding their first match.
        """
        word_counts: dict[str, int] = {}
        contexts: dict[str, str] = {}
        for word, pattern in word_patterns.items():
            matches = list(pattern.finditer(text))
            if not matches:
                continue
            word_counts[word] = len(matches)
            if len(contexts) < max_contexts:
                first = matches[0]
                start = max(first.start() - radius, 0)
                # Slicing clamps the end index to len(text) automatically.
                contexts[word] = text[start : first.end() + radius]
        return word_counts, contexts

    @staticmethod
    def _chat_title(chat) -> str:
        """Return ``chat.title``, or ``"Unknown"`` if the chat or title is missing."""
        return getattr(chat, "title", None) or "Unknown"

    @staticmethod
    def _format_alert(
        source_title: str, word_counts: dict[str, int], contexts: list[str]
    ) -> str:
        """Build the alert text posted to the target channel.

        Keywords matched more than once are shown as ``word (count)``. Each
        keyword entry is passed through ``PatternService.escape_markdown``.
        """
        matched_words_str = ", ".join(
            PatternService.escape_markdown(f"{word} ({count})" if count > 1 else word)
            for word, count in word_counts.items()
        )
        message_text = f"Keyword Match from {source_title}: {matched_words_str}"
        if contexts:
            # The label text assumes the default of three context snippets.
            context_label = "First three different match contexts"
            message_text += f"\n{context_label}:\n" + ";\n".join(contexts)
        else:
            message_text += "\nNo contexts available."
        return message_text

    def _build_handler(
        self, word_patterns: Dict[str, regex.Pattern], target_channel_id: int
    ):
        """Return the ``NewMessage`` callback bound to *word_patterns*.

        Exceptions raised while handling a message are stored in
        ``_last_error`` instead of propagating, so one bad message does not
        stop the listener.
        """

        async def handler(event):
            try:
                message = event.message
                message_content = (message.message if message else "") or ""
                word_counts, first_contexts = self._collect_matches(
                    word_patterns, message_content
                )
                if not word_counts:
                    return

                source_title = self._chat_title(event.chat)
                contexts = [
                    f"{PatternService.escape_markdown(word)}: {context}"
                    for word, context in first_contexts.items()
                ]
                message_text = self._format_alert(source_title, word_counts, contexts)

                await event.client.send_message(target_channel_id, message_text)
                # Short pauses between outgoing requests (presumably to stay
                # under Telegram rate limits).
                await asyncio.sleep(0.1)
                await event.message.forward_to(target_channel_id)
                await asyncio.sleep(0.5)

                preview = message_content[:200].replace("\n", " ")
                stored = StoredMatch(
                    timestamp=datetime.utcnow(),
                    source_chat=source_title,
                    matched_words=list(word_counts),
                    contexts=contexts,
                    message_preview=preview,
                )
                await self._match_store.add_match(stored)
            except Exception as exc:
                self._last_error = str(exc)

        return handler

    async def stop(self) -> None:
        """Disconnect the client and wait for the listener task to finish.

        Safe to call when nothing is running. If the runner had already
        failed, its error is not re-raised here; ``_run`` has already recorded
        it in ``_last_error`` (visible through ``status()``).
        """
        async with self._lock:
            if not self._runner:
                return
            runner, client = self._runner, self._client
            self._runner = None
            self._client = None
            await self._safe_disconnect(client)
            runner.cancel()
            try:
                await runner
            except (asyncio.CancelledError, Exception):
                # Cancellation is expected, and crash errors were already
                # recorded by _run.
                pass

    async def status(self) -> ScannerStatus:
        """Report whether the listener is running, plus the last known config and error."""
        running = self._runner is not None and not self._runner.done()
        return ScannerStatus(
            running=running,
            channels=self._current_channels,
            keywords=self._current_keywords,
            last_started=self._last_started,
            last_error=self._last_error,
        )