Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# Python Telegram weather radar bot
from html.parser import HTMLParser
import http.client
from http.client import HTTPException
import json
import os
from urllib.parse import urljoin
SRI_HOST = "lietus.lv"
TELEGRAM_API_HOST = "api.telegram.org"
# We inherit from the base HTMLParser class and implement our custom handlers
class SRIHTMLParser(HTMLParser):
    """HTML parser that collects text nodes mentioning a ``.png`` resource.

    Every text chunk that contains the substring ``.png`` is appended, in
    document order, to ``data_list``. Start and end tags are ignored (the
    base-class handlers are no-ops, so they are not overridden here).

    Attributes:
        data_list: Text chunks containing ``.png`` seen by this instance.
    """

    def __init__(self, *, convert_charrefs=True):
        super().__init__(convert_charrefs=convert_charrefs)
        # Must be per-instance: a class-level list is shared by every parser
        # created in the same process, so results from earlier parses would
        # pile up and index 0 would keep pointing at the oldest entry.
        self.data_list = []

    def handle_data(self, data):
        """Record *data* when it contains the substring ``.png``."""
        if '.png' in data:
            self.data_list.append(data)
def fetch_and_parse_sri_url():
    """Fetch the ``/sri/`` page from SRI_HOST and return its first ``.png`` entry.

    The page is downloaded over plain HTTP, decoded as UTF-8 and fed to
    SRIHTMLParser; the first collected text chunk is returned with
    surrounding whitespace removed.

    Returns:
        str: The first ``.png`` text entry found on the page, stripped.

    Raises:
        HTTPException: If the server answers with a status other than 200.
        IndexError: If the page contains no text entry mentioning ``.png``.
    """
    endpoint = "/sri/"
    # Plain HTTP on purpose - it has to match the target host.
    conn = http.client.HTTPConnection(host=SRI_HOST, timeout=5)
    try:
        conn.request("GET", endpoint)
        res = conn.getresponse()
        if res.status != 200:
            raise HTTPException(f"Unexpected status from {SRI_HOST} : {res.status}")
        # Read the binary payload and decode it before the socket is released.
        html = res.read().decode('utf-8')
    finally:
        # Always release the socket, on success and on every error path.
        conn.close()

    parser = SRIHTMLParser()
    parser.feed(html)
    parser.close()

    if not parser.data_list:
        # Same exception type as the bare list lookup would raise, but with a
        # message that says what actually went wrong.
        raise IndexError(f"No .png resource found at {SRI_HOST}{endpoint}")

    # The image resource we need is expected to be the first one on the page.
    return parser.data_list[0].strip()
def lambda_handler(_event, _context):
    """Send the current radar image to the Telegram chat via ``sendPhoto``.

    Reads the bot token from the ``TELEGRAM_TOKEN`` environment variable,
    resolves the current image name with fetch_and_parse_sri_url() and posts
    its URL to the Telegram Bot API.

    Args:
        _event: Invocation event (unused).
        _context: Invocation context (unused).

    Returns:
        dict: ``statusCode`` holds the HTTP status returned by the Telegram
        API, ``body`` a fixed JSON-encoded confirmation string.

    Raises:
        EnvironmentError: If ``TELEGRAM_TOKEN`` is unset or empty.
    """
    telegram_token = os.getenv('TELEGRAM_TOKEN')
    # Reject both a missing and an empty token; an empty one would otherwise
    # produce the malformed endpoint "/bot/sendPhoto".
    if not telegram_token:
        raise EnvironmentError("Missing TELEGRAM_TOKEN env variable!")

    sri_img_resource = fetch_and_parse_sri_url()
    payload = {
        'chat_id': -1001467903359,
        'photo': urljoin(f"http://{SRI_HOST}", f"sri/{sri_img_resource}"),
        'caption': 'Current radar image.',
    }
    headers = {'content-type': "application/json"}
    endpoint = f"/bot{telegram_token}/sendPhoto"

    # Bound the wait so a stalled API call cannot hang the invocation.
    conn = http.client.HTTPSConnection(TELEGRAM_API_HOST, timeout=10)
    try:
        conn.request("POST", endpoint, json.dumps(payload), headers)
        status = conn.getresponse().status
    finally:
        # Release the socket whether or not the request succeeded.
        conn.close()

    return {
        'statusCode': status,
        'body': json.dumps('Lambda executed.'),
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment