Created
May 14, 2025 15:35
-
-
Save SeaWizard-ODIN/302c7722de569fbac361bce482042e31 to your computer and use it in GitHub Desktop.
Easyapplybot Original w/SW edits
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# === Required Packages === | |
# Install via: pip install -r requirements.txt | |
# - selenium | |
# - pandas | |
# - pyyaml | |
# - beautifulsoup4 | |
# - webdriver-manager | |
import json | |
import csv | |
import logging | |
import os | |
import random | |
import re | |
import time | |
from datetime import datetime, timedelta | |
import getpass | |
from pathlib import Path | |
import pandas as pd | |
# import pyautogui | |
import yaml | |
from bs4 import BeautifulSoup | |
from selenium import webdriver | |
from selenium.common.exceptions import TimeoutException | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.remote.webelement import WebElement | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support.ui import Select | |
from selenium.webdriver.chrome.service import Service as ChromeService | |
from webdriver_manager.chrome import ChromeDriverManager | |
import urllib.parse | |
# from selenium.webdriver.chrome.service import Service as ChromeService | |
# import webdriver_manager.chrome as ChromeDriverManager | |
# ChromeDriverManager = ChromeDriverManager.ChromeDriverManager | |
log = logging.getLogger(__name__) | |
# def setupLogger() -> None: | |
# dt: str = datetime.strftime(datetime.now(), "%m*%d*%y %H*%M*%S ") | |
# | |
# if not os.path.isdir('./logs'): | |
# os.mkdir('./logs') | |
# | |
# # TODO need to check if there is a log dir available or not | |
# logging.basicConfig(filename=('./logs/' + str(dt) + 'applyJobs.log'), filemode='w', | |
# format='%(asctime)s::%(name)s::%(levelname)s::%(message)s', datefmt='./logs/%d-%b-%y %H:%M:%S') | |
# log.setLevel(logging.DEBUG) | |
# c_handler = logging.StreamHandler() | |
# c_handler.setLevel(logging.DEBUG) | |
# c_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', '%H:%M:%S') | |
# c_handler.setFormatter(c_format) | |
# log.addHandler(c_handler) | |
def get_appliedIDs(self, filename): | |
if not os.path.exists(filename): | |
log.info(f"Output file {filename} not found. Creating new file.") | |
with open(filename, 'w') as f: | |
f.write("timestamp,jobID,job,company,attempted,result\n") | |
return [] | |
try: | |
df = pd.read_csv(filename, | |
header=None, | |
names=['timestamp', 'jobID', 'job', 'company', 'attempted', 'result'], | |
lineterminator='\n', | |
encoding='utf-8') | |
df['timestamp'] = pd.to_datetime(df['timestamp'], format="%Y-%m-%d %H:%M:%S") | |
df = df[df['timestamp'] > (datetime.now() - timedelta(days=2))] | |
jobIDs = list(df.jobID) | |
log.info(f"{len(jobIDs)} jobIDs found") | |
return jobIDs | |
except Exception as e: | |
log.info(f"{e} jobIDs could not be loaded from CSV {filename}") | |
return [] | |
def setupLogger() -> None: | |
dt: str = datetime.strftime(datetime.now(), "%m-%d-%y_%H-%M-%S") | |
# dt: str = datetime.strftime(datetime.now(), "%m*%d*%y %H*%M*%S ") | |
if not os.path.isdir('./logs'): | |
os.mkdir('./logs') | |
logging.basicConfig( | |
filename=('./logs/' + str(dt) + 'applyJobs.log'), | |
filemode='w', | |
format='%(asctime)s::%(name)s::%(levelname)s::%(message)s', | |
datefmt='./logs/%d-%b-%y %H:%M:%S' | |
) | |
log.setLevel(logging.DEBUG) | |
c_handler = logging.StreamHandler() | |
c_handler.setLevel(logging.DEBUG) | |
c_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', '%H:%M:%S') | |
c_handler.setFormatter(c_format) | |
log.addHandler(c_handler) | |
# def setupLogger() -> None: | |
# dt: str = datetime.strftime(datetime.now(), "%m*%d*%y %H*%M*%S ") | |
# | |
# if not os.path.isdir('./logs'): | |
# os.mkdir('./logs') | |
# | |
# # TODO need to check if there is a log dir available or not | |
# logging.basicConfig(filename=('./logs/' + str(dt) + 'applyJobs.log'), filemode='w', | |
# format='%(asctime)s::%(name)s::%(levelname)s::%(message)s', datefmt='./logs/%d-%b-%y %H:%M:%S') | |
# log.setLevel(logging.DEBUG) | |
# c_handler = logging.StreamHandler() | |
# c_handler.setLevel(logging.DEBUG) | |
# c_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', '%H:%M:%S') | |
# c_handler.setFormatter(c_format) | |
# log.addHandler(c_handler) | |
def setupLogger() -> None: | |
dt: str = datetime.strftime(datetime.now(), "%m-%d-%y_%H-%M-%S") | |
# dt: str = datetime.strftime(datetime.now(), "%m*%d*%y %H*%M*%S ") | |
if not os.path.isdir('./logs'): | |
os.mkdir('./logs') | |
logging.basicConfig( | |
filename=('./logs/' + str(dt) + 'applyJobs.log'), | |
filemode='w', | |
format='%(asctime)s::%(name)s::%(levelname)s::%(message)s', | |
datefmt='./logs/%d-%b-%y %H:%M:%S' | |
) | |
log.setLevel(logging.DEBUG) | |
c_handler = logging.StreamHandler() | |
c_handler.setLevel(logging.DEBUG) | |
c_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', '%H:%M:%S') | |
c_handler.setFormatter(c_format) | |
log.addHandler(c_handler) | |
class EasyApplyBot: | |
# setupLogger() # MAX_SEARCH_TIME is 10 hours by default, feel free to modify it | |
MAX_SEARCH_TIME = 60 * 60 # 1 hour by default | |
def __init__(self, | |
username, | |
password, | |
phone_number, | |
# profile_path, | |
salary, | |
rate, | |
uploads={}, | |
filename='output.csv', | |
blacklist=[], | |
blackListTitles=[], | |
experience_level=[] | |
) -> None: | |
setupLogger() # Moved to correct place to call it | |
log.info("Welcome to Easy Apply Bot") | |
dirpath: str = os.getcwd() | |
log.info("current directory is : " + dirpath) | |
log.info("Please wait while we prepare the bot for you") | |
if experience_level: | |
experience_levels = { | |
1: "Entry level", | |
2: "Associate", | |
3: "Mid-Senior level", | |
4: "Director", | |
5: "Executive", | |
6: "Internship" | |
} | |
applied_levels = [experience_levels[level] for level in experience_level] | |
log.info("Applying for experience level roles: " + ", ".join(applied_levels)) | |
else: | |
log.info("Applying for all experience levels") | |
self.uploads = uploads | |
self.salary = salary | |
self.rate = rate | |
# self.profile_path = profile_path | |
past_ids: list | None = self.get_appliedIDs(filename) | |
self.appliedJobIDs: list = past_ids if past_ids != None else [] | |
self.filename: str = filename | |
self.options = self.browser_options() | |
self.browser = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=self.options) | |
self.wait = WebDriverWait(self.browser, 30) | |
self.blacklist = blacklist | |
self.blackListTitles = blackListTitles | |
self.start_linkedin(username, password) | |
self.phone_number = phone_number | |
self.experience_level = experience_level | |
self.locator = { | |
"next": (By.CSS_SELECTOR, "button[aria-label='Continue to next step']"), | |
"review": (By.CSS_SELECTOR, "button[aria-label='Review your application']"), | |
"submit": (By.CSS_SELECTOR, "button[aria-label='Submit application']"), | |
"error": (By.CLASS_NAME, "artdeco-inline-feedback__message"), | |
"upload_resume": (By.XPATH, "//*[contains(@id, 'jobs-document-upload-file-input-upload-resume')]"), | |
"upload_cv": (By.XPATH, "//*[contains(@id, 'jobs-document-upload-file-input-upload-cover-letter')]"), | |
"follow": (By.CSS_SELECTOR, "label[for='follow-company-checkbox']"), | |
"upload": (By.NAME, "file"), | |
"search": (By.CLASS_NAME, "jobs-search-results-list"), | |
"links": ("xpath", '//div[@data-job-id]'), | |
"fields": (By.CLASS_NAME, "jobs-easy-apply-form-section__grouping"), | |
"radio_select": (By.CSS_SELECTOR, "input[type='radio']"), #need to append [value={}].format(answer) | |
"multi_select": (By.XPATH, "//*[contains(@id, 'text-entity-list-form-component')]"), | |
"text_select": (By.CLASS_NAME, "artdeco-text-input--input"), | |
"2fa_oneClick": (By.ID, 'reset-password-submit-button'), | |
"easy_apply_button": (By.XPATH, '//button[contains(@class, "jobs-apply-button")]') | |
} | |
#initialize questions and answers file | |
self.qa_file = Path("qa.csv") | |
self.answers = {} | |
#if qa file does not exist, create it | |
if self.qa_file.is_file(): | |
df = pd.read_csv(self.qa_file) | |
for index, row in df.iterrows(): | |
self.answers[row['Question']] = row['Answer'] | |
#if qa file does exist, load it | |
else: | |
df = pd.DataFrame(columns=["Question", "Answer"]) | |
df.to_csv(self.qa_file, index=False, encoding='utf-8') | |
def get_appliedIDs(self, filename) -> list | None: | |
try: | |
df = pd.read_csv(filename, | |
header=None, | |
names=['timestamp', 'jobID', 'job', 'company', 'attempted', 'result'], | |
lineterminator='\n', | |
encoding='utf-8') | |
df['timestamp'] = pd.to_datetime(df['timestamp'], format="%Y-%m-%d %H:%M:%S") | |
df = df[df['timestamp'] > (datetime.now() - timedelta(days=2))] | |
jobIDs: list = list(df.jobID) | |
log.info(f"{len(jobIDs)} jobIDs found") | |
return jobIDs | |
except Exception as e: | |
log.info(str(e) + " jobIDs could not be loaded from CSV {}".format(filename)) | |
return None | |
def browser_options(self): | |
options = webdriver.ChromeOptions() | |
options.add_argument("--start-maximized") | |
options.add_argument("--ignore-certificate-errors") | |
options.add_argument('--no-sandbox') | |
options.add_argument("--disable-extensions") | |
#options.add_argument(r'--remote-debugging-port=9222') | |
#options.add_argument(r'--profile-directory=Person 1') | |
# Disable webdriver flags or you will be easily detectable | |
options.add_argument("--disable-blink-features") | |
options.add_argument("--disable-blink-features=AutomationControlled") | |
# Load user profile | |
#options.add_argument(r"--user-data-dir={}".format(self.profile_path)) | |
return options | |
def start_linkedin(self, username, password) -> None: | |
log.info("Logging in.....Please wait :) ") | |
self.browser.get("https://www.linkedin.com/login?trk=guest_homepage-basic_nav-header-signin") | |
try: | |
user_field = self.browser.find_element("id","username") | |
pw_field = self.browser.find_element("id","password") | |
login_button = self.browser.find_element("xpath", | |
# '//*[@id="organic-div"]/form/div[3]/button') | |
'//button[@type="submit"]') | |
user_field.send_keys(username) | |
user_field.send_keys(Keys.TAB) | |
time.sleep(2) | |
pw_field.send_keys(password) | |
time.sleep(2) | |
login_button.click() | |
time.sleep(15) | |
# if self.is_present(self.locator["2fa_oneClick"]): | |
# oneclick_auth = self.browser.find_element(by='id', value='reset-password-submit-button') | |
# if oneclick_auth is not None: | |
# log.info("additional authentication required, sleep for 15 seconds so you can do that") | |
# time.sleep(15) | |
# else: | |
# time.sleep() | |
except TimeoutException: | |
log.info("TimeoutException! Username/password field or login button not found") | |
def fill_data(self) -> None: | |
self.browser.set_window_size(1, 1) | |
self.browser.set_window_position(2000, 2000) | |
def start_apply(self, positions, locations) -> None: | |
start: float = time.time() | |
self.fill_data() | |
self.positions = positions | |
self.locations = locations | |
combos: list = [] | |
while len(combos) < len(positions) * len(locations): | |
position = positions[random.randint(0, len(positions) - 1)] | |
location = locations[random.randint(0, len(locations) - 1)] | |
combo: tuple = (position, location) | |
if combo not in combos: | |
combos.append(combo) | |
log.info(f"Applying to {position}: {location}") | |
location = "&location=" + location | |
self.applications_loop(position, location) | |
if len(combos) > 500: | |
break | |
# self.finish_apply() --> this does seem to cause more harm than good, since it closes the browser which we usually don't want, other conditions will stop the loop and just break out | |
def applications_loop(self, position, location): | |
count_application = 0 | |
count_job = 0 | |
jobs_per_page = 0 | |
start_time: float = time.time() | |
log.info("Looking for jobs.. Please wait..") | |
self.browser.set_window_position(1, 1) | |
self.browser.maximize_window() | |
self.browser, _ = self.next_jobs_page(position, location, jobs_per_page, experience_level=self.experience_level) | |
log.info("Looking for jobs.. Please wait..") | |
while time.time() - start_time < self.MAX_SEARCH_TIME: | |
try: | |
log.info(f"{(self.MAX_SEARCH_TIME - (time.time() - start_time)) // 60} minutes left in this search") | |
# sleep to make sure everything loads, add random to make us look human. | |
randoTime: float = random.uniform(1.5, 2.9) | |
log.debug(f"Sleeping for {round(randoTime, 1)}") | |
#time.sleep(randoTime) | |
self.load_page(sleep=0.5) | |
# LinkedIn displays the search results in a scrollable <div> on the left side, we have to scroll to its bottom | |
# scroll to bottom | |
if self.is_present(self.locator["search"]): | |
scrollresults = self.get_elements("search") | |
# self.browser.find_element(By.CLASS_NAME, | |
# "jobs-search-results-list" | |
# ) | |
# Selenium only detects visible elements; if we scroll to the bottom too fast, only 8-9 results will be loaded into IDs list | |
for i in range(300, 3000, 100): | |
self.browser.execute_script("arguments[0].scrollTo(0, {})".format(i), scrollresults[0]) | |
scrollresults = self.get_elements("search") | |
#time.sleep(1) | |
# get job links, (the following are actually the job card objects) | |
if self.is_present(self.locator["links"]): | |
links = self.get_elements("links") | |
# links = self.browser.find_elements("xpath", | |
# '//div[@data-job-id]' | |
# ) | |
jobIDs = {} #{Job id: processed_status} | |
# children selector is the container of the job cards on the left | |
for link in links: | |
if 'Applied' not in link.text: #checking if applied already | |
if link.text not in self.blacklist: #checking if blacklisted | |
jobID = link.get_attribute("data-job-id") | |
if jobID == "search": | |
log.debug("Job ID not found, search keyword found instead? {}".format(link.text)) | |
continue | |
else: | |
jobIDs[jobID] = "To be processed" | |
if len(jobIDs) > 0: | |
self.apply_loop(jobIDs) | |
self.browser, jobs_per_page = self.next_jobs_page(position, | |
location, | |
jobs_per_page, | |
experience_level=self.experience_level) | |
else: | |
self.browser, jobs_per_page = self.next_jobs_page(position, | |
location, | |
jobs_per_page, | |
experience_level=self.experience_level) | |
except Exception as e: | |
print(e) | |
def apply_loop(self, jobIDs): | |
for jobID in jobIDs: | |
if jobIDs[jobID] == "To be processed": | |
applied = self.apply_to_job(jobID) | |
if applied: | |
log.info(f"Applied to {jobID}") | |
else: | |
log.info(f"Failed to apply to {jobID}") | |
jobIDs[jobID] = applied | |
def apply_to_job(self, jobID): | |
# #self.avoid_lock() # annoying | |
# get job page | |
self.get_job_page(jobID) | |
# let page load | |
time.sleep(1) | |
# get easy apply button | |
button = self.get_easy_apply_button() | |
# word filter to skip positions not wanted | |
if button is not False: | |
if any(word in self.browser.title for word in self.blackListTitles): | |
log.info('skipping this application, a blacklisted keyword was found in the job position') | |
string_easy = "* Contains blacklisted keyword" | |
result = False | |
else: | |
string_easy = "* has Easy Apply Button" | |
log.info("Clicking the EASY apply button") | |
button.click() | |
clicked = True | |
time.sleep(1) | |
self.fill_out_fields() | |
result: bool = self.send_resume() | |
if result: | |
string_easy = "*Applied: Sent Resume" | |
else: | |
string_easy = "*Did not apply: Failed to send Resume" | |
elif "You applied on" in self.browser.page_source: | |
log.info("You have already applied to this position.") | |
string_easy = "* Already Applied" | |
result = False | |
else: | |
log.info("The Easy apply button does not exist.") | |
string_easy = "* Doesn't have Easy Apply Button" | |
result = False | |
# position_number: str = str(count_job + jobs_per_page) | |
log.info(f"\nPosition {jobID}:\n {self.browser.title} \n {string_easy} \n") | |
self.write_to_file(button, jobID, self.browser.title, result) | |
return result | |
def write_to_file(self, button, jobID, browserTitle, result) -> None: | |
def re_extract(text, pattern): | |
target = re.search(pattern, text) | |
if target: | |
target = target.group(1) | |
return target | |
timestamp: str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
attempted: bool = False if button == False else True | |
job = re_extract(browserTitle.split(' | ')[0], r"\(?\d?\)?\s?(\w.*)") | |
company = re_extract(browserTitle.split(' | ')[1], r"(\w.*)") | |
toWrite: list = [timestamp, jobID, job, company, attempted, result] | |
with open(self.filename, 'a+') as f: | |
writer = csv.writer(f) | |
writer.writerow(toWrite) | |
def get_job_page(self, jobID): | |
job: str = 'https://www.linkedin.com/jobs/view/' + str(jobID) | |
self.browser.get(job) | |
self.job_page = self.load_page(sleep=0.5) | |
return self.job_page | |
def get_easy_apply_button(self): | |
EasyApplyButton = False | |
try: | |
buttons = self.get_elements("easy_apply_button") | |
# buttons = self.browser.find_elements("xpath", | |
# '//button[contains(@class, "jobs-apply-button")]' | |
# ) | |
for button in buttons: | |
if "Easy Apply" in button.text: | |
EasyApplyButton = button | |
self.wait.until(EC.element_to_be_clickable(EasyApplyButton)) | |
else: | |
log.debug("Easy Apply button not found") | |
except Exception as e: | |
print("Exception:",e) | |
log.debug("Easy Apply button not found") | |
return EasyApplyButton | |
def fill_out_fields(self): | |
fields = self.browser.find_elements(By.CLASS_NAME, "jobs-easy-apply-form-section__grouping") | |
for field in fields: | |
if "Mobile phone number" in field.text: | |
field_input = field.find_element(By.TAG_NAME, "input") | |
field_input.clear() | |
field_input.send_keys(self.phone_number) | |
return | |
def get_elements(self, type) -> list: | |
elements = [] | |
element = self.locator[type] | |
if self.is_present(element): | |
elements = self.browser.find_elements(element[0], element[1]) | |
return elements | |
def is_present(self, locator): | |
return len(self.browser.find_elements(locator[0], | |
locator[1])) > 0 | |
def send_resume(self) -> bool: | |
def is_present(button_locator) -> bool: | |
return len(self.browser.find_elements(button_locator[0], | |
button_locator[1])) > 0 | |
try: | |
#time.sleep(random.uniform(1.5, 2.5)) | |
next_locator = (By.CSS_SELECTOR, | |
"button[aria-label='Continue to next step']") | |
review_locator = (By.CSS_SELECTOR, | |
"button[aria-label='Review your application']") | |
submit_locator = (By.CSS_SELECTOR, | |
"button[aria-label='Submit application']") | |
error_locator = (By.CLASS_NAME,"artdeco-inline-feedback__message") | |
upload_resume_locator = (By.XPATH, '//span[text()="Upload resume"]') | |
upload_cv_locator = (By.XPATH, '//span[text()="Upload cover letter"]') | |
# WebElement upload_locator = self.browser.find_element(By.NAME, "file") | |
follow_locator = (By.CSS_SELECTOR, "label[for='follow-company-checkbox']") | |
submitted = False | |
loop = 0 | |
while loop < 2: | |
time.sleep(1) | |
# Upload resume | |
# if is_present(upload_resume_locator): | |
#upload_locator = self.browser.find_element(By.NAME, "file") | |
# try: | |
# resume_locator = self.browser.find_element(By.XPATH, "//*[contains(@id, 'jobs-document-upload-file-input-upload-resume')]") | |
# resume_path = os.path.abspath(self.uploads["Resume"]) | |
# if os.path.exists(resume_path): | |
# resume_locator.send_keys(resume_path) | |
# else: | |
# log.error(f"Resume file not found at: {resume_path}") | |
# return False | |
# resume = self.uploads["Resume"] | |
# resume_locator.send_keys(resume) | |
# except Exception as e: | |
# log.error(e) | |
# log.error("Resume upload failed") | |
# log.debug("Resume: " + resume) | |
# log.debug("Resume Locator: " + str(resume_locator)) | |
if is_present(upload_resume_locator): | |
try: | |
resume_locator = self.browser.find_element(By.XPATH, "//*[contains(@id, 'jobs-document-upload-file-input-upload-resume')]") | |
resume_path = os.path.abspath(self.uploads["Resume"]) | |
if os.path.exists(resume_path): | |
resume_locator.send_keys(resume_path) | |
else: | |
log.error(f"Resume file not found at: {resume_path}") | |
return False # Only return False if the file doesn't exist | |
except Exception as e: | |
log.error(e) | |
log.error("Resume upload failed") | |
log.debug("Resume Locator: " + str(resume_locator)) | |
# Upload cover letter if possible | |
if is_present(upload_cv_locator): | |
try: | |
cv_path = os.path.abspath(self.uploads.get("Cover Letter", "")) | |
if os.path.exists(cv_path): | |
cv_locator = self.browser.find_element(By.XPATH, "//*[contains(@id, 'jobs-document-upload-file-input-upload-cover-letter')]") | |
cv_locator.send_keys(cv_path) | |
log.info("Cover Letter uploaded successfully.") | |
else: | |
log.warning(f"Cover Letter file not found at: {cv_path}") | |
except Exception as e: | |
log.error(f"Error uploading Cover Letter: {e}") | |
# Handle 'Follow Company' Checkbox | |
follow_elements = self.get_elements("follow") | |
for element in follow_elements: | |
try: | |
button = self.wait.until(EC.element_to_be_clickable(element)) | |
button.click() | |
log.info("Clicked 'Follow Company' checkbox.") | |
except Exception as e: | |
log.warning(f"Error clicking 'Follow' checkbox: {e}") | |
# Handle Submit | |
submit_elements = self.get_elements("submit") | |
if submit_elements: | |
for element in submit_elements: | |
try: | |
button = self.wait.until(EC.element_to_be_clickable(element)) | |
button.click() | |
log.info("Application Submitted.") | |
submitted = True | |
break | |
except Exception as e: | |
log.error(f"Error clicking 'Submit' button: {e}") | |
# Handle Errors / Questions | |
elif self.get_elements("error"): | |
max_retries = 3 | |
retry_count = 0 | |
while retry_count < max_retries: | |
log.info(f"Answering application questions... Attempt {retry_count + 1}/{max_retries}") | |
self.process_questions() | |
time.sleep(1) | |
if "application was sent" in self.browser.page_source: | |
log.info("Application successfully submitted after answering questions.") | |
submitted = True | |
break | |
if is_present(self.locator["easy_apply_button"]): | |
log.info("Easy Apply button still present. Skipping application.") | |
submitted = False | |
break | |
retry_count += 1 | |
if retry_count >= max_retries: | |
log.info("Max retries reached. Skipping application.") | |
# Handle Next Button (Multi-Step Forms) | |
elif self.get_elements("next"): | |
next_elements = self.get_elements("next") | |
for element in next_elements: | |
try: | |
button = self.wait.until(EC.element_to_be_clickable(element)) | |
button.click() | |
log.info("Clicked 'Next' button to continue application.") | |
except Exception as e: | |
log.warning(f"Error clicking 'Next' button: {e}") | |
# Handle Review Button | |
elif self.get_elements("review"): | |
review_elements = self.get_elements("review") | |
for element in review_elements: | |
try: | |
button = self.wait.until(EC.element_to_be_clickable(element)) | |
button.click() | |
log.info("Clicked 'Review' button.") | |
except Exception as e: | |
log.warning(f"Error clicking 'Review' button: {e}") | |
else: | |
log.info("No further actions found. Application incomplete or requires manual review.") | |
time.sleep(2) | |
# if is_present(upload_cv_locator): | |
# cv = self.uploads["Cover Letter"] | |
# cv_locator = self.browser.find_element(By.XPATH, "//*[contains(@id, 'jobs-document-upload-file-input-upload-cover-letter')]") | |
# cv_locator.send_keys(cv) | |
# #time.sleep(random.uniform(4.5, 6.5)) | |
# elif len(self.get_elements("follow")) > 0: | |
# elements = self.get_elements("follow") | |
# for element in elements: | |
# button = self.wait.until(EC.element_to_be_clickable(element)) | |
# button.click() | |
# if len(self.get_elements("submit")) > 0: | |
# elements = self.get_elements("submit") | |
# for element in elements: | |
# button = self.wait.until(EC.element_to_be_clickable(element)) | |
# button.click() | |
# log.info("Application Submitted") | |
# submitted = True | |
# break | |
# elif len(self.get_elements("error")) > 0: | |
# elements = self.get_elements("error") | |
# if "application was sent" in self.browser.page_source: | |
# log.info("Application Submitted") | |
# submitted = True | |
# break | |
# elif len(elements) > 0: | |
# max_retries = 3 | |
# retry_count = 0 | |
# while len(elements) > 0 and retry_count < max_retries: | |
# log.info(f"Please answer the questions, attempt {retry_count + 1}/{max_retries}, waiting 1 second...") | |
# time.sleep(1) | |
# elements = self.get_elements("error") | |
# for element in elements: | |
# self.process_questions() | |
# if "application was sent" in self.browser.page_source: | |
# log.info("Application Submitted") | |
# submitted = True | |
# break | |
# elif is_present(self.locator["easy_apply_button"]): | |
# log.info("Skipping application") | |
# submitted = False | |
# break | |
# retry_count += 1 | |
# if retry_count >= max_retries: | |
# log.info("Max retries reached. Skipping application.") | |
# continue | |
# else: | |
# log.info("Application not submitted") | |
# time.sleep(2) | |
# break | |
# # elif len(self.get_elements("error")) > 0: | |
# # elements = self.get_elements("error") | |
# # if "application was sent" in self.browser.page_source: | |
# # log.info("Application Submitted") | |
# # submitted = True | |
# # break | |
# # elif len(elements) > 0: | |
# # while len(elements) > 0: | |
# # log.info("Please answer the questions, waiting 5 seconds...") | |
# # time.sleep(5) | |
# # elements = self.get_elements("error") | |
# # for element in elements: | |
# # self.process_questions() | |
# # if "application was sent" in self.browser.page_source: | |
# # log.info("Application Submitted") | |
# # submitted = True | |
# # break | |
# # elif is_present(self.locator["easy_apply_button"]): | |
# # log.info("Skipping application") | |
# # submitted = False | |
# # break | |
# # continue | |
# # #add explicit wait | |
# # else: | |
# # log.info("Application not submitted") | |
# # time.sleep(2) | |
# # break | |
# # # self.process_questions() | |
# elif len(self.get_elements("next")) > 0: | |
# elements = self.get_elements("next") | |
# for element in elements: | |
# button = self.wait.until(EC.element_to_be_clickable(element)) | |
# button.click() | |
# elif len(self.get_elements("review")) > 0: | |
# elements = self.get_elements("review") | |
# for element in elements: | |
# button = self.wait.until(EC.element_to_be_clickable(element)) | |
# button.click() | |
# elif len(self.get_elements("follow")) > 0: | |
# elements = self.get_elements("follow") | |
# for element in elements: | |
# button = self.wait.until(EC.element_to_be_clickable(element)) | |
# button.click() | |
except Exception as e: | |
log.error(e) | |
log.error("cannot apply to this job") | |
pass | |
#raise (e) | |
return submitted | |
# def process_questions(self): | |
# time.sleep(1) | |
# form = self.get_elements("fields") | |
# for field in form: | |
# question = field.text.lower() | |
# answer = self.get_auto_answer(question) | |
# log.info(f"Auto-answering question: '{question}' with answer: '{answer}'") | |
# if answer is None: | |
# log.info(f"Skipping question (no default answer found): '{question}'") | |
# continue | |
# try: | |
# # --- Handle direct input fields (text/numbers) --- | |
# inputs = field.find_elements(By.TAG_NAME, "input") | |
# for input_field in inputs: | |
# input_type = input_field.get_attribute("type") | |
# if input_type in ["text", "number"]: | |
# input_field.clear() | |
# input_field.send_keys(str(answer)) | |
# input_field.send_keys(Keys.TAB) | |
# log.info(f"Entered '{answer}' into text/number field for '{question}'.") | |
# break | |
# # --- Handle radio buttons --- | |
# radios = field.find_elements(By.CSS_SELECTOR, "input[type='radio']") | |
# for radio in radios: | |
# if radio.get_attribute("value").lower() == str(answer).lower(): | |
# self.browser.execute_script("arguments[0].click();", radio) | |
# log.info(f"Selected radio option '{answer}' for '{question}'.") | |
# break | |
# # --- Handle dropdowns --- | |
# selects = field.find_elements(By.TAG_NAME, "select") | |
# for select in selects: | |
# options = select.find_elements(By.TAG_NAME, "option") | |
# for option in options: | |
# if option.text.strip().lower() == str(answer).lower(): | |
# option.click() | |
# log.info(f"Selected dropdown option '{answer}' for '{question}'.") | |
# break | |
# except Exception as e: | |
# log.error(f"Error filling field for question '{question}': {e}") | |
def process_questions(self): | |
time.sleep(1) | |
form = self.get_elements("fields") | |
for field in form: | |
question = field.text.lower() | |
answer = self.get_auto_answer(question) | |
log.info(f"Auto-answering question: '{question}' with answer: '{answer}'") | |
if answer is None: | |
log.info(f"Skipping question (no default answer found): '{question}'") | |
continue | |
try: | |
# --- Handle Text and Number Inputs --- | |
input_fields = field.find_elements(By.TAG_NAME, "input") | |
for input_field in input_fields: | |
input_type = input_field.get_attribute("type") | |
if input_type in ["text", "number"]: | |
input_field.clear() | |
input_field.send_keys(str(answer)) | |
input_field.send_keys(Keys.RETURN) | |
log.info(f"Filled {input_type} field with '{answer}'.") | |
break # Move to the next question | |
# --- Handle Radio Buttons --- | |
radio_buttons = field.find_elements(By.CSS_SELECTOR, "input[type='radio']") | |
for radio in radio_buttons: | |
if radio.get_attribute("value").lower() == str(answer).lower(): | |
self.browser.execute_script("arguments[0].click();", radio) | |
log.info(f"Selected radio option '{answer}'.") | |
break | |
# --- Handle Standard Dropdowns --- | |
selects = field.find_elements(By.TAG_NAME, "select") | |
for select in selects: | |
options = select.find_elements(By.TAG_NAME, "option") | |
for option in options: | |
if option.text.strip().lower() == str(answer).lower(): | |
option.click() | |
log.info(f"Selected standard dropdown option '{answer}'.") | |
break | |
# --- Dynamic Language Dropdown Handling --- | |
for language in self.answers.get("languages", {}).keys(): | |
if language.lower() in question: | |
lang_answer = self.answers["languages"].get(language, "None") | |
log.info(f"Handling language proficiency for '{language}' with answer '{lang_answer}'.") | |
try: | |
# Try standard <select> first | |
lang_select = self.browser.find_element(By.XPATH, f"//label[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language.lower()}')]/following-sibling::select") | |
Select(lang_select).select_by_visible_text(lang_answer) | |
log.info(f"Selected '{language}' proficiency using standard <select>.") | |
except: | |
try: | |
# Handle LinkedIn-style custom dropdown | |
lang_label = self.browser.find_element(By.XPATH, f"//label[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language.lower()}')]") | |
dropdown_trigger = lang_label.find_element(By.XPATH, "../following-sibling::*//button | ../following-sibling::*//div[contains(@class, 'dropdown')]") | |
dropdown_trigger.click() | |
time.sleep(0.5) # Wait for options to appear | |
options = self.browser.find_elements(By.XPATH, f"//li[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{lang_answer.lower()}')]") | |
if options: | |
options[0].click() | |
log.info(f"Selected '{language}' proficiency '{lang_answer}' using custom dropdown.") | |
else: | |
log.warning(f"No matching '{language}' proficiency option found in custom dropdown.") | |
except Exception as e: | |
log.warning(f"Could not handle '{language}' proficiency dropdown: {e}") | |
except Exception as e: | |
log.error(f"Error during question processing: {e}") | |
# def process_questions(self): | |
# time.sleep(1) | |
# try: | |
# # 1. Handle "How many years of work experience do you have with Sales?" | |
# sales_input = self.browser.find_element(By.XPATH, "//label[contains(text(), 'Sales')]/following-sibling::input") | |
# sales_input.clear() | |
# sales_input.send_keys(str(self.get_auto_answer("sales"))) | |
# sales_input.send_keys(Keys.RETURN) | |
# log.info("Filled Sales experience field.") | |
# # 2. Handle "How many years of work experience do you have with Business Travel?" | |
# travel_input = self.browser.find_element(By.XPATH, "//label[contains(text(), 'Business Travel')]/following-sibling::input") | |
# travel_input.clear() | |
# travel_input.send_keys(str(self.get_auto_answer("business travel"))) | |
# travel_input.send_keys(Keys.RETURN) | |
# log.info("Filled Business Travel experience field.") | |
# # 3. Handle Chinese language proficiency dropdown | |
# chinese_dropdown = self.browser.find_element(By.XPATH, "//label[contains(text(), 'Chinese')]/following-sibling::select") | |
# Select(chinese_dropdown).select_by_visible_text(self.get_auto_answer("chinese")) | |
# log.info("Selected Chinese proficiency.") | |
# # 4. Handle English language proficiency dropdown | |
# english_dropdown = self.browser.find_element(By.XPATH, "//label[contains(text(), 'English')]/following-sibling::select") | |
# Select(english_dropdown).select_by_visible_text(self.get_auto_answer("english")) | |
# log.info("Selected English proficiency.") | |
# # 5. Handle Bachelor’s Degree Yes/No | |
# bachelor_yes = self.browser.find_element(By.XPATH, "//label[contains(text(), \"Bachelor's Degree\")]/following-sibling::*//label[contains(., 'Yes')]") | |
# bachelor_yes.click() | |
# log.info("Selected Bachelor's Degree: Yes.") | |
# except Exception as e: | |
# log.error(f"Error during question processing: {e}") | |
# def process_questions(self): | |
# time.sleep(1) | |
# form = self.get_elements("fields") | |
# for field in form: | |
# question = field.text.lower() | |
# answer = self.get_auto_answer(question) | |
# log.info(f"Auto-answering question: '{question}' with answer: '{answer}'") | |
# if answer is None: | |
# log.info(f"Skipping question (no default answer found): '{question}'") | |
# continue | |
# try: | |
# # Handle text/numeric fields (e.g., experience years) | |
# input_fields = field.find_elements(By.TAG_NAME, "input") | |
# for input_field in input_fields: | |
# input_type = input_field.get_attribute("type") | |
# if input_type in ["text", "number"]: | |
# input_field.clear() | |
# input_field.send_keys(str(answer)) | |
# input_field.send_keys(Keys.RETURN) | |
# log.info(f"Entered '{answer}' into text/number field.") | |
# break | |
# # Handle radio buttons | |
# radio_buttons = field.find_elements(By.CSS_SELECTOR, "input[type='radio']") | |
# for radio in radio_buttons: | |
# if radio.get_attribute("value").lower() == answer.lower(): | |
# self.browser.execute_script("arguments[0].click();", radio) | |
# log.info(f"Selected radio option: '{answer}'") | |
# break | |
# # Handle dropdowns (select elements) | |
# selects = field.find_elements(By.TAG_NAME, "select") | |
# for select in selects: | |
# options = select.find_elements(By.TAG_NAME, "option") | |
# for option in options: | |
# if option.text.strip().lower() == answer.lower(): | |
# option.click() | |
# log.info(f"Selected dropdown option: '{answer}'") | |
# break | |
# except Exception as e: | |
# log.error(f"Error filling field for question '{question}': {e}") | |
# def process_questions(self): | |
# time.sleep(1) | |
# form = self.get_elements("fields") | |
# for field in form: | |
# question = field.text.lower() | |
# answer = self.get_auto_answer(question) | |
# log.info(f"Auto-answering question: '{question}' with answer: '{answer}'") | |
# if answer is None: | |
# log.info(f"Skipping question (no default answer found): '{question}'") | |
# continue | |
# try: | |
# # Numeric & text fields | |
# if self.is_present(self.locator["text_select"]): | |
# input_field = field.find_element(self.locator["text_select"]) | |
# input_field.clear() | |
# input_field.send_keys(str(answer)) | |
# input_field.send_keys(Keys.RETURN) | |
# # Radio buttons | |
# elif self.is_present(self.locator["radio_select"]): | |
# input_field = field.find_element(By.CSS_SELECTOR, f"input[type='radio'][value='{answer}']") | |
# self.browser.execute_script("arguments[0].click();", input_field) | |
# # Dropdowns using <select> element | |
# elif field.find_elements(By.TAG_NAME, "select"): | |
# select_element = field.find_element(By.TAG_NAME, "select") | |
# select = Select(select_element) | |
# try: | |
# select.select_by_visible_text(answer) | |
# except: | |
# log.warning(f"Option '{answer}' not found for question '{question}'") | |
# except Exception as e: | |
# log.error(f"Error filling field for question '{question}': {e}") | |
# try: | |
# # --- Special Handling for Numeric Fields --- | |
# if "how many years" in question: | |
# input_field = field.find_element(self.locator["text_select"]) | |
# input_field.clear() | |
# input_field.send_keys(str(answer)) | |
# input_field.send_keys(Keys.RETURN) | |
# continue # Skip remaining checks for this field | |
# # --- Special Handling for Chinese Proficiency Dropdown --- | |
# if "proficiency in chinese" in question: | |
# dropdown = field.find_element(By.TAG_NAME, "select") | |
# for option in dropdown.find_elements(By.TAG_NAME, "option"): | |
# if option.text.strip().lower() == answer.lower(): | |
# option.click() | |
# break | |
# continue # Skip remaining checks for this field | |
# # --- Generic Handling for Text Inputs --- | |
# if self.is_present(self.locator["text_select"]): | |
# input_field = field.find_element(self.locator["text_select"]) | |
# input_field.clear() | |
# input_field.send_keys(str(answer)) | |
# input_field.send_keys(Keys.RETURN) | |
# # --- Generic Handling for Radio Buttons --- | |
# elif self.is_present(self.locator["radio_select"]): | |
# input_field = field.find_element(By.CSS_SELECTOR, f"input[type='radio'][value='{answer}']") | |
# self.browser.execute_script("arguments[0].click();", input_field) | |
# # --- Generic Handling for Other Dropdowns --- | |
# elif "select an option" in field.text.lower(): | |
# try: | |
# select_element = field.find_element(By.TAG_NAME, "select") | |
# for option in select_element.find_elements(By.TAG_NAME, "option"): | |
# if option.text.strip().lower() == answer.lower(): | |
# option.click() | |
# break | |
# except Exception as e: | |
# log.warning(f"Could not select dropdown option for '{question}': {e}") | |
# except Exception as e: | |
# log.error(f"Error filling field: {e}") | |
# def process_questions(self): | |
# time.sleep(1) | |
# form = self.get_elements("fields") | |
# for field in form: | |
# question = field.text.lower() | |
# answer = self.get_auto_answer(question) | |
# log.info(f"Auto-answering question: '{question}' with answer: '{answer}'") | |
# if answer is None: | |
# log.info(f"Skipping question (no default answer found): '{question}'") | |
# continue | |
# try: | |
# # Handle numeric and text inputs | |
# if self.is_present(self.locator["text_select"]): | |
# input_field = field.find_element(self.locator["text_select"]) | |
# input_field.clear() | |
# input_field.send_keys(str(answer)) | |
# input_field.send_keys(Keys.RETURN) | |
# # Handle radio buttons | |
# elif self.is_present(self.locator["radio_select"]): | |
# input_field = field.find_element(By.CSS_SELECTOR, f"input[type='radio'][value='{answer}']") | |
# self.browser.execute_script("arguments[0].click();", input_field) | |
# # Handle dropdowns explicitly | |
# elif "select an option" in field.text.lower(): | |
# try: | |
# select_element = field.find_element(By.TAG_NAME, "select") | |
# for option in select_element.find_elements(By.TAG_NAME, "option"): | |
# if option.text.strip().lower() == answer.lower(): | |
# option.click() | |
# break | |
# except Exception as e: | |
# log.warning(f"Could not select dropdown option for '{question}': {e}") | |
# except Exception as e: | |
# log.error(f"Error filling field: {e}") | |
# def process_questions(self): | |
# time.sleep(1) | |
# form = self.get_elements("fields") | |
# for field in form: | |
# question = field.text.lower() | |
# answer = self.get_auto_answer(question) | |
# log.info(f"Auto-answering question: '{question}' with answer: '{answer}'") | |
# if answer is None: | |
# log.info(f"Skipping question (no default answer found): '{question}'") | |
# continue # Skip if no default answer is found | |
# try: | |
# if self.is_present(self.locator["radio_select"]): | |
# input = field.find_element(By.CSS_SELECTOR, f"input[type='radio'][value='{answer}']") | |
# self.browser.execute_script("arguments[0].click();", input) | |
# elif self.is_present(self.locator["multi_select"]): | |
# input = field.find_element(self.locator["multi_select"]) | |
# input.send_keys(answer) | |
# elif self.is_present(self.locator["text_select"]): | |
# input = field.find_element(self.locator["text_select"]) | |
# input.send_keys(str(answer)) | |
# input.send_keys(Keys.RETURN) | |
# except Exception as e: | |
# log.error(f"Error filling field: {e}") | |
def get_auto_answer(self, question): | |
params = self.answers if hasattr(self, 'answers') else {} | |
question = question.lower() # Normalize question text for easier matching | |
# Handle visa and authorization questions | |
if "visa" in question: | |
return str(params.get("auto_answer_questions", {}).get("requireVisa", "No")) | |
if "authorized to work" in question: | |
return str(params.get("auto_answer_questions", {}).get("legallyAuthorized", "Yes")) | |
if "background check" in question: | |
return str(params.get("auto_answer_questions", {}).get("backgroundCheck", "Yes")) | |
if "start immediately" in question: | |
return str(params.get("auto_answer_questions", {}).get("urgentFill", "Yes")) | |
# Handle degree-related questions | |
if "degree" in question or "education" in question: | |
degrees = params.get("auto_answer_questions", {}).get("degreeCompleted", []) | |
return degrees[0] if degrees else "Bachelor's Degree" | |
# Handle language proficiency questions | |
if "proficiency in chinese" in question: | |
return params.get("languages", {}).get("chinese", "None") | |
if "proficiency in english" in question: | |
return params.get("languages", {}).get("english", "Native or bilingual") | |
# Handle experience-related questions | |
industry_exp = params.get("industry_experience", {}) | |
for industry in industry_exp: | |
if industry.lower() in question: | |
return str(industry_exp[industry]) # Ensure it's a string for input fields | |
# Default fallback | |
return "0" # Safe default for numeric fields like years of experience | |
# def get_auto_answer(self, question): | |
# params = self.answers if hasattr(self, 'answers') else {} | |
# log.debug(f"Processing question: {question}") | |
# if "visa" in question: | |
# return "No" if not params.get("auto_answer_questions", {}).get("requireVisa", False) else "Yes" | |
# elif "authorized to work" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("legallyAuthorized", True) else "No" | |
# elif "background check" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("backgroundCheck", True) else "No" | |
# elif "start immediately" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("urgentFill", True) else "No" | |
# elif "degree" in question: | |
# degrees = params.get("auto_answer_questions", {}).get("degreeCompleted", []) | |
# return degrees[0] if degrees else "Bachelor's Degree" | |
# elif "language" in question: | |
# if "chinese" in question: | |
# return params.get("languages", {}).get("chinese", "None") | |
# if "english" in question: | |
# return params.get("languages", {}).get("english", "Native or bilingual") | |
# elif "experience" in question: | |
# industry_exp = params.get("industry_experience", {}) | |
# for industry in industry_exp: | |
# if industry.lower() in question: | |
# return str(industry_exp[industry]) # Ensure number is returned as string | |
# return str(industry_exp.get("default", 0)) | |
# return "Yes" # Default fallback | |
# def get_auto_answer(self, question): | |
# params = self.answers if hasattr(self, 'answers') else {} | |
# log.debug(f"Processing question: {question}") | |
# # Experience-related questions | |
# if "how many years" in question or "years of work experience" in question: | |
# industry_exp = params.get("industry_experience", {}) | |
# for industry in industry_exp: | |
# if industry.lower() in question: | |
# return str(industry_exp[industry]) # Ensure it's a string for input fields | |
# return str(industry_exp.get("default", 0)) # Fallback to default if no match | |
# if "experience" in question or "years of work" in question: | |
# industry_exp = params.get("industry_experience", {}) | |
# for industry in industry_exp: | |
# if industry.lower() in question: | |
# return industry_exp[industry] | |
# return industry_exp.get("default", 0) # Fallback to default if no match | |
# Visa & Work Authorization | |
# if "visa" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("requireVisa", False) else "No" | |
# if "authorized to work" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("legallyAuthorized", True) else "No" | |
# if "background check" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("backgroundCheck", True) else "No" | |
# if "start immediately" in question: | |
# return "Yes" if params.get("auto_answer_questions", {}).get("urgentFill", True) else "No" | |
# # Degree/Education Level | |
# if "degree" in question or "education" in question: | |
# degrees = params.get("auto_answer_questions", {}).get("degreeCompleted", []) | |
# return degrees[0] if degrees else "Bachelor's Degree" | |
# # Language Proficiency | |
# if "language" in question or "proficiency" in question: | |
# if "chinese" in question: | |
# return params.get("languages", {}).get("chinese", "None") | |
# if "english" in question: | |
# return params.get("languages", {}).get("english", "Native or bilingual") | |
# # Default language response | |
# return "None" | |
# # Default Fallback for Unhandled Cases | |
# log.warning(f"No matching answer found for question: '{question}'. Using fallback.") | |
# if "years of" in question or "experience" in question: | |
# return "0" # Default for numerical experience questions | |
# return "Yes" # Default for Yes/No questions | |
# def process_questions(self): | |
# time.sleep(1) | |
# form = self.get_elements("fields") #self.browser.find_elements(By.CLASS_NAME, "jobs-easy-apply-form-section__grouping") | |
# for field in form: | |
# question = field.text | |
# answer = self.ans_question(question.lower()) | |
# #radio button | |
# if self.is_present(self.locator["radio_select"]): | |
# try: | |
# input = field.find_element(By.CSS_SELECTOR, "input[type='radio'][value={}]".format(answer)) | |
# input.execute_script("arguments[0].click();", input) | |
# except Exception as e: | |
# log.error(e) | |
# continue | |
# multi select | |
# elif self.is_present(self.locator["multi_select"]): | |
# try: | |
# input = field.find_element(self.locator["multi_select"]) | |
# input.send_keys(answer) | |
# except Exception as e: | |
# log.error(e) | |
# continue | |
# text box | |
# elif self.is_present(self.locator["text_select"]): | |
# try: | |
# input = field.find_element(self.locator["text_select"]) | |
# input.send_keys(answer) | |
# except Exception as e: | |
# log.error(e) | |
# continue | |
# elif self.is_present(self.locator["text_select"]): | |
# pass | |
# if "Yes" or "No" in answer: #radio button | |
# try: #debug this | |
# input = form.find_element(By.CSS_SELECTOR, "input[type='radio'][value={}]".format(answer)) | |
# form.execute_script("arguments[0].click();", input) | |
# except: | |
# pass | |
# else: | |
# input = form.find_element(By.CLASS_NAME, "artdeco-text-input--input") | |
# input.send_keys(answer) | |
def ans_question(self, question): #refactor this to an ans.yaml file | |
answer = None | |
if "how many" in question: | |
answer = "1" | |
elif "experience" in question: | |
answer = "1" | |
elif "sponsor" in question: | |
answer = "No" | |
elif 'do you ' in question: | |
answer = "Yes" | |
elif "have you " in question: | |
answer = "Yes" | |
elif "US citizen" in question: | |
answer = "Yes" | |
elif "are you " in question: | |
answer = "Yes" | |
elif "salary" in question: | |
answer = self.salary | |
elif "can you" in question: | |
answer = "Yes" | |
elif "gender" in question: | |
answer = "Male" | |
elif "race" in question: | |
answer = "Wish not to answer" | |
elif "lgbtq" in question: | |
answer = "Wish not to answer" | |
elif "ethnicity" in question: | |
answer = "Wish not to answer" | |
elif "nationality" in question: | |
answer = "Wish not to answer" | |
elif "government" in question: | |
answer = "I do not wish to self-identify" | |
elif "are you legally" in question: | |
answer = "Yes" | |
else: | |
log.info("Not able to answer question automatically. Please provide answer") | |
#open file and document unanswerable questions, appending to it | |
answer = "user provided" | |
time.sleep(15) | |
# df = pd.DataFrame(self.answers, index=[0]) | |
# df.to_csv(self.qa_file, encoding="utf-8") | |
log.info("Answering question: " + question + " with answer: " + answer) | |
# Append question and answer to the CSV | |
if question not in self.answers: | |
self.answers[question] = answer | |
# Append a new question-answer pair to the CSV file | |
new_data = pd.DataFrame({"Question": [question], "Answer": [answer]}) | |
new_data.to_csv(self.qa_file, mode='a', header=False, index=False, encoding='utf-8') | |
log.info(f"Appended to QA file: '{question}' with answer: '{answer}'.") | |
return answer | |
def load_page(self, sleep=1): | |
scroll_page = 0 | |
while scroll_page < 4000: | |
self.browser.execute_script("window.scrollTo(0," + str(scroll_page) + " );") | |
scroll_page += 500 | |
time.sleep(sleep) | |
if sleep != 1: | |
self.browser.execute_script("window.scrollTo(0,0);") | |
time.sleep(sleep) | |
page = BeautifulSoup(self.browser.page_source, "lxml") | |
return page | |
# def avoid_lock(self) -> None: | |
# x, \_ = pyautogui.position() | |
# pyautogui.moveTo(x + 200, pyautogui.position().y, duration=1.0) | |
# pyautogui.moveTo(x, pyautogui.position().y, duration=0.5) | |
# pyautogui.keyDown('ctrl') | |
# pyautogui.press('esc') | |
# pyautogui.keyUp('ctrl') | |
# time.sleep(0.5) | |
# pyautogui.press('esc') | |
def next_jobs_page(self, position, location, jobs_per_page, experience_level=[]): | |
experience_level_str = ",".join(map(str, experience_level)) if experience_level else "" | |
experience_level_param = f"&f_E={experience_level_str}" if experience_level_str else "" | |
location_param = f"&location={urllib.parse.quote(location)}" if location else "" | |
search_url = ( | |
f"https://www.linkedin.com/jobs/search/?f_LF=f_AL&keywords={urllib.parse.quote(position)}" | |
f"{location_param}&start={jobs_per_page}{experience_level_param}" | |
) | |
self.browser.get(search_url) | |
log.info("Loading next job page?") | |
self.load_page() | |
return (self.browser, jobs_per_page) | |
# def next_jobs_page(self, position, location, jobs_per_page, experience_level=[]): | |
# # Construct the experience level part of the URL | |
# experience_level_str = ",".join(map(str, experience_level)) if experience_level else "" | |
# experience_level_param = f"&f_E={experience_level_str}" if experience_level_str else "" | |
# self.browser.get( | |
# # URL for jobs page | |
# "https://www.linkedin.com/jobs/search/?f_LF=f_AL&keywords=" + | |
# position + location + "&start=" + str(jobs_per_page) + experience_level_param) | |
# #self.avoid_lock() | |
log.info("Loading next job page?") | |
self.load_page() | |
return (self.browser, jobs_per_page) | |
# def finish_apply(self) -> None: | |
# self.browser.close() | |
if __name__ == '__main__': | |
with open("config.yaml", 'r') as stream: | |
try: | |
parameters = yaml.safe_load(stream) | |
except yaml.YAMLError as exc: | |
raise exc | |
log.info(f"Positions from YAML: {parameters.get('positions')}") | |
log.info(f"Locations from YAML: {parameters.get('locations')}") | |
assert len(parameters['positions']) > 0 | |
assert len(parameters['locations']) > 0 | |
assert parameters['username'] is not None | |
assert parameters['password'] is not None | |
assert parameters['phone_number'] is not None | |
if 'uploads' in parameters.keys() and type(parameters['uploads']) == list: | |
raise Exception("uploads read from the config file appear to be in list format" + | |
" while should be dict. Try removing '-' from line containing" + | |
" filename & path") | |
log.info({k: parameters[k] for k in parameters.keys() if k not in ['username', 'password']}) | |
output_filename: list = [f for f in parameters.get('output_filename', ['output.csv']) if f is not None] | |
output_filename: list = output_filename[0] if len(output_filename) > 0 else 'output.csv' | |
blacklist = parameters.get('blacklist', []) | |
blackListTitles = parameters.get('blackListTitles', []) | |
uploads = {} if parameters.get('uploads', {}) is None else parameters.get('uploads', {}) | |
for key in uploads.keys(): | |
assert uploads[key] is not None | |
locations: list = [l for l in parameters['locations'] if l is not None] | |
positions: list = [p for p in parameters['positions'] if p is not None] | |
bot = EasyApplyBot(parameters['username'], | |
parameters['password'], | |
parameters['phone_number'], | |
parameters['salary'], | |
parameters['rate'], | |
uploads=uploads, | |
filename=output_filename, | |
blacklist=blacklist, | |
blackListTitles=blackListTitles, | |
experience_level=parameters.get('experience_level', []) | |
) | |
# Attach config data to bot for auto-answering | |
bot.answers = parameters | |
bot.start_apply(positions, locations) | |
# bot = EasyApplyBot(parameters['username'], | |
# parameters['password'], | |
# parameters['phone_number'], | |
# parameters['salary'], | |
# parameters['rate'], | |
# uploads=uploads, | |
# filename=output_filename, | |
# blacklist=blacklist, | |
# blackListTitles=blackListTitles, | |
# experience_level=parameters.get('experience_level', []) | |
# ) | |
# bot.start_apply(positions, locations) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment