Last active
March 29, 2020 00:20
-
-
Save Xorboo/5fe9e05977afb288a4b12e4d3d617e2e to your computer and use it in GitHub Desktop.
Script to check residence permit decision state on https://klient.gdansk.uw.gov.pl and notify via telegram bot. Barely works, but does its thing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import time | |
import requests | |
from selenium import webdriver | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
def expand_shadow_element(browser, element): | |
shadow_root = browser.execute_script('return arguments[0].shadowRoot', element) | |
return shadow_root | |
def get_data(login, password): | |
try: | |
chrome_options = webdriver.ChromeOptions() | |
chrome_options.add_argument("--headless") | |
chrome_options.add_argument("--no-sandbox") | |
browser = webdriver.Chrome(options=chrome_options) | |
browser.set_page_load_timeout(10) | |
browser.get('https://klient.gdansk.uw.gov.pl/') | |
WebDriverWait(browser, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'vaadin-text-field'))) | |
text_shadow_section = expand_shadow_element(browser, browser.find_element_by_tag_name('vaadin-text-field')) | |
login_field = text_shadow_section.find_element_by_css_selector('input') | |
login_field.send_keys(login) | |
password_shadow_section = expand_shadow_element(browser, browser.find_element_by_tag_name('vaadin-password-field')) | |
password_field = password_shadow_section.find_element_by_css_selector('input') | |
password_field.send_keys(password) | |
button_shadow_section = expand_shadow_element(browser, browser.find_elements_by_tag_name('vaadin-button')[-1]) | |
login_button = button_shadow_section.find_element_by_id('button') | |
login_button.click() | |
WebDriverWait(browser, 10).until(EC.url_changes(browser.current_url)) | |
combined_data = '' | |
labels = browser.find_elements_by_tag_name('label')[1:] | |
data_fields = browser.find_elements_by_tag_name('vaadin-text-field') | |
for i in range(len(data_fields)): | |
data_field = data_fields[i] | |
field_shadow = expand_shadow_element(browser, data_field) | |
input_field = field_shadow.find_element_by_css_selector('input') | |
field_value = input_field.get_attribute('value') | |
data_name = labels[i].get_attribute('innerHTML') if i < len(labels) else '\t[?]: ' | |
combined_data += f'*{data_name}:* {field_value}\t' | |
notes_field = browser.find_elements_by_tag_name('vaadin-text-area')[0] | |
notes_shadow = expand_shadow_element(browser, notes_field) | |
text_area_field = notes_shadow.find_element_by_css_selector('textarea') | |
combined_data += '*Notes:* ' + text_area_field.get_attribute('value') | |
except Exception as ex: | |
send_error(ex) | |
combined_data = '' | |
finally: | |
if browser: | |
browser.quit() | |
return combined_data | |
def get_file_path(): | |
dir_path = os.path.dirname(os.path.realpath(__file__)) | |
return os.path.join(dir_path, 'data.txt') | |
def get_current_data(): | |
print('Reading current data') | |
data = {} | |
file_name = get_file_path() | |
if not os.path.exists(file_name): | |
return data | |
with open(file_name, 'r+', encoding='utf-8') as file: | |
file_data = file.read() | |
lines = file_data.split('\n') | |
for line in lines: | |
if len(line) > 0: | |
user_name, user_data = line.split('\t', 1) | |
if user_name and user_data: | |
data[user_name] = user_data | |
return data | |
def save_data(new_data): | |
file_name = get_file_path() | |
with open(file_name, 'w+', encoding='utf-8') as file: | |
for user_name, user_data in new_data.items(): | |
line = f'{user_name}\t{user_data}\n' | |
file.write(line) | |
def perform_check(all_data, user_name, login, password): | |
print(f'Checking user {user_name}...') | |
old_data = all_data[user_name] if user_name in all_data else '' | |
new_data = get_data(login, password) | |
if new_data == '': | |
print('Couldn\'t get new data') | |
return False | |
if old_data == new_data: | |
print(f'No new data') | |
return False | |
print(f'Old data: {user_name} -> {old_data}') | |
print(f'New data: {user_name} -> {new_data}') | |
formatted_old_data = old_data.replace('\t', '\n') | |
formatted_new_data = new_data.replace('\t', '\n') | |
text = f'*Status changed for {user_name}:*\n\n*Old Status:*\n{formatted_old_data}\n\n*New Status:*\n{formatted_new_data}' | |
send_update(text) | |
all_data[user_name] = new_data | |
return True | |
def send_update(text): | |
admin_ids = ['%import_user_chat_ids%'] | |
send_message(text, update_ids) | |
def send_error(exception): | |
admin_ids = ['%import_admin_chat_ids%'] | |
exception_message = f'Exception [{type(exception).__name__}]: {exception}, Args: {exception.args!r}' | |
print(exception_message) | |
send_message(exception_message, admin_ids) | |
time.sleep(30) | |
def send_message(message, adresses): | |
token = '%import_bot_token%' | |
for chat in adresses: | |
send_text = f'https://api.telegram.org/bot{token}/sendMessage?chat_id={chat}&parse_mode=Markdown&text={message}' | |
requests.get(send_text) | |
time.sleep(5) | |
if __name__ == '__main__': | |
current_data = get_current_data() | |
data_changed = False | |
data_changed |= perform_check(current_data, '%name%', '%login%', '%password%') | |
time.sleep(10) | |
# ... | |
if data_changed: | |
print('Saving new data...') | |
save_data(current_data) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment