Last active
October 11, 2015 19:18
-
-
Save zhuzhuor/3906906 to your computer and use it in GitHub Desktop.
search earlier date for g2 test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# this script doesn't work anymore due to changes/upgrades of the exam booking website | |
# driver license number | |
DLNumberA = "S1111" | |
DLNumberB = "22222" | |
DLNumberC = "33333" | |
# post code | |
PostCodeA = "N2L" | |
PostCodeB = "3G1" | |
# DL expire date | |
ExpireDateYear = "2123" | |
ExpireDateMonth = "01" | |
ExpireDateDay = "31" | |
# searching time interval | |
# yyyymmddhhmm | |
after_date_time = "201111300000" | |
before_date_time = "201112311200" | |
# only search morning | |
only_morning = False | |
# email to | |
recipient_list = ["aaa@uwaterloo.ca", "bbb@gmail.com"] | |
# email from | |
from_address = "no-reply@nowhere.com" | |
### don't change the setting after this ### | |
import urllib | |
import urllib2 | |
import re | |
import time | |
import urlparse | |
import smtplib | |
time_to_wait = 12 * 60 # seconds | |
login_url = "https://www.rtbo.rus.mto.gov.on.ca/scripts/english/index.asp" | |
login_data = { | |
"SessionId": "", | |
"Submitted": "1", | |
"Option": "1", | |
"DLNumberA": DLNumberA, | |
"DLNumberB": DLNumberB, | |
"DLNumberC": DLNumberC, | |
"PostCodeA": PostCodeA, | |
"PostCodeB": PostCodeB, | |
"ExpireDateYear": ExpireDateYear, | |
"ExpireDateMonth": ExpireDateMonth, | |
"ExpireDateDay": ExpireDateDay, | |
"PhoneAreaCode": "519", | |
"PhoneExchange": "888", | |
"PhoneNumber": "4567", | |
"Continue": "Continue", | |
} | |
search_url_base = "https://www.rtbo.rus.mto.gov.on.ca/scripts/english/booking1.asp?SessionId=%s" | |
search_data_base = { | |
"SessionId": None, | |
"Submitted": "1", | |
"Option": "1", | |
"OldCertificate": "0", | |
"BookingClass": "G1", | |
"MotorcycleType": "F", | |
"TheRegion": "30", | |
"TheList": "N420PUBLIC", | |
"Continue": "Continue", | |
} | |
next_url_base = "https://www.rtbo.rus.mto.gov.on.ca/scripts/english/location.asp?sessionid=%s&DLClass=G1" | |
next_data_base = { | |
"SessionId": None, | |
"Submitted": "1", | |
"Option": "2", | |
"LastDateTime": None, | |
"SearchType": "0", | |
"AvailableDate": "-1", | |
"SpecificDate": "", | |
"btnSearchN": "Search Next Available Date", | |
} | |
common_headers = { | |
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
"Accept-Language": "en-us,en;q=0.5", | |
"Accept-Encoding": "deflate", | |
"Accept-Charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.7", | |
"Connection": "keep-alive", | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:7.0.1) Gecko/20100101 Firefox/7.0.1", | |
} | |
def get_new_session_id(): | |
# first login | |
url = login_url | |
data = urllib.urlencode(login_data) | |
req = urllib2.Request(url, data, common_headers) | |
req.add_header("Referer", "https://www.rtbo.rus.mto.gov.on.ca/scripts/english/index.asp") | |
result_login = urllib2.urlopen(req) | |
# get the session id | |
current_url = result_login.geturl() | |
query = urlparse.urlparse(current_url).query | |
query_dict = urlparse.parse_qs(query) | |
if query_dict.has_key("SessionId"): | |
return query_dict["SessionId"][0] | |
else: | |
return None | |
def keep_alive(current_session_id, num_secs): | |
url = search_url_base % current_session_id | |
req = urllib2.Request(url) | |
req.add_header("Referer", url) | |
for i in range(num_secs / 10 - 1): | |
time.sleep(10) # wait for 10 secs | |
result_refresh = urllib2.urlopen(req) | |
result_refresh.read() | |
time.sleep(10) | |
def first_available_times(current_session_id): | |
# search the first available date for Kitchener | |
url = search_url_base % current_session_id | |
search_data = search_data_base | |
search_data["SessionId"] = current_session_id | |
data = urllib.urlencode(search_data) | |
req = urllib2.Request(url, data, common_headers) | |
#req.add_header("Referer", current_url) | |
req.add_header("Referer", url) | |
result_search = urllib2.urlopen(req) | |
# parse html to get available dates | |
#current_url = result_search.geturl() | |
current_html = result_search.read() | |
#print current_html | |
available_time_list = re.findall(r'<OPTION value="(\d+)"', current_html) | |
#print available_time_list | |
return available_time_list | |
def next_available_date(current_session_id, last_date_time): | |
time.sleep(10) | |
url = next_url_base % current_session_id | |
next_data = next_data_base | |
next_data["SessionId"] = current_session_id | |
next_data["LastDateTime"] = last_date_time | |
data = urllib.urlencode(next_data) | |
req = urllib2.Request(url, data, common_headers) | |
req.add_header("Referer", url) | |
result_next = urllib2.urlopen(req) | |
current_html = result_next.read() | |
available_time_list = re.findall(r'<OPTION value="(\d+)"', current_html) | |
return available_time_list | |
def send_email(mail_title): | |
mail_content = "From: " + from_address + "\n" \ | |
+ "To: " + ", ".join(recipient_list) + "\n" \ | |
+ "Subject: " + mail_title + " (" + time.ctime() + ")\n" \ | |
+ "as the title\n\n" + time.ctime() | |
# print mail_content | |
mail_server = smtplib.SMTP('localhost') | |
mail_server.sendmail(from_address, recipient_list, mail_content) | |
mail_server.quit() | |
def endless_search(): | |
num_errors = 0 | |
while True: | |
session_id = get_new_session_id() | |
print 'Got a new session ID: %s (%s)' % (session_id, time.ctime()) | |
if None == session_id: | |
num_errors += 1 | |
if num_errors >= 3: | |
num_errors = 0 | |
send_email("Errors!") | |
# sys.exit(1) | |
time.sleep(60 * 60) # wait for an hour | |
else: | |
num_errors = 0 | |
#keep_alive(session_id, time_to_wait) | |
time.sleep(10) | |
for i in range(5): # max 5 trials for each session | |
results = first_available_times(session_id) | |
print 'Search No.%d' % (i + 1), | |
print results, | |
print '(%s)' % time.ctime(), | |
print results[0][-4:] | |
if len(results) == 0: | |
time.sleep(time_to_wait) | |
break | |
if results[0] < before_date_time: | |
if results[0] > after_date_time: | |
if only_morning: | |
if results[0][-4:] < 1300: | |
send_email("Found " + results[0]) | |
else: | |
send_email("Found " + results[0]) | |
#else: # unnecessary to search next | |
# for j in range(4 - i): | |
# results = next_available_date(session_id, results[-1]) | |
# if len(results) == 0: | |
# break | |
# else: | |
# print 'Search No.%d' % (i + j + 2), | |
# print results, | |
# print '(%s)' % time.ctime() | |
# if results[0] >= before_date_time: | |
# break | |
# else: | |
# if results[0] > after_date_time: | |
# send_email("Found " + results[0]) | |
# time.sleep(10) | |
# #i += (j + 1) # this trick doesn't work for Python | |
keep_alive(session_id, time_to_wait) | |
if __name__ == "__main__": | |
while True: | |
try: | |
current_hour = int(time.strftime("%H")) | |
if current_hour in range(0, 9): | |
print "sleep at night... (%s)" % time.ctime() | |
time.sleep(time_to_wait) | |
else: | |
endless_search() | |
except Exception, exc: | |
print "Failed as %s (%s)" % (str(exc), time.ctime()) | |
send_email("Failed as " + str(exc)) | |
time.sleep(time_to_wait) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment