Skip to content

Instantly share code, notes, and snippets.

@Forevka
Created April 19, 2021 07:26
Show Gist options
  • Save Forevka/02e0a4421b343adf60d9b9f12735ccf9 to your computer and use it in GitHub Desktop.
Save Forevka/02e0a4421b343adf60d9b9f12735ccf9 to your computer and use it in GitHub Desktop.
import os
import sys
import time
from typing import TypeVar
from dataclasses import dataclass
from telethon import TelegramClient, events
import datetime
from telethon import types
import logging
# Only show warnings and errors from the root logger (and libraries that log through it).
logging.basicConfig(level=logging.WARNING)
# Generic type variable; not referenced by any of the visible code below.
T = TypeVar('T')
@dataclass
class ActiveChat:
    """Per-chat bookkeeping used to merge consecutive outgoing messages.

    One instance is kept for every private chat in which an outgoing
    message has been seen. The handlers mutate it in place.
    """

    # User id of the peer in the private chat.
    chat_id: int
    # Date of the most recent outgoing message that was tracked or merged.
    my_last_message_date: datetime.datetime
    # Accumulated text of the tracked outgoing message (merged lines included).
    my_last_message_text: str
    # Id of the outgoing message that later messages get merged into.
    my_last_message_id: int
    # False once the peer has written after the tracked outgoing message.
    is_my_message_last: bool
    # Maximum gap after the last outgoing message for which merging applies.
    active_time: datetime.timedelta
def get_env(name: str, message: str, cast=str):
    """Return a configuration value from the environment or from the user.

    The environment variable ``name`` is consulted first. If it is missing,
    or its value cannot be converted, the user is prompted with ``message``
    until a convertible value is entered.

    Args:
        name: Name of the environment variable to look up.
        message: Prompt shown when the value has to be typed in.
        cast: Callable that converts the raw string (for example ``int``).
            It must raise ``ValueError`` for unacceptable input.

    Returns:
        The result of ``cast`` applied to the raw value, so the type is the
        same whether the value came from the environment or from the prompt.
    """
    if name in os.environ:
        try:
            # Convert environment values too; previously they were returned
            # as raw strings, bypassing ``cast``.
            return cast(os.environ[name])
        except ValueError as e:
            # A bad environment value is reported, then we fall back to
            # asking interactively instead of handing back a wrong type.
            print(f'Invalid value in environment variable {name}: {e}', file=sys.stderr)
    while True:
        value = input(message)
        try:
            return cast(value)
        except ValueError as e:
            print(e, file=sys.stderr)
            # Short pause so the error is visible before the next prompt.
            time.sleep(1)
# Gather the connection settings first, in the same order the prompts appear:
# session name (environment only, default 'replier'), then API id, then API hash.
session_name = os.environ.get('TG_SESSION', 'replier')
api_id = get_env('TG_API_ID', 'Enter your API ID: ', int)
api_hash = get_env('TG_API_HASH', 'Enter your API hash: ')

client = TelegramClient(session_name, api_id, api_hash, proxy=None)

# ActiveChat records, one per private chat with a tracked outgoing message.
active_chats = []
@events.register(events.NewMessage(outgoing=False))
async def handler_everybody(event):
    """Mark a tracked private chat as "peer spoke last" on an incoming message.

    Only messages whose peer is a ``types.PeerUser`` are considered. If the
    chat has an entry in ``active_chats``, its ``is_my_message_last`` flag is
    cleared; untracked chats are ignored.
    """
    peer = event.message.peer_id
    if not isinstance(peer, types.PeerUser):
        return

    # Only the first matching record is updated.
    for chat in active_chats:
        if chat.chat_id == peer.user_id:
            chat.is_my_message_last = False
            break
@events.register(events.NewMessage(outgoing=True))
async def handler_my(event):
    """Merge quick consecutive outgoing private messages into one message.

    For an outgoing message in a private chat (peer is ``types.PeerUser``):

    * If the chat is not tracked yet, start tracking it with this message.
    * If the tracked outgoing message is still the last one in the chat and
      this message arrived within ``active_time`` of it, append this text to
      the tracked message (edit) and delete this message.
    * Otherwise this message becomes the new tracked message.
    """
    message = event.message
    if not isinstance(message.peer_id, types.PeerUser):
        return

    user_id = message.peer_id.user_id
    chat = next((c for c in active_chats if c.chat_id == user_id), None)

    if chat is None:
        # First outgoing message seen for this chat: create the record only
        # now, instead of building a throwaway one on every event.
        active_chats.append(
            ActiveChat(
                user_id,
                message.date,
                message.message,
                message.id,
                True,
                datetime.timedelta(seconds=30),
            )
        )
        return

    can_merge = (
        chat.is_my_message_last
        and chat.my_last_message_id != message.id
        and message.date <= chat.my_last_message_date + chat.active_time
    )
    if can_merge:
        chat.my_last_message_text += '\n' + message.message
        # Each merged message extends the merge window from its own date.
        chat.my_last_message_date = message.date
        # NOTE(review): the new message is deleted whatever it contains, so
        # any non-text content it carried would be lost. Confirm intended.
        await client.delete_messages(None, message.id)
        await client.edit_message(
            message.peer_id,
            chat.my_last_message_id,
            chat.my_last_message_text,
        )
        return

    # Peer replied in between, or the merge window expired: this message
    # becomes the one that later messages are merged into.
    chat.my_last_message_id = message.id
    chat.my_last_message_text = message.message
    chat.my_last_message_date = message.date
    # The incoming-message handler clears this flag. It must be set again
    # here, otherwise merging stays disabled for the chat after the peer's
    # first reply.
    chat.is_my_message_last = True
with client:
    # Attach both callbacks; each carries the events.NewMessage filter given
    # to events.register(...) where it was defined.
    for callback in (handler_everybody, handler_my):
        client.add_event_handler(callback)

    print('(Press Ctrl+C to stop this)')
    client.run_until_disconnected()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment