Skip to content

Instantly share code, notes, and snippets.

@dieu
Last active March 13, 2024 19:39
Show Gist options
  • Save dieu/17ffbcace6024dcf5b75dbbee1961977 to your computer and use it in GitHub Desktop.
Save dieu/17ffbcace6024dcf5b75dbbee1961977 to your computer and use it in GitHub Desktop.
SleepHQ Uploader
import binascii
from lxml.html import parse as parse_url
from parse import parse
import requests
from tqdm import tqdm
from os import path
import os
import time
class ezshare():
    """Client for the HTTP interface of an EZ Share WiFi SD card.

    Walks the card's HTML directory listings and downloads files, with
    size/CRC-based skip logic and bounded retries per file.
    """

    def __init__(self, url="http://ezshare.card/dir?dir=A:", num_retries=5):
        # url: the card's directory-listing endpoint for the "A:" drive.
        # num_retries: attempts per file in _dload_with_retry.
        self.base = url
        self.num_retries = num_retries

    def is_dir(self, href):
        """True when *href* is a directory link (file links match .../download?file=...)."""
        r = parse("{}/download?file={name}", href)
        return r is None  # fixed: identity comparison instead of == None

    def is_file(self, href):
        """True when *href* is a downloadable file link."""
        return not self.is_dir(href)

    def _get(self, url):
        # Fetch *url* and return the parsed lxml document.
        return parse_url(url)

    def ping(self):
        """Return True when the card answers its base URL."""
        try:
            self._get(self.base)
            return True
        except Exception:  # fixed: was a bare except (also swallowed SystemExit/KeyboardInterrupt)
            return False

    def listdir(self, dir, recursive=False, shift=0):
        """Map *dir*'s entries to hrefs (files) or nested dicts (directories).

        Returns None when *dir* turns out not to be a directory (its listing
        has no "."/".." entries). *shift* is unused; kept for compatibility.
        """
        ret = {}
        is_root = False
        if dir == "/":
            dir = ""
            is_root = True
        dir = dir.replace("/", "\\")  # the card expects DOS-style separators
        soup = self._get(f"{self.base}{dir}")
        has_dotiles = False
        for a in soup.xpath("//pre/a"):
            href = a.get("href")
            name = a.text.strip()
            if name == "." or name == ".." or name == "ezshare.cfg":
                has_dotiles = True
                continue
            if self.is_dir(href):
                ret[name] = {}
                if recursive:
                    dir_content = self.listdir(f"{dir}/{name}", recursive=recursive)
                    if dir_content is not None:
                        ret[name] = dir_content
            else:
                ret[name] = href
        # No dotfiles or ezshare.cfg and is not root? This must not be a directory.
        if not has_dotiles and not is_root:
            return None
        return ret

    def print_list(self, dirlist, shift=0):
        """Pretty-print a listdir() tree, indenting one space per level."""
        shiftstr = " " * shift
        for k, v in dirlist.items():
            if type(v) is dict:
                print(f"{shiftstr} {k}/")
                self.print_list(v, shift=shift + 1)
            else:
                print(f"{shiftstr} {k}")

    def stream_size(self, stream):
        """Return the number of bytes from *stream*'s current position to EOF."""
        pos = stream.tell()
        stream.seek(0, 2)
        ln = stream.tell()
        stream.seek(pos)
        return ln - pos

    def _dload(self, link, file_name, crc):
        """Download *link* into *file_name*, skipping files that look identical.

        With crc=False a matching size is enough to skip; with crc=True the
        local and remote CRC-32 must also match. Returns True on success/skip.
        """
        with open(file_name, "ab") as f:
            f.seek(0)
            response = requests.head(link)
            curlength = self.stream_size(f)
            # fixed: a missing Content-Length header used to crash in int(None),
            # which made the "no content length" branch below unreachable.
            header_length = response.headers.get('content-length')
            total_length = int(header_length) if header_length is not None else None
            if not crc and curlength == total_length:
                print(f"Skipping file {file_name} (same size)")
                return True
            response = requests.get(link, stream=True)
            if total_length is None:  # no content length header
                f.truncate()
                f.write(response.content)
                return True
            elif crc and curlength == total_length:
                f_crc32_hash = binascii.crc32(f.read())
                r_crc32_hash = binascii.crc32(response.content)
                if f_crc32_hash == r_crc32_hash:
                    print(f"Skipping file {file_name} (same crc32 sum)")
                    return True
            with tqdm(desc=file_name, total=total_length, unit='B', unit_scale=True, unit_divisor=1024, miniters=1) as pbar:
                f.truncate(0)
                for data in response.iter_content(chunk_size=4096):
                    f.write(data)
                    pbar.update(len(data))
        return True

    def _dload_with_retry(self, link, file_name, crc):
        """Retry _dload up to num_retries times on transient connection errors."""
        last_exception = None
        for retries in range(self.num_retries):
            try:
                return self._dload(link, file_name, crc)
            except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as err:
                print(f"Retrying link: {link} ({retries} times)")
                last_exception = err
        if last_exception is not None:
            print(f"Failed to download {link}: {last_exception}")
        # fixed: always return an explicit bool (was None when num_retries == 0)
        return False

    def download(self, remote_file, local_file=None, recursive=False, crc=False):
        """Download one remote file; a trailing "/" in *local_file* names a directory."""
        if local_file is None:
            local_file = path.basename(remote_file)
        if local_file.endswith("/"):  # fixed: no IndexError on an empty string
            os.makedirs(local_file, exist_ok=True)
        if path.isdir(local_file):
            local_file = path.join(local_file, path.basename(remote_file))
        remote_dir = path.dirname(remote_file)
        basename = path.basename(remote_file)
        link = self.listdir(remote_dir, recursive)[basename]
        self._dload_with_retry(link, local_file, crc)

    def _sync_list(self, todo, local_dir, crc):
        # Recursively download a listdir() tree into *local_dir*.
        os.makedirs(local_dir, exist_ok=True)
        for k, v in todo.items():
            if type(v) is dict:
                self._sync_list(v, path.join(local_dir, k), crc)
            elif v:
                self._dload_with_retry(v, path.join(local_dir, k), crc)
            else:
                print(f"Skipping sync of {k} because link is {v}")

    def sync(self, remote_dir, local_dir=".", recursive=False, crc=False):
        """Mirror *remote_dir* into *local_dir*; single files fall back to download()."""
        if local_dir is None:
            local_dir = "."
        todo = self.listdir(remote_dir, recursive=recursive)
        if todo is None:
            # remote_dir turned out to be a file, not a directory
            self.download(remote_dir, local_dir, crc=crc)
        else:
            self._sync_list(todo, local_dir, crc)
import binascii
import os
import time
import zipfile
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Path to the system chromedriver binary used by Selenium below
# (matches a chromium package layout; adjust for other installs).
chromedriver_path = "/usr/lib/chromium/chromedriver"
# Setup logging
logging.basicConfig(level=logging.INFO)
def remove_path(path):
    """Best-effort delete of *path*; a missing or undeletable file is ignored.

    Prints the attempted removal either way so the cleanup is traceable.
    """
    try:
        print(f"rm {path}")
        os.remove(path)
    except OSError:
        # fixed: narrowed from `except Exception` — only filesystem errors
        # (missing file, permissions, path is a directory) are best-effort.
        pass
def cleanup_data(path):
    """Delete machine-identity/journal/settings entries under *path* before syncing."""
    for leftover in (
        "Identification.crc",
        "Identification.tgt",
        "Journal.dat",
        "STR.edf",
        "SETTINGS",
    ):
        remove_path(f"{path}/{leftover}")
def download_data(to_path):
    """Mirror the whole card (remote root "/") into *to_path* recursively."""
    from_path = "/"
    card = ezshare(num_retries=3)
    print(f"Synchronizing remote {from_path} -> {to_path}")
    card.sync(from_path, to_path, recursive=True)
def file_modified_since_zip(file_path, relative_path, zipf):
    """Return True when *file_path*'s content differs from the archive member.

    Streams a CRC-32 over the on-disk file and compares it to the stored CRC
    of *zipf*'s *relative_path* member; a member that is missing from the
    archive counts as modified.
    """
    try:
        # fixed: the try body now covers only the call that raises KeyError;
        # I/O errors from open()/read() propagate exactly as before.
        z_crc32_hash = zipf.getinfo(relative_path).CRC
    except KeyError:
        # fixed grammar in the user-facing message ("is not exist" -> "does not exist")
        print(f"{file_path} as {relative_path} does not exist in archive")
        return True
    f_crc32_hash = 0
    with open(file_path, "rb") as f:
        # Chunked CRC so large recordings are never loaded fully into memory.
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            f_crc32_hash = binascii.crc32(chunk, f_crc32_hash) & 0xffffffff
    if f_crc32_hash == z_crc32_hash:
        print(f"{file_path} already in archive")
        return False
    print(f"{file_path} ({f_crc32_hash}) != {relative_path} ({z_crc32_hash})")
    return True
def create_zip(folder_path, zip_path):
    """(Re)build *zip_path* from *folder_path* when anything changed.

    Returns True when a fresh archive was written, False when the existing
    archive already matches the folder contents. Arcnames are taken relative
    to *folder_path*'s parent, so the folder name itself is inside the zip.
    """
    has_something_new = not os.path.isfile(zip_path)
    if not has_something_new:
        # fixed: open the existing archive read-only for the comparison pass
        # (it was opened in append mode "a" without ever writing to it).
        with zipfile.ZipFile(zip_path, "r", zipfile.ZIP_DEFLATED) as zipf:
            # any() short-circuits at the first changed/new file, matching the
            # original nested double-break.
            has_something_new = any(
                file_modified_since_zip(
                    os.path.join(root, file),
                    os.path.relpath(os.path.join(root, file),
                                    os.path.dirname(folder_path)),
                    zipf,
                )
                for root, _dirs, files in os.walk(folder_path)
                for file in files
            )
    if has_something_new:
        # ZipFile cannot replace members in place, so rewrite from scratch.
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _dirs, files in os.walk(folder_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    relative_path = os.path.relpath(file_path, os.path.dirname(folder_path))
                    print(f"adding {file_path} as {relative_path}")
                    zipf.write(file_path, relative_path)
    return has_something_new
def upload_file(file_path, username, password):
    """Upload *file_path* to SleepHQ through a headless-Chrome Selenium session.

    Signs in with *username*/*password* if the sign-in page is shown, derives
    the team id from the post-login dashboard URL, then drops the file onto
    the import page's hidden dropzone input and clicks the upload button.
    NOTE(review): progress is awaited with fixed 60 s sleeps — confirm these
    are long enough for large archives on slow links.
    """
    options = Options()
    options.add_argument("--disable-extensions")
    options.add_argument("--disable-gpu")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--no-sandbox")
    options.add_argument("--headless")
    # The context manager quits the browser even if a step below raises.
    with webdriver.Chrome(service=Service(chromedriver_path), options=options) as driver:
        driver.get('https://sleephq.com/users/sign_in')
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "user_email")))
        # Login (skipped when an existing session already redirected us away)
        if driver.current_url == 'https://sleephq.com/users/sign_in':
            logging.info("logging in...")
            driver.find_element(By.ID, 'user_email').send_keys(username)
            driver.find_element(By.ID, 'user_password').send_keys(password)
            driver.find_element(By.TAG_NAME, 'button').click()
            WebDriverWait(driver, 10).until(EC.url_changes(driver.current_url))
        # Upload: the dashboard URL ends in the team id (strip any query string)
        dashboard_url = driver.current_url
        team_id = dashboard_url.split('/')[-1].split("?")[0]
        import_url = f'https://sleephq.com/account/teams/{team_id}/imports'
        file_element = '//input[@type="file" and @class="dz-hidden-input"]'
        logging.info(f"import page {import_url}")
        driver.get(import_url)
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, file_element)))
        # Dropzone hides the real <input type=file>; send_keys still sets it.
        driver.find_element(By.XPATH, file_element).send_keys(file_path)
        time.sleep(60)  # let the dropzone register the selected file
        driver.find_element(By.ID, 'start-upload-button').click()
        logging.info("uploading...")
        time.sleep(60)  # let the upload finish before the browser quits
        logging.info("upload completed.")
# Entry point: clean transient files, mirror the card, and upload the
# archive to SleepHQ only when its contents actually changed.
ezshare_path = "/share/ezshare/"
# fixed: dropped a needless f-prefix on a literal with no placeholders
ezshare_archive_path = "/share/ezshare_archive.zip"
cleanup_data(ezshare_path)
download_data(ezshare_path)
if create_zip(ezshare_path, ezshare_archive_path):
    # Credentials come from the environment; .get() yields None when unset.
    sleep_hq_username = os.environ.get("SLEEP_HQ_USERNAME")
    sleep_hq_password = os.environ.get("SLEEP_HQ_PASSWORD")
    upload_file(ezshare_archive_path, sleep_hq_username, sleep_hq_password)
else:
    logging.info("nothing new in archive, nothing to upload")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment