Skip to content

Instantly share code, notes, and snippets.

@jsmnbom
Last active August 11, 2021 16:32
Show Gist options
  • Save jsmnbom/2e8044ca5cc55813a0e0380ad375b320 to your computer and use it in GitHub Desktop.
Save jsmnbom/2e8044ca5cc55813a0e0380ad375b320 to your computer and use it in GitHub Desktop.
Put in python-telegram-bot/tests with API_ID and API_HASH (from my.telegram.org) as envvars. Customize the bot to be created in main().
import asyncio
import itertools
import json
import os
import re
import urllib.parse
import itertools
from telethon import TelegramClient
from telethon.utils import get_input_peer
from telethon.tl.functions.messages import AddChatUserRequest
from telethon.tl.functions.channels import InviteToChannelRequest
from telegram import Bot
api_id = int(os.getenv('API_ID'))
api_hash = os.getenv('API_HASH')
session_file = os.path.basename(__file__) + '.session'
BOT_FATHER = '@BotFather'
loop = asyncio.get_event_loop()
async def send_message_check_reply(conv, text, reply):
await conv.send_message(text)
msg = await conv.get_response()
if not reply in msg.text:
print('Error: "{}" not found in "{}"'.format(reply, msg.text))
raise AssertionError
await asyncio.sleep(1)
return msg
async def send_file_check_reply(conv, file, reply):
await conv.send_file(file)
msg = await conv.get_response()
if not reply in msg.text:
print('Error: "{}" not found in "{}"'.format(reply, msg.text))
raise AssertionError
await asyncio.sleep(1)
return msg
async def click_check_edit(conv, msg, text_filter, edit):
for button in itertools.chain.from_iterable(msg.buttons):
if text_filter in button.text:
await button.click()
edited_msg = await conv.get_edit()
if not edit in edited_msg.text:
print('Error: "{}" not found in "{}"'.format(edit, edited_msg.text))
raise AssertionError
await asyncio.sleep(1)
return edited_msg
else:
return await click_check_edit(conv, msg, '»', edit)
async def create_bot(client, name, username):
print('Creating bot')
async with client.conversation(BOT_FATHER) as conv:
await send_message_check_reply(conv, '/newbot', 'Please choose a name for your bot.')
await send_message_check_reply(conv, name, 'Now let\'s choose a username for your bot.')
msg = await send_message_check_reply(conv, username,
'Done! Congratulations on your new bot.')
token_search = re.search(r'`(.+)`', msg.text)
assert token_search
token = token_search.group(1)
assert token
return token
async def set_description(client, username, description):
print('Setting description')
async with client.conversation(BOT_FATHER) as conv:
await send_message_check_reply(conv, '/setdescription', 'Choose a bot')
await send_message_check_reply(conv, username, 'Send me the new description for the bot.')
await send_message_check_reply(conv, description, 'Success!')
async def set_about_text(client, username, about_text):
print('Setting about_text')
async with client.conversation(BOT_FATHER) as conv:
await send_message_check_reply(conv, '/setabouttext', 'Choose a bot')
await send_message_check_reply(conv, username, 'Send me the new \'About\' text.')
await send_message_check_reply(conv, about_text, 'Success!')
async def set_join_groups(client, username, join_groups):
print('Setting join groups')
async with client.conversation(BOT_FATHER) as conv:
await send_message_check_reply(conv, '/setjoingroups', 'Choose a bot')
await send_message_check_reply(conv, username, 'Current status is')
await send_message_check_reply(conv, join_groups, 'Success!')
async def add_to_group(client, group, username):
print('Adding to group')
r = InviteToChannelRequest(group, [username])
await client(r)
await asyncio.sleep(1)
async def add_to_channel(client, channel, username):
print('Setting bot as channel admin')
await client.edit_admin(
channel,
username,
change_info=True,
post_messages=True,
edit_messages=True,
delete_messages=True,
)
await asyncio.sleep(1)
async def set_inline(client, username, inline_placeholder):
print('Setting inline')
async with client.conversation(BOT_FATHER) as conv:
await send_message_check_reply(conv, '/setinline', 'Choose a bot')
await send_message_check_reply(conv, username, 'please send me the placeholder message')
await send_message_check_reply(conv, inline_placeholder, 'Success!')
async def new_game(client, username, game_title, game_description, game_photo, game_animation,
game_short_name):
print('Creating game')
async with client.conversation(BOT_FATHER) as conv:
await send_message_check_reply(conv, '/newgame', 'Please read our Rules carefully')
await send_message_check_reply(conv, 'OK', 'By accepting these Rules, you agree that')
await send_message_check_reply(conv, 'Accept', 'Which bot will be offering the game?')
await send_message_check_reply(conv, username, 'Please enter a title')
await send_message_check_reply(conv, game_title, 'Please enter a short description')
await send_message_check_reply(conv, game_description,
'Please upload a photo, 640x360 pixels.')
await send_file_check_reply(conv, game_photo, 'Now upload a demo GIF')
await send_file_check_reply(conv, game_animation, 'Now please choose a short name')
await send_message_check_reply(conv, game_short_name,
'Open it to start developing the game!')
async def set_payments(client, username):
print('Setting up payments')
async with client.conversation(BOT_FATHER) as conv:
msg = await send_message_check_reply(conv, '/mybots', 'Choose a bot')
msg = await click_check_edit(conv, msg, username, 'Here it is')
msg = await click_check_edit(conv, msg, 'Payments',
'Payment providers for')
msg = await click_check_edit(conv, msg, 'Stripe', 'Stripe')
for button in itertools.chain.from_iterable(msg.buttons):
if button.text == 'Connect Stripe Test':
break
else:
print('Error: No "Connect Stripe Test" '
'button found in {}'.format(msg.reply_markup.to_dict()))
raise AssertionError
stripe_test_bot_url = button.url
_, _, path, query, _ = urllib.parse.urlsplit(stripe_test_bot_url)
bot = path.replace('/', '@')
start = urllib.parse.parse_qs(query)['start'][0]
async with client.conversation(bot) as stripe_conv:
msg = await send_message_check_reply(stripe_conv, '/start {}'.format(start),
'Please use this button')
stripe_test_url = msg.buttons[0][0].url
print('Please go to this url in your browser:\n{}\n'
'Once there, press the "Skip this account form" at the top of the viewport, '
'and then just close the window/tab.'
'You have 60 seconds to do this.'.format(stripe_test_url))
msg = await conv.get_edit(timeout=60)
token_search = re.search(r'\*\*Stripe Test\*\*: `(.+)`', msg.text)
assert token_search
token = token_search.group(1)
assert token
return token
async def new_sticker_set(client, token,
sticker_set_name, sticker_set_title,
sticker, sticker_emoji):
me = await client.get_me()
bot = Bot(token)
with open(sticker, 'rb') as f:
assert bot.create_new_sticker_set(me.id, sticker_set_name, sticker_set_title,
f, sticker_emoji)
async def new_bot(client,
name,
username,
description=None,
about_text=None,
group_id=None,
channel_id=None,
inline_placeholder=None,
game_title=None,
game_description=None,
game_photo=None,
game_animation=None,
game_short_name=None,
sticker_set_name=None,
sticker_set_title=None,
sticker=None,
sticker_emoji=None,
token=None,
payment_provider_token=None):
print('Creating bot')
# Create the bot
if token is None:
token = await create_bot(client, name, username)
# Allow the bot to talk to us
await client.send_message(username, '/start')
username = '@' + username
# Set texts in case a user stumbles upon the bot
if description:
await set_description(client, username, description)
if about_text:
await set_about_text(client, username, about_text)
# Make sure the new bot can join groups
if group_id or channel_id:
await set_join_groups(client, username, 'Enable')
# Add it to the developer group
if group_id:
await add_to_group(client, group_id, username)
# Add it to the testing channel
if channel_id:
await add_to_channel(client, channel_id, username)
# Turn bot joining off, since we no longer need it
if group_id or channel_id:
await set_join_groups(client, username, 'Disable')
# Turn on inline so we can create a game for the bot
if inline_placeholder:
await set_inline(client, username, inline_placeholder)
# Create a game for the bot
if game_title and game_description and game_photo and game_animation and game_short_name:
await new_game(client, username,
game_title, game_description,
game_photo, game_animation,
game_short_name)
# Now we need to setup payment.
if payment_provider_token is None:
payment_provider_token = await set_payments(client, username)
# Create a sticker set for this bot
if sticker_set_name and sticker_set_title and sticker and sticker_emoji:
await new_sticker_set(client, token,
sticker_set_name, sticker_set_title,
sticker, sticker_emoji)
return token, payment_provider_token
def print_bot(name, username, token, payment_provider_token):
print()
print('-' * 80)
print('Name:', name)
print('Username: @{}'.format(username))
print('Link: https://t.me/{}'.format(username))
data = {
'token': token,
'payment_provider_token': payment_provider_token,
'name': name,
'username': username
}
print(json.dumps(data, indent=4))
async def main():
async with TelegramClient(session_file, api_id, api_hash) as client:
# Bot id/index
i = 2
# Fill session cache with chats that we can see
# We need this since otherwise we don't know the access
# hash of the group and channel
await client.get_dialogs()
name = 'PTB tests [{}]'.format(i)
username = 'ptb_{}_bot'.format(i)
description = ('This bot is only for running tests for python-telegram-bot'
'and has no actual functionality.')
token, payment_provider_token = await new_bot(
client,
name=name,
username=username,
description=description,
about_text=description,
group_id=-1001493296829,
channel_id=1062610360,
inline_placeholder='This bot is only for running tests.',
game_title='Python-telegram-bot test game',
game_description='A no-op test game, for python-telegram-bot bot framework testing.',
game_photo='python-telegram-bot/tests/data/game.png',
game_animation='python-telegram-bot/tests/data/game.gif',
game_short_name='test_game',
sticker_set_name='test_by_{}'.format(username),
sticker_set_title='Test',
sticker='python-telegram-bot/tests/data/telegram_sticker.png',
sticker_emoji='😄'
)
print_bot(name, username, token, payment_provider_token)
if __name__ == '__main__':
import logging
logging.basicConfig(level=logging.DEBUG)
loop.run_until_complete(main())
@Bibo-Joshi
Copy link

For a version that also creates an animated sticker set, see [here](Updated gist that also creates an animated sticker set at https://gist.github.com/Bibo-Joshi/75f135edf1ca3530decf4c2ae06bd699)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment