-
-
Save jsmnbom/2e8044ca5cc55813a0e0380ad375b320 to your computer and use it in GitHub Desktop.
Put in python-telegram-bot/tests with API_ID and API_HASH (from my.telegram.org) as envvars. Customize the bot to be created in main().
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
import json | |
import os | |
import re | |
import urllib.parse | |
import itertools | |
from telethon import TelegramClient | |
from telethon.utils import get_input_peer | |
from telethon.tl.functions.messages import AddChatUserRequest | |
from telethon.tl.functions.channels import InviteToChannelRequest | |
from telegram import Bot | |
api_id = int(os.getenv('API_ID')) | |
api_hash = os.getenv('API_HASH') | |
session_file = os.path.basename(__file__) + '.session' | |
BOT_FATHER = '@BotFather' | |
loop = asyncio.get_event_loop() | |
async def send_message_check_reply(conv, text, reply): | |
await conv.send_message(text) | |
msg = await conv.get_response() | |
if not reply in msg.text: | |
print('Error: "{}" not found in "{}"'.format(reply, msg.text)) | |
raise AssertionError | |
await asyncio.sleep(1) | |
return msg | |
async def send_file_check_reply(conv, file, reply): | |
await conv.send_file(file) | |
msg = await conv.get_response() | |
if not reply in msg.text: | |
print('Error: "{}" not found in "{}"'.format(reply, msg.text)) | |
raise AssertionError | |
await asyncio.sleep(1) | |
return msg | |
async def click_check_edit(conv, msg, text_filter, edit): | |
for button in itertools.chain.from_iterable(msg.buttons): | |
if text_filter in button.text: | |
await button.click() | |
edited_msg = await conv.get_edit() | |
if not edit in edited_msg.text: | |
print('Error: "{}" not found in "{}"'.format(edit, edited_msg.text)) | |
raise AssertionError | |
await asyncio.sleep(1) | |
return edited_msg | |
else: | |
return await click_check_edit(conv, msg, '»', edit) | |
async def create_bot(client, name, username): | |
print('Creating bot') | |
async with client.conversation(BOT_FATHER) as conv: | |
await send_message_check_reply(conv, '/newbot', 'Please choose a name for your bot.') | |
await send_message_check_reply(conv, name, 'Now let\'s choose a username for your bot.') | |
msg = await send_message_check_reply(conv, username, | |
'Done! Congratulations on your new bot.') | |
token_search = re.search(r'`(.+)`', msg.text) | |
assert token_search | |
token = token_search.group(1) | |
assert token | |
return token | |
async def set_description(client, username, description): | |
print('Setting description') | |
async with client.conversation(BOT_FATHER) as conv: | |
await send_message_check_reply(conv, '/setdescription', 'Choose a bot') | |
await send_message_check_reply(conv, username, 'Send me the new description for the bot.') | |
await send_message_check_reply(conv, description, 'Success!') | |
async def set_about_text(client, username, about_text): | |
print('Setting about_text') | |
async with client.conversation(BOT_FATHER) as conv: | |
await send_message_check_reply(conv, '/setabouttext', 'Choose a bot') | |
await send_message_check_reply(conv, username, 'Send me the new \'About\' text.') | |
await send_message_check_reply(conv, about_text, 'Success!') | |
async def set_join_groups(client, username, join_groups): | |
print('Setting join groups') | |
async with client.conversation(BOT_FATHER) as conv: | |
await send_message_check_reply(conv, '/setjoingroups', 'Choose a bot') | |
await send_message_check_reply(conv, username, 'Current status is') | |
await send_message_check_reply(conv, join_groups, 'Success!') | |
async def add_to_group(client, group, username): | |
print('Adding to group') | |
r = InviteToChannelRequest(group, [username]) | |
await client(r) | |
await asyncio.sleep(1) | |
async def add_to_channel(client, channel, username): | |
print('Setting bot as channel admin') | |
await client.edit_admin( | |
channel, | |
username, | |
change_info=True, | |
post_messages=True, | |
edit_messages=True, | |
delete_messages=True, | |
) | |
await asyncio.sleep(1) | |
async def set_inline(client, username, inline_placeholder): | |
print('Setting inline') | |
async with client.conversation(BOT_FATHER) as conv: | |
await send_message_check_reply(conv, '/setinline', 'Choose a bot') | |
await send_message_check_reply(conv, username, 'please send me the placeholder message') | |
await send_message_check_reply(conv, inline_placeholder, 'Success!') | |
async def new_game(client, username, game_title, game_description, game_photo, game_animation, | |
game_short_name): | |
print('Creating game') | |
async with client.conversation(BOT_FATHER) as conv: | |
await send_message_check_reply(conv, '/newgame', 'Please read our Rules carefully') | |
await send_message_check_reply(conv, 'OK', 'By accepting these Rules, you agree that') | |
await send_message_check_reply(conv, 'Accept', 'Which bot will be offering the game?') | |
await send_message_check_reply(conv, username, 'Please enter a title') | |
await send_message_check_reply(conv, game_title, 'Please enter a short description') | |
await send_message_check_reply(conv, game_description, | |
'Please upload a photo, 640x360 pixels.') | |
await send_file_check_reply(conv, game_photo, 'Now upload a demo GIF') | |
await send_file_check_reply(conv, game_animation, 'Now please choose a short name') | |
await send_message_check_reply(conv, game_short_name, | |
'Open it to start developing the game!') | |
async def set_payments(client, username): | |
print('Setting up payments') | |
async with client.conversation(BOT_FATHER) as conv: | |
msg = await send_message_check_reply(conv, '/mybots', 'Choose a bot') | |
msg = await click_check_edit(conv, msg, username, 'Here it is') | |
msg = await click_check_edit(conv, msg, 'Payments', | |
'Payment providers for') | |
msg = await click_check_edit(conv, msg, 'Stripe', 'Stripe') | |
for button in itertools.chain.from_iterable(msg.buttons): | |
if button.text == 'Connect Stripe Test': | |
break | |
else: | |
print('Error: No "Connect Stripe Test" ' | |
'button found in {}'.format(msg.reply_markup.to_dict())) | |
raise AssertionError | |
stripe_test_bot_url = button.url | |
_, _, path, query, _ = urllib.parse.urlsplit(stripe_test_bot_url) | |
bot = path.replace('/', '@') | |
start = urllib.parse.parse_qs(query)['start'][0] | |
async with client.conversation(bot) as stripe_conv: | |
msg = await send_message_check_reply(stripe_conv, '/start {}'.format(start), | |
'Please use this button') | |
stripe_test_url = msg.buttons[0][0].url | |
print('Please go to this url in your browser:\n{}\n' | |
'Once there, press the "Skip this account form" at the top of the viewport, ' | |
'and then just close the window/tab.' | |
'You have 60 seconds to do this.'.format(stripe_test_url)) | |
msg = await conv.get_edit(timeout=60) | |
token_search = re.search(r'\*\*Stripe Test\*\*: `(.+)`', msg.text) | |
assert token_search | |
token = token_search.group(1) | |
assert token | |
return token | |
async def new_sticker_set(client, token, | |
sticker_set_name, sticker_set_title, | |
sticker, sticker_emoji): | |
me = await client.get_me() | |
bot = Bot(token) | |
with open(sticker, 'rb') as f: | |
assert bot.create_new_sticker_set(me.id, sticker_set_name, sticker_set_title, | |
f, sticker_emoji) | |
async def new_bot(client, | |
name, | |
username, | |
description=None, | |
about_text=None, | |
group_id=None, | |
channel_id=None, | |
inline_placeholder=None, | |
game_title=None, | |
game_description=None, | |
game_photo=None, | |
game_animation=None, | |
game_short_name=None, | |
sticker_set_name=None, | |
sticker_set_title=None, | |
sticker=None, | |
sticker_emoji=None, | |
token=None, | |
payment_provider_token=None): | |
print('Creating bot') | |
# Create the bot | |
if token is None: | |
token = await create_bot(client, name, username) | |
# Allow the bot to talk to us | |
await client.send_message(username, '/start') | |
username = '@' + username | |
# Set texts in case a user stumbles upon the bot | |
if description: | |
await set_description(client, username, description) | |
if about_text: | |
await set_about_text(client, username, about_text) | |
# Make sure the new bot can join groups | |
if group_id or channel_id: | |
await set_join_groups(client, username, 'Enable') | |
# Add it to the developer group | |
if group_id: | |
await add_to_group(client, group_id, username) | |
# Add it to the testing channel | |
if channel_id: | |
await add_to_channel(client, channel_id, username) | |
# Turn bot joining off, since we no longer need it | |
if group_id or channel_id: | |
await set_join_groups(client, username, 'Disable') | |
# Turn on inline so we can create a game for the bot | |
if inline_placeholder: | |
await set_inline(client, username, inline_placeholder) | |
# Create a game for the bot | |
if game_title and game_description and game_photo and game_animation and game_short_name: | |
await new_game(client, username, | |
game_title, game_description, | |
game_photo, game_animation, | |
game_short_name) | |
# Now we need to setup payment. | |
if payment_provider_token is None: | |
payment_provider_token = await set_payments(client, username) | |
# Create a sticker set for this bot | |
if sticker_set_name and sticker_set_title and sticker and sticker_emoji: | |
await new_sticker_set(client, token, | |
sticker_set_name, sticker_set_title, | |
sticker, sticker_emoji) | |
return token, payment_provider_token | |
def print_bot(name, username, token, payment_provider_token): | |
print() | |
print('-' * 80) | |
print('Name:', name) | |
print('Username: @{}'.format(username)) | |
print('Link: https://t.me/{}'.format(username)) | |
data = { | |
'token': token, | |
'payment_provider_token': payment_provider_token, | |
'name': name, | |
'username': username | |
} | |
print(json.dumps(data, indent=4)) | |
async def main(): | |
async with TelegramClient(session_file, api_id, api_hash) as client: | |
# Bot id/index | |
i = 2 | |
# Fill session cache with chats that we can see | |
# We need this since otherwise we don't know the access | |
# hash of the group and channel | |
await client.get_dialogs() | |
name = 'PTB tests [{}]'.format(i) | |
username = 'ptb_{}_bot'.format(i) | |
description = ('This bot is only for running tests for python-telegram-bot' | |
'and has no actual functionality.') | |
token, payment_provider_token = await new_bot( | |
client, | |
name=name, | |
username=username, | |
description=description, | |
about_text=description, | |
group_id=-1001493296829, | |
channel_id=1062610360, | |
inline_placeholder='This bot is only for running tests.', | |
game_title='Python-telegram-bot test game', | |
game_description='A no-op test game, for python-telegram-bot bot framework testing.', | |
game_photo='python-telegram-bot/tests/data/game.png', | |
game_animation='python-telegram-bot/tests/data/game.gif', | |
game_short_name='test_game', | |
sticker_set_name='test_by_{}'.format(username), | |
sticker_set_title='Test', | |
sticker='python-telegram-bot/tests/data/telegram_sticker.png', | |
sticker_emoji='😄' | |
) | |
print_bot(name, username, token, payment_provider_token) | |
if __name__ == '__main__': | |
import logging | |
logging.basicConfig(level=logging.DEBUG) | |
loop.run_until_complete(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For a version that also creates an animated sticker set, see [here](Updated gist that also creates an animated sticker set at https://gist.github.com/Bibo-Joshi/75f135edf1ca3530decf4c2ae06bd699)