Last active
May 7, 2021 10:53
-
-
Save meashishsaini/c7650db6eaf4319bd0e2b323a878e797 to your computer and use it in GitHub Desktop.
Check for any slots available for vaccination on Co-Win platform: https://apisetu.gov.in/public/marketplace/api/cowin. Optionally notify on Join: https://joaoapps.com/join/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Optional | |
import requests | |
import datetime | |
import logging | |
import argparse | |
import os | |
# --- Module-level configuration shared by the functions in this script ---

# Name of the environment variable that is read for the Join API key.
join_api_key_variable = "JOIN_API_KEY"

# Default location used when the caller does not pass one (Rampur).
rampur_pincode = "244901"
rampur_district_id = "683"

# Calendar-by-pincode endpoint without the "/public" path segment.
# NOTE(review): the fetch functions visible in this file define their own
# local ``url``; this module-level value looks unused -- confirm before removing.
url = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/calendarByPin"

# Today's date formatted as DD-MM-YYYY, captured once when the module loads.
today_date = f"{datetime.date.today():%d-%m-%Y}"

logger = logging.getLogger("check_for_centers_available")
def join_message(message: str, title: str = "Vaccine"):
    """Send a push notification to all devices through the Join API.

    The API key is read from the environment variable named by
    ``join_api_key_variable``.

    Args:
        message: Body text of the notification.
        title: Title of the notification.

    Returns:
        True if the Join endpoint answered with a success status. False if
        the API key is not set, the request could not be completed, or the
        endpoint answered with an error status.
    """
    join_api_key = os.environ.get(join_api_key_variable)
    if not join_api_key:
        logger.warning(f"{join_api_key_variable} not set.")
        return False

    base = "https://joinjoaomgcd.appspot.com/_ah/api/messaging/v1"
    # Hand the values to requests via ``params`` so they are URL-encoded.
    # The message is free text (newlines, spaces, possibly '&' or '#') and
    # must not be mistaken for query-string syntax.
    query = {
        "text": message,
        "title": title,
        "deviceId": "group.all",
        "apikey": join_api_key,
    }
    try:
        # The timeout keeps the script from hanging on a stalled connection.
        response = requests.get(f"{base}/sendPush", params=query, timeout=30)
    except requests.RequestException as err:
        # Log only the exception type: the exception text can include the
        # request URL, which carries the API key.
        logger.warning("Join request failed: %s", type(err).__name__)
        return False

    if not response.ok:
        logger.warning(response.text)
    return response.ok
def get_centers_by_pincode(
    pincode: str = rampur_pincode, date: str = today_date
) -> Optional[Any]:
    """Fetch the session calendar for a pincode from the Co-WIN public API.

    Args:
        pincode: Pincode to query.
        date: Date to query, as a string (the default is today's date in
            DD-MM-YYYY format).

    Returns:
        The decoded JSON body of the response, or None if the request could
        not be completed, the server answered with an error status, or the
        body was not valid JSON.
    """
    url = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByPin"
    querystring = {"pincode": pincode, "date": date}
    try:
        # The timeout keeps the script from hanging on a stalled connection.
        response = requests.get(url, params=querystring, timeout=30)
    except requests.RequestException as err:
        logger.warning("Request to %s failed: %s", url, err)
        return None

    if not response.ok:
        logger.warning(response.text)
        return None

    try:
        return response.json()
    except ValueError as err:
        # A success status with a non-JSON body is treated like any other
        # failed lookup.
        logger.warning("Response from %s was not valid JSON: %s", url, err)
        return None
def get_centers_by_district(
    distric_id: str = rampur_district_id, date: str = today_date
) -> Optional[Any]:
    """Fetch the session calendar for a district from the Co-WIN public API.

    Args:
        distric_id: District id to query. The parameter name keeps its
            existing spelling because callers pass it by keyword.
        date: Date to query, as a string (the default is today's date in
            DD-MM-YYYY format).

    Returns:
        The decoded JSON body of the response, or None if the request could
        not be completed, the server answered with an error status, or the
        body was not valid JSON.
    """
    url = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByDistrict"
    querystring = {"district_id": distric_id, "date": date}
    try:
        # The timeout keeps the script from hanging on a stalled connection.
        response = requests.get(url, params=querystring, timeout=30)
    except requests.RequestException as err:
        logger.warning("Request to %s failed: %s", url, err)
        return None

    if not response.ok:
        logger.warning(response.text)
        return None

    try:
        return response.json()
    except ValueError as err:
        # A success status with a non-JSON body is treated like any other
        # failed lookup.
        logger.warning("Response from %s was not valid JSON: %s", url, err)
        return None
def make_available_message(sessions_available: list) -> str:
    """Render the matching sessions as a human-readable text message.

    Args:
        sessions_available: List of dicts, each with a ``"center"`` and a
            ``"session"`` entry, as built by ``parse_and_notify``.

    Returns:
        One paragraph per session (center name, block, district, state, fee,
        date, minimum age limit, available capacity, vaccine and slots), each
        followed by a blank line. If the list is empty, a single sentence
        saying that nothing is available.
    """
    if not sessions_available:
        return f"No sessions available for: {today_date} and next 6 days."

    # Collect the pieces in output order and join them once at the end.
    pieces = []
    for entry in sessions_available:
        center_info = entry["center"]
        session_info = entry["session"]
        pieces.extend(
            (
                center_info["name"],
                "\nBlock: ",
                center_info["block_name"],
                "\nDistrict: ",
                center_info["district_name"],
                "\nState: ",
                center_info["state_name"],
                "\nFee: ",
                center_info["fee_type"],
                "\nDate: ",
                session_info["date"],
                "\nMinimum Age Limit: ",
                str(session_info["min_age_limit"]),
                "\nAvailable Capacity: ",
                str(session_info["available_capacity"]),
                "\nVaccine: ",
                session_info["vaccine"],
                "\nSlots: ",
                ", ".join(session_info["slots"]),
                "\n\n",
            )
        )
    return "".join(pieces)
def parse_and_notify(centers: list, cutoff_age: int = 45, notify=False):
    """Find sessions with free capacity and optionally push a notification.

    Prints how many matching sessions were found, logs the rendered message
    at info level and, when *notify* is true, sends it through
    ``join_message`` and prints whether that succeeded.

    Args:
        centers: List of center dicts, each holding a ``"sessions"`` list.
        cutoff_age: A session matches when its minimum age limit is less
            than or equal to this value.
        notify: Whether to send the message as a Join notification.

    Returns:
        True if at least one matching session was found, otherwise False.
    """
    sessions_available = _collect_available_sessions(centers, cutoff_age)
    print(f"{len(sessions_available)} sessions found.")

    message_str = make_available_message(sessions_available)
    logger.info(message_str)

    if notify:
        if join_message(message_str):
            print("Message sent successfully!")
        else:
            print("Unable to send message.")
    return len(sessions_available) > 0


def _collect_available_sessions(centers: list, cutoff_age: int) -> list:
    """Return center/session pairs open to *cutoff_age* with capacity left."""
    matches = []
    for center in centers:
        for session in center["sessions"]:
            try:
                minimum_age_limit = int(session["min_age_limit"])
                available_capacity = int(session["available_capacity"])
            except (KeyError, TypeError, ValueError) as err:
                # int() raises TypeError for None and ValueError for
                # non-numeric text; a missing field raises KeyError. One
                # malformed session must not abort the whole scan.
                logger.warning("Skipping malformed session: %r", err)
                continue
            if minimum_age_limit <= cutoff_age and available_capacity > 0:
                matches.append({"center": center, "session": session})
    return matches
# Command-line entry point: look up centers by pincode when one is given,
# otherwise by district id, then report (and optionally push) what is open.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Notify with slots available for vaccine. Defaults to Rampur."
    )
    # The long flag's spelling ("dictrict") is part of the existing CLI and
    # is also the attribute name read from ``args`` below.
    parser.add_argument(
        "-d",
        "--dictrict_id",
        help=f"ID of the district. Default: {rampur_district_id}",
        default=rampur_district_id,
        type=str,
    )
    parser.add_argument("-p", "--pincode", help="Pincode of the place.", type=str)
    parser.add_argument(
        "-a", "--age", help="Cutoff age. Default: 26", default=26, type=int
    )
    parser.add_argument(
        "-n",
        "--notify",
        help=f"Send a join notification to all devices. {join_api_key_variable} environment variable should be set for this to work.",
        action="store_true",
    )
    args = parser.parse_args()

    # A pincode, when supplied, takes precedence over the district id.
    centers = (
        get_centers_by_pincode(pincode=args.pincode)
        if args.pincode
        else get_centers_by_district(distric_id=args.dictrict_id)
    )

    # Nothing is printed here when the lookup returned nothing; the fetch
    # functions log their own warning in that case.
    if centers and not parse_and_notify(
        centers["centers"], args.age, notify=args.notify
    ):
        print("No sessions found.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment