|
import os |
|
import traceback |
|
import selenium |
|
from selenium import webdriver |
|
from selenium.webdriver.chrome.options import Options |
|
from selenium.webdriver.common.by import By |
|
from selenium.webdriver.common.keys import Keys |
|
from selenium.webdriver.support.ui import WebDriverWait |
|
from selenium.webdriver.support.expected_conditions import presence_of_element_located, element_to_be_clickable |
|
from selenium.common.exceptions import ElementClickInterceptedException |
|
import time |
|
from PIL import Image |
|
import io |
|
import requests |
|
import chromedriver_binary |
|
import chromedriver_autoinstaller |
|
import datetime |
|
import oyaml as yaml |
|
import calendar |
|
from pywebio.input import * |
|
from pywebio.output import * |
|
from pywebio.session import * |
|
|
|
logs_dir = './logs' |
|
class bcolors: |
|
HEADER = '\033[95m' |
|
OKBLUE = '\033[94m' |
|
OKCYAN = '\033[96m' |
|
OKGREEN = '\033[92m' |
|
WARNING = '\033[93m' |
|
FAIL = '\033[91m' |
|
ENDC = '\033[0m' |
|
BOLD = '\033[1m' |
|
UNDERLINE = '\033[4m' |
|
|
|
# Remember to install the chromedriver for your OS, download the version that matches your installed Google Chrome version |
|
# https://chromedriver.chromium.org/downloads |
|
|
|
# Global Variables |
|
driver = None |
|
wait = WebDriverWait(driver, 20) |
|
config = None |
|
info = None |
|
log_file = None |
|
telegram_bot_url = None |
|
|
|
def notify_telegram(message): |
|
global telegram_bot_url |
|
message = f"BUPA bot: {message}" |
|
try: |
|
requests.get(f"{telegram_bot_url}{message}") |
|
except Exception as ee: |
|
print("Error notifying telegram bot") |
|
|
|
|
|
""" |
|
Just setting up Selenium with the driver |
|
""" |
|
def read_config(): |
|
global config |
|
global info |
|
global telegram_bot_url |
|
config = yaml.full_load(open("config.yml")) |
|
try: |
|
info = yaml.full_load(open("info.no-commit.yml")) |
|
except: |
|
info = yaml.full_load(open("info.yml")) |
|
if info['telegram_token']: |
|
telegram_bot_url = f"https://api.telegram.org/bot{info['telegram_token']}/sendMessage?chat_id={info['chat_id']}&text=" |
|
|
|
|
|
def query(key): |
|
q = config['selectors'][key] |
|
return (q['by'], q['query']) |
|
|
|
|
|
def init_chrome_driver(): |
|
global driver |
|
global wait |
|
global options |
|
options = Options() |
|
options.headless = False # When True it will show a chrome window so you can see what's happening |
|
# options.add_argument('--headless') # ensure GUI is off |
|
options.add_argument("--window-size=400, 300") |
|
options.add_argument('--no-sandbox') |
|
options.add_argument('--disable-dev-shm-usage') |
|
# Initialize the Chrome Driver with the options we set up |
|
# driver = webdriver.Chrome(config["chrome_driver_path"], options=options) |
|
driver = webdriver.Chrome(options=options) |
|
chromedriver_autoinstaller.install() |
|
# This defines a default wait object, the waiting time is 10 seconds |
|
# Whenever you call a wait, it will wait until the even you specify happens |
|
# If it does not happen in the time frame you passed, it will throw a TimeOutException |
|
wait = WebDriverWait(driver, config["wait_element_time"]) |
|
|
|
def current_time_str(): |
|
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
|
|
|
def log_print(log_type="i", txt="", with_time=True, end="\n", start="\n", tg_notif=False, to_file=True): |
|
global log_file |
|
global telegram |
|
prefix = "" |
|
if with_time: |
|
prefix = f"{current_time_str()}\t" |
|
color = "" |
|
log_type = log_type.lower() |
|
if log_type in ["i","info","nfo","log"]: |
|
color = bcolors.OKCYAN |
|
elif log_type in ["w","f"]: |
|
color = bcolors.WARNING |
|
elif log_type in ["e","f", "n", "error", "fail", "no"]: |
|
color = bcolors.FAIL |
|
elif log_type in ["s","success", "ok"]: |
|
color = bcolors.OKGREEN |
|
|
|
print(start+prefix+color+txt, end=end) |
|
if to_file and log_file is not None: |
|
try: |
|
log_file.write(start+prefix+txt+end+"\n") |
|
log_file.flush() |
|
except Exception as e: |
|
pass |
|
if tg_notif: |
|
notify_telegram(f"{prefix}\n{txt}") |
|
|
|
|
|
def wait_print(seconds, waiting_msg="", end_msg="", log_type="i", tg_notif=False): |
|
while seconds > 0: |
|
log_print(log_type, f"{waiting_msg} {seconds}s... ", end = "\r", start="", tg_notif=tg_notif, to_file=False) |
|
time.sleep(1) |
|
seconds = seconds - 1 |
|
log_print(log_type, f"{waiting_msg} {seconds}s... {end_msg}", end = "\n", start="", tg_notif=tg_notif, to_file=True) |
|
|
|
def check_availability(): |
|
# Check all available days from the first one |
|
available_days = driver.find_elements(*query('available_days')) |
|
total_available = len(available_days) |
|
with open('dates_log.csv', 'w') as f: |
|
for i in range(0, total_available): |
|
d = available_days[i] |
|
new_date = datetime.datetime.strptime(d.get_attribute("data-value"), '%d/%m/%Y') |
|
if not check_date(new_date, config['app_date']): |
|
continue |
|
f.write(f"{current_time_str()},{new_date}\n") |
|
f.flush() |
|
d.click() # Click on the day |
|
wait.until(element_to_be_clickable(query('available_days_container'))) # Waiting until the page has reloaded |
|
if book_appointment(driver, config, new_date): |
|
return True |
|
available_days = driver.find_elements(*query('available_days')) # Need to get the reference to the new available days |
|
return False |
|
|
|
def get_current_booking(): |
|
# Get current appointment date |
|
try: |
|
app_date = driver.find_element(*query('current_appointment')).text |
|
config['app_date'] = datetime.datetime.strptime(app_date, '%A, %d %B %Y @ %I:%M %p') # 25 June 2024 @ 10:45 AM |
|
log_print("e", f"Current appointment date: {config['app_date']}", end="") |
|
except Exception as e: |
|
log_print("e", f"Error getting current appointment date: {e}") |
|
|
|
def edit_appointment(): |
|
# Gotta wait until the "modify/edit date and location" button is visible |
|
edit_btn = wait.until(element_to_be_clickable(query('edit_btn'))) # (By.ID, "ContentPlaceHolder1_repAppointments_lnkChangeAppointment_0") |
|
edit_btn.click() |
|
# Clicking next on the location selection page |
|
next_btn = wait.until(element_to_be_clickable(query('next_btn'))) # (By.ID, "ContentPlaceHolder1_btnCont") |
|
next_btn.click() |
|
|
|
|
|
def loop_refresh(): |
|
logged_in = False |
|
# Loop forever to keep finding closer and closer appointments |
|
while True: |
|
try: |
|
if not logged_in: |
|
login() |
|
edit_appointment() |
|
wait.until(element_to_be_clickable(query('available_days_container'))) |
|
except Exception as e: |
|
log_print("e", f"Error navigating to edit booking page{e}", end="") |
|
traceback.print_exception(e) |
|
logged_in = False |
|
time.sleep(2) # take a pause |
|
continue |
|
|
|
get_current_booking() |
|
# If this is the first time, we just logged in so this will still be false |
|
if not logged_in: |
|
notify_telegram(f"Current booking date: {config['app_date']}") |
|
logged_in = True |
|
found_available = False |
|
while not found_available: |
|
found_available = check_availability() |
|
try: |
|
driver.find_element(*query('next_dates_btn')).click() |
|
log_print('i', f"Getting more available dates...") |
|
wait.until(element_to_be_clickable(query('available_days_container'))) |
|
except Exception as e: |
|
break |
|
|
|
if not found_available: |
|
print() |
|
wait_print(config["time_to_refresh"], |
|
waiting_msg=f"No new date found or not a good option! Waiting {config['time_to_refresh']}s before refresh...", |
|
end_msg="Refreshing the page!", log_type="w") # take a pause |
|
|
|
driver.get(config['bupa_account_url']) |
|
# driver.refresh() # Refresh the page and restart the loop which will look for the calendar |
|
time.sleep(2) # take a pause |
|
|
|
def book_appointment(driver, config, new_date): |
|
try: |
|
wait.until(element_to_be_clickable(query('time_radio'))) # query('time_radio') = (By.CSS_SELECTOR, '#ContentPlaceHolder1_SelectTime1_rblResults input') |
|
time_results = driver.find_elements(*query('time_radio')) |
|
except Exception as e: |
|
log_print("e", "No times found for current selected date", end="") |
|
return False |
|
best_datetime = None |
|
for e in time_results: |
|
new_time = datetime.datetime.strptime(e.get_attribute('data-text'), "%I:%M %p") |
|
new_date = new_date.replace(hour=new_time.hour, minute=new_time.minute) |
|
log_print("w", f"Checking date-time: {new_date}") |
|
if new_date < config['app_date']: |
|
best_datetime = new_date |
|
break |
|
log_print("w", f"New date-time {new_date} is AFTER current appointment date: {config['app_date']}") |
|
if best_datetime is None: |
|
log_print("w", f"No suitable date-time found") |
|
return False |
|
|
|
log_print("w", f"Found a new suitable date-time! BOOKING {best_datetime}") |
|
notify_telegram(f"Found a new suitable date-time: {best_datetime}") |
|
|
|
if not config['dry_run']: |
|
# And because I'm lazy, I'll always click the last radio button in the list (so the latest time of the day available) |
|
# After clicking we need to wait to be able to click next |
|
next_btn_edit = wait.until(element_to_be_clickable(query('next_btn'))) # query('next_btn') = (By.ID, 'ContentPlaceHolder1_btnCont') |
|
next_btn_edit.click() |
|
|
|
# After clicking next we need to wait until the confirmation page shows up |
|
# Because the Save Changes button has no class or id I used XPATH to get the button with that specific text in it |
|
save_btn = wait.until(element_to_be_clickable(query('save_btn'))) # (By.XPATH, '//button[text()="Save changes"]') |
|
log_print("i", f"Found Save button! {save_btn.get_attribute('onclick')}") |
|
save_btn.click() # Click Save! |
|
# Update the date in the yaml file so at the next loop it won't keep booking this same one and get stuck in a loop |
|
config['app_date'] = new_date |
|
yaml.dump(config, open("config.yml", "w")) |
|
notify_telegram(f"New appointment booked!: {new_date}") |
|
|
|
return True |
|
|
|
|
|
def check_date(date_to_check, curr_appt_date): |
|
log_print("i", f"Checking new potential date: {date_to_check}", end="... ") |
|
if date_to_check > curr_appt_date: |
|
log_print("w", f"Date is AFTER the current booked date ({config['app_date']})", end="", start="", with_time=False) |
|
return False |
|
|
|
dates_range=config.get("dates_range") |
|
from_date = None |
|
to_date = None |
|
if dates_range is not None: |
|
from_date= dates_range.get("from") |
|
to_date= dates_range.get("to") |
|
if from_date is not None and date_to_check < datetime.datetime.strptime(from_date, '%d/%m/%Y'): |
|
log_print("w", f"New date {date_to_check} is OUTSIDE the RANGE (BEFORE the FROM date {from_date})", end="") |
|
return False |
|
if to_date is not None and date_to_check > datetime.datetime.strptime(to_date, '%d/%m/%Y'): |
|
log_print("w", f"New date {date_to_check} is OUTSIDE the RANGE (AFTER the TO date {to_date})", end="") |
|
return False |
|
|
|
weekdays_excluded = config.get("dates_weekdays_excluded") |
|
if weekdays_excluded is not None: |
|
for weekday_str in weekdays_excluded: |
|
new_weekday = calendar.day_name[date_to_check.weekday()].lower() |
|
weekday_ex = weekday_str.lower() |
|
# log_print(f"{new_weekday} - {weekday_str}", end="\t") |
|
if new_weekday == weekday_ex: |
|
log_print("w", f"New date {date_to_check} is a {weekday_str.upper()}, which is EXCLUDED", end="") |
|
return False |
|
|
|
dates_excluded = config.get("dates_excluded") |
|
if dates_excluded is not None: |
|
for date_ex_str in dates_excluded: |
|
date_ex = datetime.datetime.strptime(date_ex_str, '%d/%m/%Y') |
|
if date_ex == date_to_check: |
|
log_print("w", f"New date {date_to_check} is in the EXCLUDED list", end="") |
|
return False |
|
log_print("ok", f"New date {date_to_check} WORKS! Checking times...!", end="") |
|
return True |
|
|
|
def test_dates(dates_to_check): |
|
for date in dates_to_check: |
|
check_date(datetime.datetime.strptime(date, '%d/%m/%Y')) |
|
|
|
|
|
|
|
""" |
|
Function to login by inputting the data and moving to the edit booking page |
|
""" |
|
def login(): |
|
driver.get(config["bupa_base_url"]) # Navigate to the BUPA modify booking page |
|
# Get all the field and send the input for each one |
|
# driver.maximize_window() |
|
time.sleep(2) |
|
search_btn = wait.until(element_to_be_clickable(query('search_btn'))) # (By.ID, "ContentPlaceHolder1_btnSearch") |
|
hapid = driver.find_element(*query('hapid_field')) # By.ID, 'ContentPlaceHolder1_txtHAPID' |
|
hapid.send_keys(info['HAPID']) |
|
firstname = driver.find_element(*query('first_name')) # By.ID, 'ContentPlaceHolder1_txtFirstName' |
|
firstname.send_keys(info["first_name"]) |
|
surname = driver.find_element(*query('surname')) # By.ID, 'ContentPlaceHolder1_txtSurname' |
|
surname.send_keys(info["surname"]) |
|
dob = driver.find_element(*query('dob')) # By.ID, 'ContentPlaceHolder1_txtDOB' |
|
dob.send_keys(info["dob"]) |
|
# Finally click search to get to our booking |
|
search_btn.click() |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
if not os.path.exists(logs_dir): |
|
os.makedirs(logs_dir) |
|
log_file = open(f"{logs_dir}/log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log", "w") |
|
read_config() |
|
# test_dates(["01/01/2021", "29/01/2022", "30/01/2022", "31/12/2021", "01/01/2022", "01/03/2022", "10/02/2022", "01/02/2022", "15/01/2022", "12/12/2022", "01/01/2023"]) |
|
init_chrome_driver() |
|
try: |
|
loop_refresh() |
|
except Exception as e: |
|
log_print("e", f"Critical ERROR!! {e}", end="") |
|
print() |
|
print(traceback.print_exception(e)) |
|
log_file.close() |