Skip to content

Instantly share code, notes, and snippets.

@Bibo-Joshi
Forked from jsmnbom/create_ptb_test_bot.py
Last active November 6, 2022 20:55
Show Gist options
  • Save Bibo-Joshi/75f135edf1ca3530decf4c2ae06bd699 to your computer and use it in GitHub Desktop.
Save Bibo-Joshi/75f135edf1ca3530decf4c2ae06bd699 to your computer and use it in GitHub Desktop.
Put in python-telegram-bot/tests with API_ID and API_HASH (from my.telegram.org) as envvars. Customize the bot to be created in main(). Doesn't create video sticker set yet. Tested with `python-telegram-bot==20.0a4` and `telethon==1.25.4`. ⚠️ The script needs to be updated to make the bot also an admin in the forum test group (chat id -100161915…
import asyncio
import itertools
import json
import os
import re
import urllib.parse
import itertools
from telethon import TelegramClient
from telethon.utils import get_input_peer
from telethon.tl.functions.messages import (
AddChatUserRequest,
CreateChatRequest,
MigrateChatRequest,
)
from telethon.tl.functions.channels import InviteToChannelRequest
from telegram import Bot
from telegram.request import HTTPXRequest
# Telegram API credentials, read from the API_ID / API_HASH environment variables.
# NOTE: int() raises TypeError at import time when API_ID is not set.
api_id = int(os.getenv('API_ID'))
api_hash = os.getenv('API_HASH')
# Session file name handed to TelegramClient: "<this script's file name>.session".
session_file = os.path.basename(__file__) + '.session'
# Username of the peer that all bot-configuration conversations are opened with.
BOT_FATHER = "@BotFather"
# Event loop obtained at import time; the __main__ guard uses it to run main().
loop = asyncio.get_event_loop()
async def send_message_check_reply(conv, text, reply):
    """Send ``text`` in a conversation and check that the answer contains ``reply``.

    Args:
        conv: Conversation object offering awaitable ``send_message`` and
            ``get_response`` methods.
        text: The message text to send.
        reply: Substring that has to occur in the text of the response.

    Returns:
        The response message.

    Raises:
        AssertionError: If ``reply`` is not part of the response text. The
            exception message carries both the expected and the actual text.
    """
    await conv.send_message(text)
    msg = await conv.get_response()
    if reply not in msg.text:
        error = 'Error: "{}" not found in "{}"'.format(reply, msg.text)
        print(error)
        # Put the details into the exception as well, so they show up in tracebacks.
        raise AssertionError(error)
    # Pause before the next step (presumably to avoid flooding the peer).
    await asyncio.sleep(1)
    return msg
async def send_file_check_reply(conv, file, reply):
    """Send ``file`` in a conversation and check that the answer contains ``reply``.

    Args:
        conv: Conversation object offering awaitable ``send_file`` and
            ``get_response`` methods.
        file: The file to send; passed through unchanged to ``conv.send_file``.
        reply: Substring that has to occur in the text of the response.

    Returns:
        The response message.

    Raises:
        AssertionError: If ``reply`` is not part of the response text. The
            exception message carries both the expected and the actual text.
    """
    await conv.send_file(file)
    msg = await conv.get_response()
    if reply not in msg.text:
        error = 'Error: "{}" not found in "{}"'.format(reply, msg.text)
        print(error)
        # Put the details into the exception as well, so they show up in tracebacks.
        raise AssertionError(error)
    # Pause before the next step (presumably to avoid flooding the peer).
    await asyncio.sleep(1)
    return msg
async def click_check_edit(conv, msg, text_filter, edit):
    """Click the button whose text contains ``text_filter`` and verify the edit.

    The buttons of ``msg`` are searched for ``text_filter``. When no button
    matches, a button containing "»" (presumably the "next page" button of a
    paginated keyboard) is clicked and the search continues on the edited
    message. Previously the function returned right after clicking "»" without
    searching the new page, and recursed without bound when no "»" button
    existed either.

    Args:
        conv: Conversation object offering an awaitable ``get_edit`` method.
        msg: Message whose ``buttons`` (rows of buttons) are searched.
        text_filter: Substring identifying the button to click.
        edit: Substring that must occur in the message text after the click.

    Returns:
        The edited message received after clicking the matching button.

    Raises:
        AssertionError: If no matching button can be found on any page, or if
            the edited message does not contain ``edit``.
    """
    while True:
        buttons = list(itertools.chain.from_iterable(msg.buttons or []))

        target = next((b for b in buttons if text_filter in b.text), None)
        if target is not None:
            await target.click()
            edited_msg = await conv.get_edit()
            if edit not in edited_msg.text:
                error = 'Error: "{}" not found in "{}"'.format(edit, edited_msg.text)
                print(error)
                raise AssertionError(error)
            await asyncio.sleep(1)
            return edited_msg

        next_page = next((b for b in buttons if "»" in b.text), None)
        if next_page is None:
            # Nothing left to try: fail loudly instead of recursing forever.
            error = 'Error: no button containing "{}" found in "{}"'.format(
                text_filter, msg.text
            )
            print(error)
            raise AssertionError(error)

        # Move on to the next page and search its buttons.
        await next_page.click()
        msg = await conv.get_edit()
        await asyncio.sleep(1)
async def create_bot(client, name, username):
    """Create a new bot through a conversation with BOT_FATHER.

    Args:
        client: Client object offering ``conversation(peer)`` as an async
            context manager.
        name: Display name sent for the new bot.
        username: Username sent for the new bot.

    Returns:
        The token extracted from the final reply (the text between backticks).

    Raises:
        AssertionError: If any reply is not the expected one, or if no
            backtick-quoted token can be found in the final reply.
    """
    print("Creating bot")
    async with client.conversation(BOT_FATHER) as conv:
        await send_message_check_reply(conv, "/newbot", "Please choose a name for your bot.")
        await send_message_check_reply(conv, name, "Now let's choose a username for your bot.")
        msg = await send_message_check_reply(
            conv, username, "Done! Congratulations on your new bot."
        )
        token_search = re.search(r"`(.+)`", msg.text)
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if token_search is None:
            raise AssertionError('No token found in "{}"'.format(msg.text))
        return token_search.group(1)
async def set_description(client, username, description):
    """Set the description of bot ``username`` via a BOT_FATHER conversation.

    Each step sends one message and requires the reply to contain the expected
    text; ``send_message_check_reply`` raises ``AssertionError`` otherwise.
    """
    print("Setting description")
    steps = (
        ("/setdescription", "Choose a bot"),
        (username, "Send me the new description for the bot."),
        (description, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as botfather:
        for outgoing, expected in steps:
            await send_message_check_reply(botfather, outgoing, expected)
async def set_about_text(client, username, about_text):
    """Set the 'About' text of bot ``username`` via a BOT_FATHER conversation.

    Each step sends one message and requires the reply to contain the expected
    text; ``send_message_check_reply`` raises ``AssertionError`` otherwise.
    """
    print("Setting about_text")
    steps = (
        ("/setabouttext", "Choose a bot"),
        (username, "Send me the new 'About' text."),
        (about_text, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as botfather:
        for outgoing, expected in steps:
            await send_message_check_reply(botfather, outgoing, expected)
async def set_join_groups(client, username, join_groups):
    """Change the join-groups setting of bot ``username`` via BOT_FATHER.

    ``join_groups`` is the text sent as the new setting (callers in this file
    pass "Enable" or "Disable"). Every reply must contain the expected text,
    otherwise ``send_message_check_reply`` raises ``AssertionError``.
    """
    print("Setting join groups")
    steps = (
        ("/setjoingroups", "Choose a bot"),
        (username, "Current status is"),
        (join_groups, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as botfather:
        for outgoing, expected in steps:
            await send_message_check_reply(botfather, outgoing, expected)
async def create_super_group(client, username, group_title):
    """Create a chat containing ``username`` and migrate it.

    A ``CreateChatRequest`` is issued first; the chat id is read from the last
    update of its result and handed to a ``MigrateChatRequest``.

    Returns:
        The ``channel_id`` of the first update in the migration result.
    """
    print("Creating new super group.")
    created = await client(CreateChatRequest(users=[username], title=group_title))
    await asyncio.sleep(1)

    basic_group_id = created.updates[-1].peer.chat_id
    migrated = await client(MigrateChatRequest(basic_group_id))
    await asyncio.sleep(1)

    return migrated.updates[0].channel_id
async def add_to_group(client, group, username):
    """Invite ``username`` to ``group`` with an ``InviteToChannelRequest``."""
    print("Adding to group")
    invitation = InviteToChannelRequest(group, [username])
    await client(invitation)
    await asyncio.sleep(1)
async def make_group_admin(client, group, username):
    """Promote ``username`` in ``group`` via ``client.edit_admin``.

    The call passes ``add_admins=True`` and ``is_admin=True``.
    """
    print("Making group admin")
    admin_rights = {"add_admins": True, "is_admin": True}
    await client.edit_admin(group, username, **admin_rights)
    await asyncio.sleep(1)
async def add_to_channel(client, channel, username):
    """Grant ``username`` admin rights in ``channel`` via ``client.edit_admin``.

    The rights enabled are: change_info, post_messages, edit_messages,
    delete_messages and invite_users.
    """
    print("Setting bot as channel admin")
    granted = (
        "change_info",
        "post_messages",
        "edit_messages",
        "delete_messages",
        "invite_users",
    )
    await client.edit_admin(channel, username, **{right: True for right in granted})
    await asyncio.sleep(1)
async def set_inline(client, username, inline_placeholder):
    """Set the inline placeholder of bot ``username`` via BOT_FATHER.

    Each step sends one message and requires the reply to contain the expected
    text; ``send_message_check_reply`` raises ``AssertionError`` otherwise.
    """
    print("Setting inline")
    steps = (
        ("/setinline", "Choose a bot"),
        (username, "please send me the placeholder message"),
        (inline_placeholder, "Success!"),
    )
    async with client.conversation(BOT_FATHER) as botfather:
        for outgoing, expected in steps:
            await send_message_check_reply(botfather, outgoing, expected)
async def new_game(
    client, username, game_title, game_description, game_photo, game_animation, game_short_name
):
    """Register a game for bot ``username`` through a BOT_FATHER conversation.

    The dialogue is a fixed sequence of steps. Text steps go through
    ``send_message_check_reply``, the photo and the animation through
    ``send_file_check_reply``; both raise ``AssertionError`` when the reply
    does not contain the expected text.
    """
    print("Creating game")
    # (sender, payload, text expected in the reply), in dialogue order
    dialogue = (
        (send_message_check_reply, "/newgame", "Please read our Rules carefully"),
        (send_message_check_reply, "OK", "By accepting these Rules, you agree that"),
        (send_message_check_reply, "Accept", "Which bot will be offering the game?"),
        (send_message_check_reply, username, "Please enter a title"),
        (send_message_check_reply, game_title, "Please enter a short description"),
        (send_message_check_reply, game_description, "Please upload a photo, 640x360 pixels."),
        (send_file_check_reply, game_photo, "Now upload a demo GIF"),
        (send_file_check_reply, game_animation, "Now please choose a short name"),
        (send_message_check_reply, game_short_name, "Open it to start developing the game!"),
    )
    async with client.conversation(BOT_FATHER) as botfather:
        for send, payload, expected in dialogue:
            await send(botfather, payload, expected)
async def set_payments(client, username):
    """Connect the "Stripe Test" payment provider to bot ``username``.

    The function navigates the BOT_FATHER button menus to the
    "Connect Stripe Test" button, starts the bot that button links to, and
    prints a URL the user has to open manually. It then waits up to 60 seconds
    for the BOT_FATHER message to be edited and extracts the provider token.

    Args:
        client: Client object offering ``conversation(peer)`` as an async
            context manager.
        username: Text identifying the bot's button in the "/mybots" menu.

    Returns:
        The Stripe Test payment provider token.

    Raises:
        AssertionError: If an expected reply, button or token is missing.
    """
    print("Setting up payments")
    async with client.conversation(BOT_FATHER) as conv:
        msg = await send_message_check_reply(conv, "/mybots", "Choose a bot")
        msg = await click_check_edit(conv, msg, username, "Here it is")
        msg = await click_check_edit(conv, msg, "Payments", "Payment providers for")
        msg = await click_check_edit(conv, msg, "Stripe", "Stripe")

        connect_button = next(
            (
                button
                for button in itertools.chain.from_iterable(msg.buttons)
                if button.text == "Connect Stripe Test"
            ),
            None,
        )
        if connect_button is None:
            error = 'Error: No "Connect Stripe Test" button found in {}'.format(
                msg.reply_markup.to_dict()
            )
            print(error)
            raise AssertionError(error)

        # The button URL names the bot to talk to (path) and carries the
        # payload for its /start command (``start`` query parameter).
        url_parts = urllib.parse.urlsplit(connect_button.url)
        bot = url_parts.path.replace("/", "@")
        start = urllib.parse.parse_qs(url_parts.query)["start"][0]

        async with client.conversation(bot) as stripe_conv:
            msg = await send_message_check_reply(
                stripe_conv, "/start {}".format(start), "Please use this button"
            )
            stripe_test_url = msg.buttons[0][0].url

        print(
            "Please go to this url in your browser:\n{}\n"
            'Once there, press the "Skip this account form" at the top of the viewport, '
            "and then just close the window/tab. "
            "You have 60 seconds to do this.".format(stripe_test_url)
        )

        # The BOT_FATHER message is edited once the provider has been connected.
        msg = await conv.get_edit(timeout=60)
        token_search = re.search(r"\*\*Stripe Test\*\*: `(.+)`", msg.text)
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if token_search is None:
            raise AssertionError('No Stripe Test token found in "{}"'.format(msg.text))
        return token_search.group(1)
async def new_sticker_set(
    client, token, sticker_set_name, sticker_set_title, sticker, sticker_emoji, animated=False
):
    """Create a sticker set owned by the logged-in user, using the bot's token.

    Args:
        client: Client object; ``get_me()`` supplies the id of the set owner.
        token: Bot token used to construct the ``Bot`` instance.
        sticker_set_name: Name of the sticker set.
        sticker_set_title: Title of the sticker set.
        sticker: Path of the sticker file; opened in binary mode.
        sticker_emoji: Emoji(s) passed as ``emojis``.
        animated: If true the file is passed as ``tgs_sticker``, otherwise as
            ``png_sticker``.

    Raises:
        AssertionError: If ``create_new_sticker_set`` returns a falsy result.
    """
    print("Creating new {}sticker set.".format("animated " if animated else ""))
    me = await client.get_me()
    bot = Bot(
        token,
        request=HTTPXRequest(
            read_timeout=20, connect_timeout=20, write_timeout=20, pool_timeout=20
        ),
    )
    sticker_kwarg = "tgs_sticker" if animated else "png_sticker"
    async with bot:
        with open(sticker, "rb") as f:
            # Run the request outside of ``assert``: under ``python -O`` an
            # assert statement, including the call inside it, is dropped.
            created = await bot.create_new_sticker_set(
                me.id,
                sticker_set_name,
                sticker_set_title,
                emojis=sticker_emoji,
                **{sticker_kwarg: f},
            )
    if not created:
        raise AssertionError(
            'Creating sticker set "{}" did not succeed'.format(sticker_set_name)
        )
async def new_bot(
    client,
    name,
    username,
    description=None,
    about_text=None,
    group_id=None,
    group_name=None,
    group_admin=None,
    channel_id=None,
    inline_placeholder=None,
    game_title=None,
    game_description=None,
    game_photo=None,
    game_animation=None,
    game_short_name=None,
    sticker_set_name=None,
    animated_sticker_set_name=None,
    sticker_set_title=None,
    animated_sticker_set_title=None,
    sticker=None,
    animated_sticker=None,
    sticker_emoji=None,
    token=None,
    payment_provider_token=None,
):
    """Create and fully configure a bot.

    Every optional step is skipped when its arguments are missing or falsy.

    Args:
        client: Client used for all conversations and requests.
        name: Display name of the bot.
        username: Username of the bot, without the leading "@".
        description: Description text to set.
        about_text: 'About' text to set.
        group_id: Existing group the bot is added to.
        group_name: Title of a new super group to create with the bot.
        group_admin: If truthy, the bot is made admin in the group(s).
        channel_id: Channel in which the bot gets admin rights.
        inline_placeholder: Placeholder text; enables inline mode.
        game_title, game_description, game_photo, game_animation,
            game_short_name: All five are required to create a game.
        sticker_set_name, sticker_set_title, sticker: Together with
            ``sticker_emoji``, required to create the static sticker set.
        animated_sticker_set_name, animated_sticker_set_title,
            animated_sticker: Together with ``sticker_emoji``, required to
            create the animated sticker set.
        sticker_emoji: Emoji used for both sticker sets.
        token: Existing bot token; when ``None`` a new bot is created.
        payment_provider_token: Existing provider token; when ``None``
            payments are set up via ``set_payments``.

    Returns:
        A tuple ``(token, payment_provider_token, new_group_id)``, where
        ``new_group_id`` is the id of the created super group or, if none was
        created, ``group_id``.
    """
    print("Creating bot")

    # Create the bot
    if token is None:
        token = await create_bot(client, name, username)

    # Allow the bot to talk to us
    await client.send_message(username, "/start")

    username = "@" + username

    # Set texts in case a user stumbles upon the bot
    if description:
        await set_description(client, username, description)
    if about_text:
        await set_about_text(client, username, about_text)

    # Make sure the new bot can join groups
    if group_id or channel_id:
        await set_join_groups(client, username, "Enable")

    # Add it to the developer group
    if group_id:
        await add_to_group(client, group_id, username)

    if group_name:
        new_group_id = await create_super_group(client, username, group_name)
    else:
        new_group_id = group_id

    # Make it admin in the group(s). Without a newly created group,
    # new_group_id equals group_id, so de-duplicate to promote only once.
    if group_admin:
        for admin_target in dict.fromkeys((group_id, new_group_id)):
            if admin_target:
                await make_group_admin(client, admin_target, username)

    # Add it to the testing channel
    if channel_id:
        await add_to_channel(client, channel_id, username)

    # Turn bot joining off, since we no longer need it
    if group_id or channel_id:
        await set_join_groups(client, username, "Disable")

    # Turn on inline so we can create a game for the bot
    if inline_placeholder:
        await set_inline(client, username, inline_placeholder)

    # Create a game for the bot
    if game_title and game_description and game_photo and game_animation and game_short_name:
        await new_game(
            client,
            username,
            game_title,
            game_description,
            game_photo,
            game_animation,
            game_short_name,
        )

    # Now we need to setup payment.
    if payment_provider_token is None:
        payment_provider_token = await set_payments(client, username)

    # Create a sticker set for this bot
    if sticker_set_name and sticker_set_title and sticker and sticker_emoji:
        await new_sticker_set(
            client, token, sticker_set_name, sticker_set_title, sticker, sticker_emoji
        )

    # Create an animated sticker set for this bot
    if (
        animated_sticker_set_name
        and animated_sticker_set_title
        and animated_sticker
        and sticker_emoji
    ):
        await new_sticker_set(
            client,
            token,
            animated_sticker_set_name,
            animated_sticker_set_title,
            animated_sticker,
            sticker_emoji,
            animated=True,
        )

    return token, payment_provider_token, new_group_id
def print_bot(name, username, token, payment_provider_token, super_group_id):
    """Print a summary of a bot to stdout.

    The output is an empty line, a rule of 80 dashes, the name, the
    "@"-prefixed username, the t.me link, and finally all five values as a
    JSON object indented by four spaces.
    """
    summary = dict(
        token=token,
        payment_provider_token=payment_provider_token,
        name=name,
        username=username,
        super_group_id=super_group_id,
    )
    separator = "-" * 80

    print()
    print(separator)
    print("Name:", name)
    print(f"Username: @{username}")
    print(f"Link: https://t.me/{username}")
    print(json.dumps(summary, indent=4))
async def main():
    """Create one fully configured test bot and print its data.

    Opens a ``TelegramClient`` session with the module-level credentials,
    creates the bot with index ``i`` through ``new_bot`` and prints the
    resulting data with ``print_bot``. Adjust the arguments below to customize
    the bot that is created.
    """
    async with TelegramClient(session_file, api_id, api_hash) as client:
        # Bot id/index
        i = 0

        # Fill session cache with chats that we can see
        # We need this since otherwise we don't know the access
        # hash of the group and channel
        await client.get_dialogs()

        name = "PTB Test Bot [{}]".format(i)
        username = "ptb_test_{:02d}_bot".format(i)
        # The separating space must be part of the first literal; without it
        # the text read "python-telegram-botand has ...".
        description = (
            "This bot is only for running tests for python-telegram-bot "
            "and has no actual functionality."
        )

        token, payment_provider_token, super_group_id = await new_bot(
            client,
            name=name,
            username=username,
            description=description,
            about_text=description,
            # Pass a group id, if there is a group set up for the bot
            # group_id=-1001493296829,
            # Pass a group name to create a new super group
            group_name=">>> telegram.Bot(test) @{}".format(username),
            group_admin=True,
            channel_id=1062610360,
            inline_placeholder="This bot is only for running tests.",
            game_title="Python-telegram-bot test game",
            game_description="A no-op test game, for python-telegram-bot bot framework testing.",
            game_photo="tests/data/game.png",
            game_animation="tests/data/game.gif",
            game_short_name="test_game",
            sticker_set_name="test_by_{}".format(username),
            animated_sticker_set_name="animated_test_by_{}".format(username),
            sticker_set_title="Test",
            animated_sticker_set_title="Animated Test",
            sticker="tests/data/telegram_sticker.png",
            animated_sticker="tests/data/telegram_animated_sticker.tgs",
            sticker_emoji="😄",
        )

        print_bot(name, username, token, payment_provider_token, super_group_id)
if __name__ == "__main__":
    import logging

    # Emit log records of level INFO and above while the script runs.
    logging.basicConfig(level=logging.INFO)

    # Drive the main() coroutine to completion on the module-level event loop.
    loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment