Restructured code in regex matching for better performance

This commit is contained in:
afolivieri 2024-10-09 17:58:41 +03:00
parent edbef97bdc
commit 99d6675688

View File

@ -4,6 +4,10 @@ import os
import asyncio import asyncio
import signal import signal
from telethon import TelegramClient, events from telethon import TelegramClient, events
import logging
import traceback
import platform
import sys
# Configuration files # Configuration files
CREDENTIALS_FILE = 'credentials.json' CREDENTIALS_FILE = 'credentials.json'
@ -11,27 +15,40 @@ KEYWORDS_FILE = 'keywords.txt'
CHANNELS_FILE = 'channels.txt' CHANNELS_FILE = 'channels.txt'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("bot.log"),
logging.StreamHandler()
]
)
async def get_credentials(): async def get_credentials():
if os.path.exists(CREDENTIALS_FILE): try:
with open(CREDENTIALS_FILE, 'r') as file: if os.path.exists(CREDENTIALS_FILE):
credentials = json.load(file) with open(CREDENTIALS_FILE, 'r') as file:
if not credentials.get('channel_id'): credentials = json.load(file)
if not credentials.get('channel_id'):
credentials = await fetch_channel_id(credentials)
with open(CREDENTIALS_FILE, 'w') as file:
json.dump(credentials, file, indent=4)
else:
credentials = {
'api_id': input('Enter Telegram API ID: ').strip(),
'api_hash': input('Enter Telegram API Hash: ').strip(),
'phone': input('Enter phone number: ').strip(),
'username': input('Enter username (optional, press enter to skip): ').strip() or None,
'bot_token': input('Enter Telegram bot token: ').strip(),
'channel_id': None
}
credentials = await fetch_channel_id(credentials) credentials = await fetch_channel_id(credentials)
with open(CREDENTIALS_FILE, 'w') as file: with open(CREDENTIALS_FILE, 'w') as file:
json.dump(credentials, file, indent=4) json.dump(credentials, file, indent=4)
else: return credentials
credentials = { except Exception as e:
'api_id': input('Enter Telegram API ID: ').strip(), logging.error(f"Error in get_credentials: {e}")
'api_hash': input('Enter Telegram API Hash: ').strip(), raise
'phone': input('Enter phone number: ').strip(),
'username': input('Enter username (optional, press enter to skip): ').strip() or None,
'bot_token': input('Enter Telegram bot token: ').strip(),
'channel_id': None
}
credentials = await fetch_channel_id(credentials)
with open(CREDENTIALS_FILE, 'w') as file:
json.dump(credentials, file, indent=4)
return credentials
async def fetch_channel_id(credentials): async def fetch_channel_id(credentials):
@ -61,6 +78,7 @@ def load_patterns():
emoji_pattern = r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF]' emoji_pattern = r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF]'
for word in keywords: for word in keywords:
original_word = word
if word.endswith('**'): if word.endswith('**'):
word = word[:-2] word = word[:-2]
pattern = rf'(?i)(?:{emoji_pattern})*{regex.escape(word)}\p{{L}}{{0,6}}' pattern = rf'(?i)(?:{emoji_pattern})*{regex.escape(word)}\p{{L}}{{0,6}}'
@ -75,49 +93,69 @@ def load_patterns():
pattern = rf'(?i)(?:{emoji_pattern})*\d{{0,3}}{regex.escape(word)}' pattern = rf'(?i)(?:{emoji_pattern})*\d{{0,3}}{regex.escape(word)}'
else: else:
pattern = rf'(?i)(?:{emoji_pattern})*{regex.escape(word)}' pattern = rf'(?i)(?:{emoji_pattern})*{regex.escape(word)}'
word_patterns[word] = pattern try:
compiled = regex.compile(pattern)
word_patterns[original_word] = compiled
except regex.error as e:
logging.error(f'Invalid regex pattern for word "{word}": {e}')
return word_patterns return word_patterns
def signal_handler(signal, frame):
print('Detected Ctrl+C! Gracefully shutting down.') async def shutdown(signal, client, loop):
exit(0) logging.info(f"Received exit signal {signal.name}...")
await client.disconnect()
loop.stop()
async def main(): async def main():
creds = await get_credentials()
client = TelegramClient(creds['username'] or 'anon_session', creds['api_id'], creds['api_hash'])
await client.start(phone=creds['phone'])
word_patterns = load_patterns()
channels = load_entries_from_file(CHANNELS_FILE)
channel_id = creds['channel_id'] # Use the channel ID from credentials
await client.send_message(channel_id, f"Listening to {', '.join(channels)}...")
@client.on(events.NewMessage(chats=channels))
async def handler(event):
message_content = event.message.message if event.message else ""
for word, pattern in word_patterns.items():
match = regex.search(pattern, message_content)
if match:
start_pos = max(match.start() - 20, 0)
end_pos = min(match.end() + 20, len(message_content))
context = message_content[start_pos:end_pos]
await client.send_message(channel_id, f"Keyword Match: {word}\nContext: {context}")
await asyncio.sleep(0.1)
await event.message.forward_to(channel_id)
await asyncio.sleep(0.1)
print(f'Forwarded Message: {message_content}')
break
print(f"Listening to {', '.join(channels)}...")
signal.signal(signal.SIGINT, signal_handler)
try: try:
creds = await get_credentials()
client = TelegramClient(creds['username'] or 'anon_session', creds['api_id'], creds['api_hash'])
await client.start(phone=creds['phone'])
word_patterns = load_patterns()
channels = load_entries_from_file(CHANNELS_FILE)
channel_id = creds['channel_id'] # Use the channel ID from credentials
await client.send_message(channel_id, f"Listening to {', '.join(channels)}...")
@client.on(events.NewMessage(chats=channels))
async def handler(event):
try:
message_content = event.message.message if event.message else ""
for word, pattern in word_patterns.items():
for match in pattern.finditer(message_content):
start_pos = max(match.start() - 20, 0)
end_pos = min(match.end() + 20, len(message_content))
context = message_content[start_pos:end_pos]
await client.send_message(channel_id, f"Keyword Match: {word}\nContext: {context}")
await asyncio.sleep(0.1)
await event.message.forward_to(channel_id)
await asyncio.sleep(0.5)
print(f'Forwarded Message: {message_content}')
break
except Exception as e:
logging.error(f"Error in message handler: {e}")
logging.info(f"Listening to {', '.join(channels)}...")
loop = asyncio.get_event_loop()
if platform.system() != 'Windows':
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s, client, loop)))
else:
try:
await client.run_until_disconnected()
except KeyboardInterrupt:
await shutdown(signal.SIGINT, client, loop)
await client.run_until_disconnected() await client.run_until_disconnected()
except Exception as e:
logging.error(f"Error in main: {e}")
logging.error(traceback.format_exc())
finally: finally:
print("Disconnecting client...") logging.info("Disconnecting client...")
await client.disconnect() await client.disconnect()
print("Client disconnected safely.") logging.info("Client disconnected safely.")
if __name__ == '__main__': if __name__ == '__main__':