Last active
June 19, 2021 05:12
-
-
Save stupoid/29798454a0e1329ecc2dc68ee752481c to your computer and use it in GitHub Desktop.
Modified version of lambda-canary blueprint to monitor multiple sites and log via telegram_bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from urllib.parse import urlencode | |
from urllib.request import Request, urlopen | |
def getenvbool(key: str, default: bool) -> bool:
    """Read environment variable *key* as a boolean.

    Matching is case-insensitive and ignores surrounding whitespace:
    ``"true"`` / ``"1"`` give ``True`` and ``"false"`` / ``"0"`` give
    ``False``. An unset variable, or any other value, yields *default*.
    """
    # Strip before comparing so a padded value such as " true " is honoured
    # instead of silently falling back to the default.
    envvar = os.getenv(key, "").strip().lower()
    if envvar in {"true", "1"}:
        return True
    if envvar in {"false", "0"}:
        return False
    return default
# Checks to run, in the format
# "site1>expected_string_in_site1|site2>expected_string_in_site2".
CHECKS = os.environ.get("checks", "")
# Token used by the Telegram bot to send messages.
TELEGRAM_BOT_TOKEN = os.environ.get("telegram_bot_token")
# Identifies the channel to send messages to (private channels need the exact id).
TELEGRAM_CHANNEL_ID = os.environ.get("telegram_channel_id")
# Toggle: also send a notification when a check passes.
NOTIFY_ON_SUCCESS = getenvbool("notify_on_success", False)
def lambda_handler(event, context):
    """Run every configured site check and report the outcome via Telegram.

    ``CHECKS`` is split on ``"|"`` into ``"site>expected"`` entries. Each site
    is fetched and the check passes when the response status is 200 and
    *expected* occurs in the UTF-8 decoded body. Failures always trigger a
    Telegram message; passes are sent to Telegram only when
    ``NOTIFY_ON_SUCCESS`` is set and are printed otherwise.

    Args:
        event: Invocation event; its ``"time"`` entry is quoted in messages.
        context: Lambda context object (unused).
    """
    from html import escape

    # Not every event carries "time" (e.g. a hand-written test event), so
    # fall back to a placeholder rather than failing the whole run.
    timestamp = event.get("time", "unknown time")

    for check in CHECKS.split("|"):
        if not check.strip():
            # An unset "checks" variable or a stray "|" yields empty entries;
            # there is nothing to check, so skip them quietly.
            continue

        # Each check gets its own try so that one unreachable or erroring
        # site (urlopen raises on HTTP errors and timeouts) is reported
        # without preventing the remaining checks from running.
        try:
            site, expected = check.split(">", 1)
            req = Request(site, headers={"User-Agent": "AWS Lambda"})
            # The context manager closes the connection once the body is read.
            with urlopen(req) as res:
                body = res.read().decode("utf-8")
                passed = res.status == 200 and expected in body
        except Exception as err:
            # Messages are sent with parse_mode=HTML, so everything that is
            # interpolated must be escaped; str() of a URLError, for example,
            # contains "<" and ">".
            reason = escape(f"{type(err).__name__}: {err}", quote=False)
            send_message(
                f"Script failed on check <code>{escape(check, quote=False)}</code> "
                f"due to {reason}",
                disable_notification=False,
            )
            continue

        safe_expected = escape(expected, quote=False)
        safe_site = escape(site, quote=False)
        if not passed:
            send_message(
                f"Check <code>{safe_expected}</code> at {safe_site} <b>failed</b> at {timestamp}!",
                disable_notification=False,
            )
        elif NOTIFY_ON_SUCCESS:
            send_message(
                f"Check <code>{safe_expected}</code> at {safe_site} <b>passed</b> at {timestamp}",
            )
        else:
            print(f"Check {expected} at {site} passed at {timestamp}")
def send_message(text: str, disable_notification: bool = True):
    """Send *text* to the configured Telegram channel through the Bot API.

    The request is made with ``parse_mode`` set to ``HTML``, so *text* is
    interpreted as Telegram HTML markup.

    Args:
        text: Message body.
        disable_notification: Passed to the API as ``disable_notification``;
            defaults to ``True``.

    Raises:
        Whatever ``urlopen`` raises when the request fails (for example
        ``urllib.error.URLError``); errors are not handled here.
    """
    params = urlencode(
        {
            "chat_id": TELEGRAM_CHANNEL_ID,
            "text": text,
            "parse_mode": "HTML",
            "disable_notification": disable_notification,
        }
    )
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage?{params}"
    # The response body is not needed, but the response must still be closed
    # so the underlying connection is released instead of leaking.
    with urlopen(url):
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Useful links:
Introduction to Telegram Bots
Telegram Bot API
Note to self:
As everything is sync and slow, a check that timed out may prevent subsequent checks from running.
Fix anything that pops up ASAP.
Increase lambda timeout as needed.
TODO: