Skip to content

Instantly share code, notes, and snippets.

@Umbrien
Last active June 6, 2023 23:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Umbrien/5800633af5be8832f6bb6e1cee32a20f to your computer and use it in GitHub Desktop.
Save Umbrien/5800633af5be8832f6bb6e1cee32a20f to your computer and use it in GitHub Desktop.
Monitor channel and join to private channel via link
import re
from telethon import events, types
from telethon.sync import TelegramClient
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.functions.messages import ImportChatInviteRequest
from telethon.errors.rpcerrorlist import SessionPasswordNeededError
# Replace the values below with your own API credentials
api_id = '12345678'
api_hash = '12345678123456781234567812345678'
phone_number = '+380123456789'
PASS = 'your-account-password-if-enabled'
# Replace with the username or ID of the channel you want to monitor
channel_username = -100
client = TelegramClient('session_name', api_id, api_hash)
client.connect()
if not client.is_user_authorized():
client.send_code_request(phone_number)
try:
client.sign_in(phone_number, input(
'Enter the code sent to your phone: '))
except SessionPasswordNeededError as err:
client.sign_in(password=PASS)
# might need it on first run
# client.get_dialogs()
channel_entity = client.get_entity(channel_username)
@client.on(events.NewMessage(chats=channel_entity))
async def monitor_channel(event):
print("monitoring...")
message = event.message
print(message.message)
print(message.entities)
# will not work if the link is in hyperlink
if '+' in message.message:
invite_link_regex = r'/\+[^\s]+'
invite_link = re.findall(
invite_link_regex, message.message)[0]
hash_index = invite_link.rfind('/+') + 2
chat_hash = invite_link[hash_index:]
print("hash found: ", chat_hash)
result = await client(ImportChatInviteRequest(chat_hash))
if isinstance(result, types.ChatInviteImported):
print('Successfully joined the chat or channel.')
else:
print('Failed to join the chat or channel.')
# will work if the link is in hyperlink
if message.entities:
# Iterate over the entities to find the hyperlink entity
for entity in message.entities:
if isinstance(entity, types.MessageEntityTextUrl):
# Extract the URL from the hyperlink entity
invite_link = entity.url
print('Invite link found:', invite_link)
# Extract the hash from the invite link
hash_index = invite_link.rfind('/+') + 2
chat_hash = invite_link[hash_index:]
result = await client(ImportChatInviteRequest(chat_hash))
# sometimes may result in
# AttributeError: module 'telethon.types' has no attribute 'ChatInviteImported'
# but it will join
if isinstance(result, types.ChatInviteImported):
print('Successfully joined the chat or channel.')
else:
print('Failed to join the chat or channel.')
# Exit the loop once the first hyperlink entity is found
break
client.start()
client.run_until_disconnected()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment