-
-
Save Bibo-Joshi/75f135edf1ca3530decf4c2ae06bd699 to your computer and use it in GitHub Desktop.
Put this file in python-telegram-bot/tests and provide API_ID and API_HASH (from my.telegram.org) as environment variables. Customize the bot to be created in main(). The script doesn't create a video sticker set yet. Tested with `python-telegram-bot==20.0a4` and `telethon==1.25.4`. ⚠️ The script needs to be updated to make the bot also an admin in the forum test group (chat id -100161915…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
import json | |
import os | |
import re | |
import urllib.parse | |
import itertools | |
from telethon import TelegramClient | |
from telethon.utils import get_input_peer | |
from telethon.tl.functions.messages import ( | |
AddChatUserRequest, | |
CreateChatRequest, | |
MigrateChatRequest, | |
) | |
from telethon.tl.functions.channels import InviteToChannelRequest | |
from telegram import Bot | |
from telegram.request import HTTPXRequest | |
# Telegram API credentials, read from the environment (see my.telegram.org).
_raw_api_id = os.getenv('API_ID')
if _raw_api_id is None:
    # int(None) would only raise an opaque TypeError; name the missing variable instead.
    raise RuntimeError("The API_ID environment variable must be set.")
api_id = int(_raw_api_id)
api_hash = os.getenv('API_HASH')

# The Telethon session file is named after this script.
session_file = os.path.basename(__file__) + '.session'

# Username of the bot that all bot-management conversations are held with.
BOT_FATHER = "@BotFather"

# Module-level event loop used by the script entry point.
# asyncio.get_event_loop() raises RuntimeError on newer Python versions when
# no loop has been set yet, so create and register one in that case.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
async def send_message_check_reply(conv, text, reply):
    """Send ``text`` in a conversation and check that the response contains ``reply``.

    Args:
        conv: Conversation object providing the ``send_message`` and
            ``get_response`` coroutines.
        text: The message text to send.
        reply: Substring that must occur in the text of the response.

    Returns:
        The response message.

    Raises:
        AssertionError: If ``reply`` does not occur in the response text. The
            exception message contains both the expected and the actual text.
    """
    await conv.send_message(text)
    msg = await conv.get_response()
    if reply not in msg.text:
        raise AssertionError('Error: "{}" not found in "{}"'.format(reply, msg.text))
    # Brief pause before the next interaction (presumably to avoid flooding the peer).
    await asyncio.sleep(1)
    return msg
async def send_file_check_reply(conv, file, reply):
    """Send ``file`` in a conversation and check that the response contains ``reply``.

    Args:
        conv: Conversation object providing the ``send_file`` and
            ``get_response`` coroutines.
        file: The file to send; passed to ``conv.send_file`` unchanged.
        reply: Substring that must occur in the text of the response.

    Returns:
        The response message.

    Raises:
        AssertionError: If ``reply`` does not occur in the response text. The
            exception message contains both the expected and the actual text.
    """
    await conv.send_file(file)
    msg = await conv.get_response()
    if reply not in msg.text:
        raise AssertionError('Error: "{}" not found in "{}"'.format(reply, msg.text))
    # Brief pause before the next interaction (presumably to avoid flooding the peer).
    await asyncio.sleep(1)
    return msg
async def click_check_edit(conv, msg, text_filter, edit):
    """Click the button whose label contains ``text_filter`` and check the resulting edit.

    If no button of ``msg`` matches, a button containing "»" is clicked instead
    (presumably the "next page" button of a paginated keyboard) and the search
    for ``text_filter`` continues on the edited message.

    Args:
        conv: Conversation object providing the ``get_edit`` coroutine.
        msg: Message whose ``buttons`` (rows of buttons, or ``None``) are searched.
        text_filter: Substring identifying the button to click.
        edit: Substring that must occur in the text of the message edit that
            follows the click on the matching button.

    Returns:
        The edited message received after clicking the matching button.

    Raises:
        AssertionError: If no matching button can be found (and there is no
            "»" button to continue with), or if ``edit`` does not occur in the
            edited message.
    """
    # ``msg.buttons`` may be None when the message carries no keyboard.
    buttons = list(itertools.chain.from_iterable(msg.buttons or []))

    for button in buttons:
        if text_filter in button.text:
            await button.click()
            edited_msg = await conv.get_edit()
            if edit not in edited_msg.text:
                raise AssertionError(
                    'Error: "{}" not found in "{}"'.format(edit, edited_msg.text)
                )
            await asyncio.sleep(1)
            return edited_msg

    # Not on this page: move on and look for the same button again. Without a
    # "»" button there is nothing left to try, so fail instead of recursing forever.
    for button in buttons:
        if "»" in button.text:
            await button.click()
            next_page = await conv.get_edit()
            await asyncio.sleep(1)
            return await click_check_edit(conv, next_page, text_filter, edit)

    raise AssertionError(
        'Error: no button containing "{}" found in "{}"'.format(text_filter, msg.text)
    )
async def create_bot(client, name, username):
    """Create a new bot through a conversation with BotFather.

    Args:
        client: Client object providing ``conversation``.
        name: Display name sent in answer to the name prompt.
        username: Username sent in answer to the username prompt.

    Returns:
        The text found between backticks in BotFather's final answer, i.e. the
        token of the new bot.

    Raises:
        AssertionError: If one of BotFather's answers does not contain the
            expected text, or if no token can be found in the final answer.
    """
    print("Creating bot")
    async with client.conversation(BOT_FATHER) as conv:
        await send_message_check_reply(conv, "/newbot", "Please choose a name for your bot.")
        await send_message_check_reply(conv, name, "Now let's choose a username for your bot.")
        msg = await send_message_check_reply(
            conv, username, "Done! Congratulations on your new bot."
        )

        token_search = re.search(r"`(.+)`", msg.text)
        # Raise explicitly: an ``assert`` would be skipped when running with -O.
        if token_search is None:
            raise AssertionError('Error: no bot token found in "{}"'.format(msg.text))
        # ``.+`` matches at least one character, so the token is never empty.
        return token_search.group(1)
async def set_description(client, username, description):
    """Set the description of bot ``username`` through a BotFather conversation.

    Every message is sent with ``send_message_check_reply``, which verifies
    that the answer contains the expected text.

    Args:
        client: Client object providing ``conversation``.
        username: Bot username sent in answer to the "Choose a bot" prompt.
        description: The new description text.
    """
    print("Setting description")
    dialogue = (
        ("/setdescription", "Choose a bot"),
        (username, "Send me the new description for the bot."),
        (description, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as conv:
        for outgoing, expected in dialogue:
            await send_message_check_reply(conv, outgoing, expected)
async def set_about_text(client, username, about_text):
    """Set the 'About' text of bot ``username`` through a BotFather conversation.

    Every message is sent with ``send_message_check_reply``, which verifies
    that the answer contains the expected text.

    Args:
        client: Client object providing ``conversation``.
        username: Bot username sent in answer to the "Choose a bot" prompt.
        about_text: The new 'About' text.
    """
    print("Setting about_text")
    dialogue = (
        ("/setabouttext", "Choose a bot"),
        (username, "Send me the new 'About' text."),
        (about_text, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as conv:
        for outgoing, expected in dialogue:
            await send_message_check_reply(conv, outgoing, expected)
async def set_join_groups(client, username, join_groups):
    """Change the "join groups" setting of bot ``username`` via BotFather.

    Every message is sent with ``send_message_check_reply``, which verifies
    that the answer contains the expected text.

    Args:
        client: Client object providing ``conversation``.
        username: Bot username sent in answer to the "Choose a bot" prompt.
        join_groups: The answer sent to the status prompt (callers in this
            file pass "Enable" or "Disable").
    """
    print("Setting join groups")
    dialogue = (
        ("/setjoingroups", "Choose a bot"),
        (username, "Current status is"),
        (join_groups, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as conv:
        for outgoing, expected in dialogue:
            await send_message_check_reply(conv, outgoing, expected)
async def create_super_group(client, username, group_title):
    """Create a group containing ``username`` and migrate it.

    A chat is created with ``CreateChatRequest`` and then passed to
    ``MigrateChatRequest``; a one second pause follows each request.

    Args:
        client: Callable client used to execute the requests.
        username: The user to put into the new chat.
        group_title: Title of the new chat.

    Returns:
        The ``channel_id`` of the first update in the migration result.
    """
    print("Creating new super group.")
    created = await client(CreateChatRequest(users=[username], title=group_title))
    await asyncio.sleep(1)

    # The id of the freshly created chat is read from the last update of the result.
    chat_id = created.updates[-1].peer.chat_id
    migrated = await client(MigrateChatRequest(chat_id))
    await asyncio.sleep(1)

    return migrated.updates[0].channel_id
async def add_to_group(client, group, username):
    """Invite ``username`` to ``group`` with an ``InviteToChannelRequest``.

    Args:
        client: Callable client used to execute the request.
        group: The group to invite into.
        username: The user to invite.
    """
    print("Adding to group")
    await client(InviteToChannelRequest(group, [username]))
    # Pause for a second before the caller continues.
    await asyncio.sleep(1)
async def make_group_admin(client, group, username):
    """Promote ``username`` to admin in ``group`` via ``client.edit_admin``.

    Args:
        client: Client object providing the ``edit_admin`` coroutine.
        group: The group in which to promote the user.
        username: The user to promote.
    """
    print("Making group admin")
    admin_rights = {"add_admins": True, "is_admin": True}
    await client.edit_admin(group, username, **admin_rights)
    # Pause for a second before the caller continues.
    await asyncio.sleep(1)
async def add_to_channel(client, channel, username):
    """Make ``username`` an admin of ``channel`` via ``client.edit_admin``.

    The rights granted are: change info, post, edit and delete messages, and
    invite users.

    Args:
        client: Client object providing the ``edit_admin`` coroutine.
        channel: The channel in which to grant the rights.
        username: The user receiving the rights.
    """
    print("Setting bot as channel admin")
    channel_rights = {
        "change_info": True,
        "post_messages": True,
        "edit_messages": True,
        "delete_messages": True,
        "invite_users": True,
    }
    await client.edit_admin(channel, username, **channel_rights)
    # Pause for a second before the caller continues.
    await asyncio.sleep(1)
async def set_inline(client, username, inline_placeholder):
    """Set the inline placeholder of bot ``username`` through a BotFather conversation.

    Every message is sent with ``send_message_check_reply``, which verifies
    that the answer contains the expected text.

    Args:
        client: Client object providing ``conversation``.
        username: Bot username sent in answer to the "Choose a bot" prompt.
        inline_placeholder: The placeholder message to set.
    """
    print("Setting inline")
    dialogue = (
        ("/setinline", "Choose a bot"),
        (username, "please send me the placeholder message"),
        (inline_placeholder, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as conv:
        for outgoing, expected in dialogue:
            await send_message_check_reply(conv, outgoing, expected)
async def new_game(
    client, username, game_title, game_description, game_photo, game_animation, game_short_name
):
    """Create a game for bot ``username`` through a BotFather conversation.

    Text answers are sent with ``send_message_check_reply`` and the two media
    files with ``send_file_check_reply``; each helper verifies that the answer
    contains the expected text.

    Args:
        client: Client object providing ``conversation``.
        username: Bot username sent when asked which bot offers the game.
        game_title: Title of the game.
        game_description: Short description of the game.
        game_photo: File sent when asked for the photo.
        game_animation: File sent when asked for the demo GIF.
        game_short_name: Short name of the game.
    """
    print("Creating game")
    # (helper, payload, text expected in the answer) for every step, in order.
    steps = (
        (send_message_check_reply, "/newgame", "Please read our Rules carefully"),
        (send_message_check_reply, "OK", "By accepting these Rules, you agree that"),
        (send_message_check_reply, "Accept", "Which bot will be offering the game?"),
        (send_message_check_reply, username, "Please enter a title"),
        (send_message_check_reply, game_title, "Please enter a short description"),
        (send_message_check_reply, game_description, "Please upload a photo, 640x360 pixels."),
        (send_file_check_reply, game_photo, "Now upload a demo GIF"),
        (send_file_check_reply, game_animation, "Now please choose a short name"),
        (send_message_check_reply, game_short_name, "Open it to start developing the game!"),
    )
    async with client.conversation(BOT_FATHER) as conv:
        for send_and_check, payload, expected in steps:
            await send_and_check(conv, payload, expected)
async def set_payments(client, username):
    """Connect the "Stripe Test" payment provider to bot ``username``.

    The function navigates BotFather's ``/mybots`` menu to the Stripe provider,
    follows the "Connect Stripe Test" button to the bot named in its URL, prints
    a URL the user has to open in a browser, and then waits up to 60 seconds for
    BotFather to edit its message.

    Args:
        client: Client object providing ``conversation``.
        username: Text identifying the bot's button in the ``/mybots`` menu.

    Returns:
        The provider token extracted from the edited BotFather message.

    Raises:
        AssertionError: If an expected answer, the "Connect Stripe Test" button
            or the token cannot be found.
    """
    print("Setting up payments")
    async with client.conversation(BOT_FATHER) as conv:
        msg = await send_message_check_reply(conv, "/mybots", "Choose a bot")
        msg = await click_check_edit(conv, msg, username, "Here it is")
        msg = await click_check_edit(conv, msg, "Payments", "Payment providers for")
        msg = await click_check_edit(conv, msg, "Stripe", "Stripe")

        for button in itertools.chain.from_iterable(msg.buttons):
            if button.text == "Connect Stripe Test":
                break
        else:
            raise AssertionError(
                'Error: No "Connect Stripe Test" '
                "button found in {}".format(msg.reply_markup.to_dict())
            )

        # The button URL names the bot to talk to (URL path) and carries the
        # parameter for its /start command (``start`` query argument).
        stripe_test_bot_url = button.url
        _, _, path, query, _ = urllib.parse.urlsplit(stripe_test_bot_url)
        bot = path.replace("/", "@")
        start = urllib.parse.parse_qs(query)["start"][0]

        async with client.conversation(bot) as stripe_conv:
            msg = await send_message_check_reply(
                stripe_conv, "/start {}".format(start), "Please use this button"
            )
            stripe_test_url = msg.buttons[0][0].url

        print(
            "Please go to this url in your browser:\n{}\n"
            'Once there, press the "Skip this account form" at the top of the viewport, '
            "and then just close the window/tab. "
            "You have 60 seconds to do this.".format(stripe_test_url)
        )

        # BotFather edits its message once the provider has been connected.
        msg = await conv.get_edit(timeout=60)
        token_search = re.search(r"\*\*Stripe Test\*\*: `(.+)`", msg.text)
        # Raise explicitly: an ``assert`` would be skipped when running with -O.
        if token_search is None:
            raise AssertionError(
                'Error: no "Stripe Test" token found in "{}"'.format(msg.text)
            )
        # ``.+`` matches at least one character, so the token is never empty.
        return token_search.group(1)
async def new_sticker_set(
    client, token, sticker_set_name, sticker_set_title, sticker, sticker_emoji, animated=False
):
    """Create a sticker set owned by the logged-in user, using the bot's own API token.

    Args:
        client: Client object providing the ``get_me`` coroutine; the returned
            user's ``id`` becomes the owner of the sticker set.
        token: Token used to construct the ``Bot`` that creates the set.
        sticker_set_name: Name of the sticker set.
        sticker_set_title: Title of the sticker set.
        sticker: Path of the sticker file; opened in binary mode.
        sticker_emoji: Emoji passed as ``emojis`` for the sticker.
        animated: If true the file is passed as ``tgs_sticker``, otherwise as
            ``png_sticker``.

    Raises:
        AssertionError: If ``create_new_sticker_set`` returns a falsy result.
    """
    print("Creating new {}sticker set.".format("animated " if animated else ""))
    me = await client.get_me()
    bot = Bot(
        token,
        request=HTTPXRequest(
            read_timeout=20, connect_timeout=20, write_timeout=20, pool_timeout=20
        ),
    )
    sticker_kwarg = "tgs_sticker" if animated else "png_sticker"

    async with bot:
        with open(sticker, "rb") as f:
            # The API call must not live inside an ``assert`` statement: with
            # ``python -O`` the whole statement, including the call, is dropped.
            created = await bot.create_new_sticker_set(
                me.id,
                sticker_set_name,
                sticker_set_title,
                emojis=sticker_emoji,
                **{sticker_kwarg: f},
            )
        if not created:
            raise AssertionError(
                'Error: sticker set "{}" could not be created'.format(sticker_set_name)
            )
async def new_bot(
    client,
    name,
    username,
    description=None,
    about_text=None,
    group_id=None,
    group_name=None,
    group_admin=None,
    channel_id=None,
    inline_placeholder=None,
    game_title=None,
    game_description=None,
    game_photo=None,
    game_animation=None,
    game_short_name=None,
    sticker_set_name=None,
    animated_sticker_set_name=None,
    sticker_set_title=None,
    animated_sticker_set_title=None,
    sticker=None,
    animated_sticker=None,
    sticker_emoji=None,
    token=None,
    payment_provider_token=None,
):
    """Create and fully configure a bot; every optional step runs only if its arguments are given.

    Args:
        client: Client object used for all interactions.
        name: Display name of the bot.
        username: Username of the bot, without the leading "@".
        description: Description text to set, if any.
        about_text: 'About' text to set, if any.
        group_id: Existing group the bot is added to, if any.
        group_name: Title of a new group to create with the bot, if any.
        group_admin: If truthy, the bot is made admin in the group(s) above.
        channel_id: Channel in which the bot is made admin, if any.
        inline_placeholder: Inline placeholder to set, if any.
        game_title, game_description, game_photo, game_animation,
        game_short_name: A game is created only if all five are given.
        sticker_set_name, sticker_set_title, sticker: A sticker set is created
            only if all three and ``sticker_emoji`` are given.
        animated_sticker_set_name, animated_sticker_set_title,
        animated_sticker: An animated sticker set is created only if all three
            and ``sticker_emoji`` are given.
        sticker_emoji: Emoji used for both sticker sets.
        token: Token of an already existing bot; if ``None`` the bot is created.
        payment_provider_token: Existing payment provider token; if ``None``
            payments are set up.

    Returns:
        A tuple ``(token, payment_provider_token, new_group_id)`` where
        ``new_group_id`` is the id of the newly created group, or ``group_id``
        if no group was created.
    """
    print("Creating bot")

    # Create the bot
    if token is None:
        token = await create_bot(client, name, username)

    # Allow the bot to talk to us
    await client.send_message(username, "/start")

    username = "@" + username

    # Set texts in case a user stumbles upon the bot
    if description:
        await set_description(client, username, description)
    if about_text:
        await set_about_text(client, username, about_text)

    # Make sure the new bot can join groups
    if group_id or channel_id:
        await set_join_groups(client, username, "Enable")

    # Add it to the developer group
    if group_id:
        await add_to_group(client, group_id, username)
    if group_name:
        new_group_id = await create_super_group(client, username, group_name)
    else:
        new_group_id = group_id

    # Make it admin in the group(s). Without a newly created group,
    # ``new_group_id`` is just ``group_id``; dict.fromkeys drops that duplicate
    # (keeping the order) so the bot is not promoted twice in the same group.
    if group_admin:
        for admin_group_id in dict.fromkeys(
            gid for gid in (group_id, new_group_id) if gid
        ):
            await make_group_admin(client, admin_group_id, username)

    # Add it to the testing channel
    if channel_id:
        await add_to_channel(client, channel_id, username)

    # Turn bot joining off, since we no longer need it
    if group_id or channel_id:
        await set_join_groups(client, username, "Disable")

    # Turn on inline so we can create a game for the bot
    if inline_placeholder:
        await set_inline(client, username, inline_placeholder)

    # Create a game for the bot
    if game_title and game_description and game_photo and game_animation and game_short_name:
        await new_game(
            client,
            username,
            game_title,
            game_description,
            game_photo,
            game_animation,
            game_short_name,
        )

    # Now we need to setup payment.
    if payment_provider_token is None:
        payment_provider_token = await set_payments(client, username)

    # Create a sticker set for this bot
    if sticker_set_name and sticker_set_title and sticker and sticker_emoji:
        await new_sticker_set(
            client, token, sticker_set_name, sticker_set_title, sticker, sticker_emoji
        )

    # Create an animated sticker set for this bot
    if (
        animated_sticker_set_name
        and animated_sticker_set_title
        and animated_sticker
        and sticker_emoji
    ):
        await new_sticker_set(
            client,
            token,
            animated_sticker_set_name,
            animated_sticker_set_title,
            animated_sticker,
            sticker_emoji,
            animated=True,
        )

    return token, payment_provider_token, new_group_id
def print_bot(name, username, token, payment_provider_token, super_group_id):
    """Print a summary of a bot to stdout.

    The output is an empty line, a separator of 80 dashes, the name, the
    username and the t.me link, followed by a JSON object (indented by 4) with
    the keys ``token``, ``payment_provider_token``, ``name``, ``username`` and
    ``super_group_id``.

    Args:
        name: Display name of the bot.
        username: Username of the bot, without the leading "@".
        token: Token of the bot.
        payment_provider_token: Payment provider token of the bot.
        super_group_id: Id of the group belonging to the bot.
    """
    summary = dict(
        token=token,
        payment_provider_token=payment_provider_token,
        name=name,
        username=username,
        super_group_id=super_group_id,
    )
    separator = "-" * 80

    print()
    print(separator)
    print("Name:", name)
    for template in ("Username: @{}", "Link: https://t.me/{}"):
        print(template.format(username))
    print(json.dumps(summary, indent=4))
async def main():
    """Create and configure the test bot defined below, then print its summary.

    Customize the values in this function to change the bot that is created.
    """
    async with TelegramClient(session_file, api_id, api_hash) as client:
        # Bot id/index
        i = 0

        # Fill session cache with chats that we can see
        # We need this since otherwise we don't know the access
        # hash of the group and channel
        await client.get_dialogs()

        name = "PTB Test Bot [{}]".format(i)
        username = "ptb_test_{:02d}_bot".format(i)
        # Adjacent string literals are concatenated as-is, so the separating
        # space has to be part of one of them.
        description = (
            "This bot is only for running tests for python-telegram-bot "
            "and has no actual functionality."
        )

        token, payment_provider_token, super_group_id = await new_bot(
            client,
            name=name,
            username=username,
            description=description,
            about_text=description,
            # Pass a group id, if there is a group set up for the bot
            # group_id=-1001493296829,
            # Pass a group name to create a new super group
            group_name=">>> telegram.Bot(test) @{}".format(username),
            group_admin=True,
            channel_id=1062610360,
            inline_placeholder="This bot is only for running tests.",
            game_title="Python-telegram-bot test game",
            game_description="A no-op test game, for python-telegram-bot bot framework testing.",
            game_photo="tests/data/game.png",
            game_animation="tests/data/game.gif",
            game_short_name="test_game",
            sticker_set_name="test_by_{}".format(username),
            animated_sticker_set_name="animated_test_by_{}".format(username),
            sticker_set_title="Test",
            animated_sticker_set_title="Animated Test",
            sticker="tests/data/telegram_sticker.png",
            animated_sticker="tests/data/telegram_animated_sticker.tgs",
            sticker_emoji="😄",
        )

        print_bot(name, username, token, payment_provider_token, super_group_id)
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    # asyncio.run creates a fresh event loop for main() and closes it
    # afterwards, so the entry point does not depend on a loop obtained
    # through the deprecated implicit loop creation of get_event_loop().
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment