Skip to content

Instantly share code, notes, and snippets.

@xmoiduts
Created March 17, 2020 16:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xmoiduts/fedc0309f571041d40fbcac321cde3c2 to your computer and use it in GitHub Desktop.
Save xmoiduts/fedc0309f571041d40fbcac321cde3c2 to your computer and use it in GitHub Desktop.
Micropython ESP32 Telegram Bot async API uasyncio
# Async Telegram-bot interface inspired by https://github.com/Lepeshka92/TelegaGraph .
# async examples: https://github.com/micropython/micropython-lib/blob/master/uasyncio/example_http_client.py,
# DO NOT use their ping-pong test case 'cause I failed.
import gc
import ujson
import uasyncio as asyncio
import uerrno
class Telegram_API:
def __init__(self, TOKEN):
self.hostname = 'api.telegram.org'
self.bot_token = 'bot'+TOKEN
self.kbd = {
'keyboard': [
['Good Morning', 'Nap 40 Minutes'],
['Good Night', 'HW reboot']
], #TODO: Split this out
'resize_keyboard': True,
'one_time_keyboard': False
}
self.upd = {
'offset': 0,
'limit': 1,
'timeout': 60, # Known issue: Max 50s timeout for Telegram long polling.
'allowed_updates': ['message'] #TODO: inlineQuery support ?
}
self.writer = None
self.reader = None
def setKbdKeys(self, kbd_keys: list):
self.kbd['keyboard'] = kbd_keys
def setTimeout(self, timeout: int):
self.upd['timeout'] = timeout
def dict2querystring(self, data:dict):
# in: {'limit':20, 'offset':20333, 'id':'abc'}
# out: limit=20&offset=20333&id=abc
res = ''
for key in data.keys():
res += '{}={}'.format(str(key),str(data[key]))
res += '&'
return res[:-1]
async def Request_GET(self, action, query_string) -> dict:
# send a GET request to telegram server
# return an http response with json content
#
# action: sendMessage, getUpdates
# query_string: chat_id=777555333&text=received
#gc.collect() # collect as you wish.
reader, writer = await asyncio.open_connection(self.hostname, 443, True)
#print("sendRequest: {}, {}".format(action, query_string), end='')
query = "GET /{}/{}?{} HTTP/1.0\r\n\r\n".format(self.bot_token, action, query_string)
await writer.awrite(query.encode('latin-1'))
while True:
line = await reader.readline()
if line:
pass
#print('<', end='')
if line == b'\r\n':
break #break on reading a full blank line.
else:
return # return (None) when only received HTTP responce head.
#print('')
response = ujson.loads (await reader.read())
await reader.aclose()
await writer.aclose()
return response #return the response body parsed as dict
async def send(self, chat_id, text, keyboard:list=None, silent = True):
data = {'chat_id': chat_id, 'text': text, 'disable_notification': silent}
if keyboard:
self.kbd['keyboard'] = keyboard
data['reply_markup'] = ujson.dumps(self.kbd)
try:
jo = await self.Request_GET('sendMessage', self.dict2querystring(data))
print('OK:' + str(jo['ok']))
except Exception as e:
print('TG_Send err: {}'.format(e))
finally:
pass
async def update(self):
result = []
try:
jo = await self.Request_GET('getUpdates', self.dict2querystring(self.upd))
#print('TG_update: {}'.format(jo))
except Exception as e:
if e.args[0] == -104:
print('TG_Updat e: Conn Reset') # Guess: -104 connection reset
else:
print('TG_Updat e: {}'.format(e))
# TODO: when wlan is not connected: 'list index out of range'
return None
finally:
pass
if 'result' in jo:
for item in jo['result']:
try:
if 'username' not in item['message']['chat']:
item['message']['chat']['username'] = 'notset'
result.append((item['message']['chat']['id'],
item['message']['chat']['username'],
item['message']['text']))
except KeyError:
print('TG_update: skiping an unqualified message')
#~~prepare to confirm this update in future HTTP requests,~~
#~~sometimes the `if 'text'..` statements are False~~
#~~while those messages still need confirmation.~~
finally:
self.upd['offset'] = jo['result'][-1]['update_id'] + 1
return result
async def listen(self, handler = None):
while True:
try:
messages = await self.update()
if messages:
#await asyncio.sleep(1)
print(messages)
#await self.send(messages[0][0], messages[0][2]) #ping-pong the reseived message
if handler != None:
await handler(messages)#TODO: CONFIRM UPDATE before HW reboot command.
finally:
gc.collect()
#print('free mem: {} Bytes'.format(gc.mem_free()))
#await asyncio.sleep(1)
# TODO possible ceveats:
# flooding retries when wifi not connected/interrupted;
# ctrl+c as well as unhandled exceptions will leave reader/writers open,
# causing ENOMEM OSError if you try to restart the loop without soft reboot.
# this can't be aclose():d using try-except schemes since your\
# KeyBoardInterrupt will mostly hit other executing coroutines;
# Message including ': H' cannot be sent and will result in an JSON error
#Test
'''
from tg_async_api import *
tg = Telegram_API(TOKEN='123456789:AbcdefgHijKLmnOPqrsT8')
loop = asyncio.get_event_loop()
loop.create_task(tg.listen())
loop.run_forever()
#loop.create_task(tg.send(your_id_int, 'hello'))
#loop.create_task(tg.update())
#loop.run_forever()
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment