Skip to content

Instantly share code, notes, and snippets.

@meashishsaini
Last active May 7, 2021 10:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save meashishsaini/c7650db6eaf4319bd0e2b323a878e797 to your computer and use it in GitHub Desktop.
Save meashishsaini/c7650db6eaf4319bd0e2b323a878e797 to your computer and use it in GitHub Desktop.
Check for available vaccination slots on the Co-WIN platform: https://apisetu.gov.in/public/marketplace/api/cowin. Optionally send a notification via Join: https://joaoapps.com/join/
from typing import Any, Optional
import requests
import datetime
import logging
import argparse
import os
# Default location: pincode and district id for Rampur, used when the caller
# does not pass a location explicitly.
rampur_district_id = "683"
rampur_pincode = "244901"

# Name of the environment variable that join_message() reads the Join API key from.
join_api_key_variable = "JOIN_API_KEY"

# Module-level endpoint constant. The fetch functions in this file assign their
# own local `url`, so this value is not what they request.
url = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/calendarByPin"

# Today's date formatted as DD-MM-YYYY; evaluated once, when the module is loaded.
today_date = f"{datetime.date.today():%d-%m-%Y}"

logger = logging.getLogger("check_for_centers_available")
def join_message(message: str, title: str = "Vaccine"):
    """Send a push notification to all devices through the Join API.

    The API key is read from the environment variable named by
    ``join_api_key_variable``.

    Args:
        message: Body text of the notification.
        title: Title of the notification.

    Returns:
        False when the API key environment variable is unset or empty;
        otherwise ``response.ok`` of the HTTP request (the response text is
        logged as a warning when the request was not ok).
    """
    join_api_key = os.environ.get(join_api_key_variable)
    if not join_api_key:
        logger.warning(f"{join_api_key_variable} not set.")
        return False

    join_message_url = (
        "https://joinjoaomgcd.appspot.com/_ah/api/messaging/v1/sendPush"
    )
    # Hand the query to requests as a mapping so every value is URL-encoded.
    # The message is free text (center names, newlines); characters such as
    # "&", "#" or "+" would otherwise split or truncate the query string.
    query = {
        "text": message,
        "title": title,
        "deviceId": "group.all",
        "apikey": join_api_key,
    }
    response = requests.get(join_message_url, params=query)
    if not response.ok:
        logger.warning(response.text)
    return response.ok
def get_centers_by_pincode(
    pincode: str = rampur_pincode, date: str = today_date
) -> Optional[Any]:
    """Request the public appointment calendar for a pincode.

    Args:
        pincode: Pincode sent as the ``pincode`` query parameter.
        date: Date string sent as the ``date`` query parameter; defaults to
            the module's ``today_date``.

    Returns:
        The decoded JSON body when the response is ok; otherwise None, after
        logging the response text as a warning.
    """
    endpoint = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByPin"
    reply = requests.request(
        "GET", endpoint, params={"pincode": pincode, "date": date}
    )
    if reply.ok:
        return reply.json()
    logger.warning(reply.text)
    return None
def get_centers_by_district(
    distric_id: str = rampur_district_id, date: str = today_date
) -> Optional[Any]:
    """Request the public appointment calendar for a district.

    Args:
        distric_id: District id sent as the ``district_id`` query parameter.
            (The parameter name is spelled this way because callers pass it
            by keyword.)
        date: Date string sent as the ``date`` query parameter; defaults to
            the module's ``today_date``.

    Returns:
        The decoded JSON body when the response is ok; otherwise None, after
        logging the response text as a warning.
    """
    endpoint = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByDistrict"
    reply = requests.request(
        "GET", endpoint, params={"district_id": distric_id, "date": date}
    )
    if reply.ok:
        return reply.json()
    logger.warning(reply.text)
    return None
def make_available_message(sessions_available: list) -> str:
    """Render the selected sessions as a plain-text message.

    Args:
        sessions_available: List of dicts, each holding a ``"center"`` dict
            and a ``"session"`` dict.

    Returns:
        One labelled paragraph per entry, each terminated by a blank line.
        For an empty list, a single "No sessions available" sentence that
        mentions ``today_date``.
    """
    if len(sessions_available) == 0:
        return f"No sessions available for: {today_date} and next 6 days."

    # Collect every fragment in output order and join once at the end.
    fragments = []
    for entry in sessions_available:
        center = entry["center"]
        session = entry["session"]
        fragments.extend(
            (
                center["name"],
                "\nBlock: ",
                center["block_name"],
                "\nDistrict: ",
                center["district_name"],
                "\nState: ",
                center["state_name"],
                "\nFee: ",
                center["fee_type"],
                "\nDate: ",
                session["date"],
                "\nMinimum Age Limit: ",
                str(session["min_age_limit"]),
                "\nAvailable Capacity: ",
                str(session["available_capacity"]),
                "\nVaccine: ",
                session["vaccine"],
                "\nSlots: ",
                ", ".join(session["slots"]),
                "\n\n",
            )
        )
    return "".join(fragments)
def parse_and_notify(centers: list, cutoff_age: int = 45, notify=False):
    """Select sessions with free capacity and report them.

    A session is selected when its ``min_age_limit`` is at most
    ``cutoff_age`` and its ``available_capacity`` is greater than zero.
    The number of matches is printed, the rendered message is logged at
    INFO level and, when ``notify`` is true, sent through ``join_message``
    (this happens even when nothing matched, in which case the message is
    the "no sessions" text).

    Args:
        centers: List of center dicts, each with a ``"sessions"`` list.
        cutoff_age: Highest minimum-age limit that is still of interest.
        notify: Whether to push the message through Join.

    Returns:
        True when at least one session was selected, False otherwise.
    """
    sessions_available = []
    for center in centers:
        for session in center["sessions"]:
            try:
                minimum_age_limit = int(session["min_age_limit"])
                available_capacity = int(session["available_capacity"])
            except (KeyError, TypeError, ValueError) as err:
                # int() raises ValueError for unparsable text and TypeError
                # for None; a missing field raises KeyError. Skip just this
                # session instead of aborting the whole scan.
                logger.warning("Skipping session with unusable data: %r", err)
                continue
            if minimum_age_limit <= cutoff_age and available_capacity > 0:
                sessions_available.append({"center": center, "session": session})

    print(f"{len(sessions_available)} sessions found.")
    message_str = make_available_message(sessions_available)
    logger.info(message_str)
    if notify:
        if join_message(message_str):
            print("Message sent successfully!")
        else:
            print("Unable to send message.")
    return bool(sessions_available)
if __name__ == "__main__":
    # Command-line entry point: fetch centers by pincode when one is given,
    # otherwise by district id, then report sessions matching the cutoff age.
    parser = argparse.ArgumentParser(
        description="Notify with slots available for vaccine. Defaults to Rampur."
    )
    parser.add_argument(
        "-d",
        "--district_id",
        # The flag was originally published with this misspelling; it is kept
        # as an alias (and as the dest) so existing invocations keep working.
        "--dictrict_id",
        dest="dictrict_id",
        help=f"ID of the district. Default: {rampur_district_id}",
        default=rampur_district_id,
        type=str,
    )
    parser.add_argument("-p", "--pincode", help="Pincode of the place.", type=str)
    parser.add_argument(
        "-a", "--age", help="Cutoff age. Default: 26", default=26, type=int
    )
    parser.add_argument(
        "-n",
        "--notify",
        help=f"Send a join notification to all devices. {join_api_key_variable} environment variable should be set for this to work.",
        action="store_true",
    )
    args = parser.parse_args()

    # A pincode, when supplied, takes precedence over the district id.
    if args.pincode:
        centers = get_centers_by_pincode(pincode=args.pincode)
    else:
        centers = get_centers_by_district(distric_id=args.dictrict_id)

    # The fetch functions return None on a failed request (already logged).
    if centers:
        if not parse_and_notify(centers["centers"], args.age, notify=args.notify):
            print("No sessions found.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment