Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save imvishalpatel/144398040b7f785e0d971cf6f4c52c30 to your computer and use it in GitHub Desktop.
Save imvishalpatel/144398040b7f785e0d971cf6f4c52c30 to your computer and use it in GitHub Desktop.
Check for available vaccination slots in your district (set your district ID) at a configurable interval
#!/usr/bin/python
import sys
import json
import requests
import subprocess
from datetime import datetime, timedelta
# --- Polling configuration ---------------------------------------------------
# Seconds between two consecutive slot checks (passed to LoopingCall.start).
timeout = 600.0
# Exclusive upper bound on the day offset scanned by checkForSlots.
limit = 7
# District identifier sent as `district_id` to the calendarByDistrict endpoint.
districtId = 395
# A session matches when its `min_age_limit` is at or below this value.
ageLimit = 19
messages = ""

# --- WhatsApp (Nexmo sandbox) configuration ----------------------------------
whatsappUrl = "https://messages-sandbox.nexmo.com/v0.1/messages"
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
}
# Recipients; every notification is posted once per entry.
numbers = ["<number1>", "<number2>"]
# Request-body template; the recipient number and the text are filled in per
# notification by sendWhatsappNotif.
data = {
    "from": {"type": "whatsapp", "number": "<NexmoNumber>"},
    "to": {"type": "whatsapp", "number": "<YourNumber>"},
    "message": {
        "content": {
            "type": "text",
            "text": "This is a WhatsApp Message sent from the Messages API",
        },
    },
}
def install(package):
    """Install ``package`` with pip, using the interpreter running this script.

    Raises ``subprocess.CalledProcessError`` if pip exits with a non-zero
    status.
    """
    pipCommand = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pipCommand)
def checkForSlots():
    """Scan upcoming days for open vaccination sessions and send notifications.

    For each day offset from 1 up to (but not including) ``limit``, the
    calendarByDistrict endpoint is queried for ``districtId``. Every session
    whose ``min_age_limit`` is at most ``ageLimit`` and whose
    ``available_capacity`` is positive is reported through
    ``sendWhatsappNotif``.

    A failed or malformed response for one day is printed and skipped instead
    of raising, so a transient API problem does not propagate into the
    caller's polling loop. Identical notification texts are sent only once
    per call.
    """
    requestTimeout = 30  # seconds; without it requests.get may block forever
    alreadySent = set()  # notification texts already sent during this call
    today = datetime.now()

    for day in range(1, limit):
        dateString = (today + timedelta(days=day)).strftime('%d-%m-%Y')
        requestUrl = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByDistrict?district_id={}&date={}".format(districtId, dateString)

        try:
            apiResponse = requests.get(url=requestUrl, timeout=requestTimeout)
            apiResponse.raise_for_status()
            jsonResponse = apiResponse.json()
        except (requests.RequestException, ValueError) as error:
            # ValueError covers a body that is not valid JSON.
            print("Slot check for {} failed: {}".format(dateString, error))
            continue

        # An error payload may lack 'centers' or not be a JSON object at all.
        centers = jsonResponse.get('centers') if isinstance(jsonResponse, dict) else None

        for center in centers or []:
            for session in center['sessions']:
                if session['min_age_limit'] > ageLimit or session['available_capacity'] <= 0:
                    continue
                message = "Center: {} - {} is vaccinating people for min age {} on {} with capacity: {}\n".format(center['name'], center['fee_type'], session['min_age_limit'], session['date'], session['available_capacity'])
                # NOTE(review): the calendar endpoint appears to return a
                # multi-day window per request, so consecutive queries can
                # repeat the same session - confirm against the API docs.
                if message in alreadySent:
                    continue
                alreadySent.add(message)
                sendWhatsappNotif(message)
def sendWhatsappNotif(message):
    """Send ``message`` as a WhatsApp text to every entry in ``numbers``.

    The request body is built from the ``data`` template without modifying
    it, and is posted to ``whatsappUrl`` with ``headers`` and basic auth.
    A failure for one recipient is printed and does not prevent delivery to
    the remaining recipients.
    """
    requestTimeout = 30  # seconds; without it requests.post may block forever

    # Copy the nested template levels that change, so the shared module-level
    # `data` dict keeps its original contents between calls.
    content = dict(data['message']['content'], text=message)
    messagePart = dict(data['message'], content=content)

    for number in numbers:
        payload = dict(data, to=dict(data['to'], number=number), message=messagePart)
        try:
            response = requests.post(
                whatsappUrl,
                headers=headers,
                data=json.dumps(payload),
                auth=('<username>', '<password>'),
                timeout=requestTimeout,
            )
        except requests.RequestException as error:
            print("WhatsApp notification to {} failed: {}".format(number, error))
            continue
        # Status and body are enough to diagnose delivery problems; dumping
        # response.__dict__ exposed the library's internal state instead.
        print("WhatsApp notification to {}: HTTP {} {}".format(number, response.status_code, response.text))
# Twisted is installed only when it is missing, so a normal start-up does not
# have to run pip (and need network access) first.
try:
    from twisted.internet import task, reactor
except ImportError:
    install("twisted")
    from twisted.internet import task, reactor

# Call checkForSlots repeatedly, `timeout` seconds apart.
cowinChecker = task.LoopingCall(checkForSlots)
loopFinished = cowinChecker.start(timeout)
# The Deferred returned by start() fails when checkForSlots raises, which also
# ends the loop; print the traceback so a stopped checker is visible.
loopFinished.addErrback(lambda failure: failure.printTraceback())
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment