|
import json |
|
import os |
|
from datetime import datetime |
|
from pprint import pprint |
|
from sys import platform |
|
from time import sleep |
|
|
|
import requests |
|
|
|
# Edit below this |
|
simplepush_token = '' |
|
telegram_bot_token = '' |
|
slack_webhook = '' |
|
# Edit above this |
|
|
|
time_to_wait = 5 |
|
state_id = 17 # 17 = Kerala (Get it from: https://cdn-api.co-vin.in/api/v2/admin/location/states) |
|
district_id = None |
|
headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'} |
|
cowin_api_base_url = 'https://cdn-api.co-vin.in/api/v2' |
|
|
|
|
|
def get_districts(): |
|
find_districts_url = f'{cowin_api_base_url}/admin/location/districts/{state_id}' |
|
try: |
|
r = requests.get(find_districts_url, headers=headers) |
|
except Exception: |
|
raise SystemExit('Could not get the district details from COWIN API. Try again.') |
|
else: |
|
return r.json()['districts'] |
|
|
|
def get_district(): |
|
districts = get_districts() |
|
for district_serial, district in enumerate(districts, 1): |
|
print(f"{district_serial}. {district['district_name']}") |
|
|
|
district_selection = input('\nSelect your district number (example: 5): ') |
|
district_number = int(district_selection) |
|
|
|
if district_number not in range(1, 15): |
|
raise SystemExit('Invalid district number chosen. Try again.') |
|
|
|
district_details = districts[district_number-1] |
|
print(f"\nChosen district name: {district_details['district_name']}\n") |
|
return district_details['district_id'] |
|
|
|
|
|
def get_json(district_id): |
|
date_str = datetime.today().strftime('%d-%m-%Y') |
|
slot_info_url = f'{cowin_api_base_url}/appointment/sessions/calendarByDistrict?district_id={district_id}&date={date_str}' |
|
try: |
|
return requests.get(slot_info_url, timeout=5, headers=headers).json() |
|
except Exception: |
|
print('Could not get the details from COWIN API.') |
|
|
|
|
|
def notify_simplepush(msg): |
|
simplepush_url = f'https://api.simplepush.io/send/{simplepush_token}/' |
|
msg = requests.utils.quote(msg) |
|
try: |
|
r = requests.get(f'{simplepush_url}COVID_ALERT/{msg}') |
|
except Exception: |
|
print('Failed to send notification to SimplePush.') |
|
|
|
|
|
def notify_telegram(msg): |
|
telegram_url = f'https://api.telegram.org/{telegram_bot_token}' |
|
msg = requests.utils.quote(msg) |
|
|
|
try: |
|
r = requests.get(f'{telegram_url}/getUpdates').json() |
|
chat_id = r['result'][1]['message']['chat']['id'] |
|
r = requests.get( |
|
f'{telegram_url}/sendMessage?chat_id={chat_id}&text={msg}').json() |
|
except Exception: |
|
print('Failed to send notification to Telegram.') |
|
|
|
|
|
def notify_slack(msg): |
|
try: |
|
r = requests.post(slack_webhook, json={'text': msg}) |
|
except Exception: |
|
print('Failed to send notification to Slack.') |
|
|
|
|
|
def say(m): |
|
if platform == "linux" or platform == "linux2": |
|
os.system(f'espeak-ng "{m}"') |
|
elif platform == "darwin": |
|
os.system(f'say "{m}"') |
|
elif platform == "win32": |
|
pass |
|
|
|
if __name__ == "__main__": |
|
while True: |
|
if not district_id: |
|
district_id = get_district() |
|
|
|
data = get_json(district_id) |
|
|
|
if not data: |
|
sleep(5) |
|
continue |
|
|
|
current_time = datetime.today().strftime('%I:%M:%S %p') |
|
|
|
for center in data['centers']: |
|
slots = center['sessions'][0]['available_capacity'] |
|
vaccine = center['sessions'][0]['vaccine'] |
|
center_name = center['name'] |
|
|
|
if slots: |
|
msg = f'{slots} slots for {vaccine} available in {center_name}' |
|
print(msg) |
|
|
|
if telegram_bot_token: |
|
print('Sending Telegram notification') |
|
notify_telegram(msg) |
|
|
|
if simplepush_token: |
|
print('Sending SimplePush notification') |
|
notify_simplepush(msg) |
|
|
|
if slack_webhook: |
|
print('Sending Slack notification') |
|
notify_slack(msg) |
|
|
|
say(msg) |
|
|
|
# If vaccine is not available anywhere |
|
if not any([c['sessions'][0]['available_capacity'] for c in data['centers']]): |
|
print(f'Vaccine not available yet. Last checked at: {current_time}') |
|
|
|
for i in reversed(range(time_to_wait)): |
|
print(f'Sleeping for {i}...', end='\r') |
|
sleep(1) |