Skip to content

Instantly share code, notes, and snippets.

@peolic
Last active June 1, 2021 13:31
Show Gist options
  • Save peolic/e9048803a905f14b51e8d14479f820bd to your computer and use it in GitHub Desktop.
Save peolic/e9048803a905f14b51e8d14479f820bd to your computer and use it in GitHub Desktop.
# coding: utf-8
import hashlib
import re
import sys
import traceback
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import cast, Optional
import requests
try:
from packaging.version import LegacyVersion
except ImportError:
from setuptools._vendor.packaging.version import LegacyVersion
here = Path(__file__).parent.resolve()
STASH_EXE: Path = here / 'stash-win.exe'
STASH_EXE_NEW: Path = here / 'stash-win-new.exe'
def get_asset(tag_name: str):
resp = requests.get(f'https://api.github.com/repos/stashapp/stash/releases/tags/{tag_name}')
data = resp.json()
if 'assets' not in data:
print(f'tag {tag_name!r} not found')
return None, None, None
try:
asset = next(
asset
for asset in data['assets']
if asset['name'] == 'stash-win.exe'
# if asset['content_type'] == 'application/x-dosexec'
)
except StopIteration:
print('correct asset not found')
return None, None, None
else:
version = data['name']
checksums_url = next((a['browser_download_url'] for a in data['assets'] if a['name'] == 'CHECKSUMS_SHA1'), None)
return asset, checksums_url, version
def download_asset(asset: dict, filename: str):
url = asset['browser_download_url']
size = asset['size']
cur_size = 0
print('downloading asset... ', end='')
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
cur_size += len(chunk)
done = int((cur_size / size) * 100)
sys.stdout.write(f'\rdownloading asset... {done}%')
sys.stdout.flush()
sys.stdout.write('\n')
sys.stdout.flush()
def human_timedelta(td: timedelta) -> str:
mm, ss = divmod(td.seconds, 60)
hh, mm = divmod(mm, 60)
s = '%dh %02dm %02ds' % (hh, mm, ss)
if td.days:
s = '%dd %s' % (td.days, s)
return s
def get_current_version(file_path: Path) -> str:
# version is around 75% the way into the file
size = file_path.stat().st_size
skip_to = int(size * 0.75)
with file_path.open('rb') as fh:
# Jump ahead
fh.seek(skip_to)
data = fh.read()
match = re.search(rb"""
\0(v
# base version
\d+\.\d+\.\d+
# dev & git hash
(?:-(\d+-g[a-f\d]+))?
)\0
""", data, flags=re.VERBOSE)
if not match:
raise ValueError('Current version not found in file!')
return match.group(1).decode('ascii')
def patch_disable_freeones(file_path: Path) -> bool:
try:
data = bytearray(file_path.read_bytes())
# Find scraper definition start and end
scraper_start_pattern = re.compile(rb'\nname: Freeones', re.I)
scraper_end_pattern = re.compile(rb'# Last updated .+\n', re.I)
try:
scraper_start = scraper_start_pattern.search(cast(bytes, data)).start()
scraper_end = scraper_end_pattern.search(cast(bytes, data), scraper_start).end()
except AttributeError:
raise ValueError('Could not find scraper')
start_pattern = re.compile(rb'performerByName:\n')
end_pattern = re.compile(rb'\nxPathScrapers:\n')
try:
start = start_pattern.search(cast(bytes, data), scraper_start, scraper_end).start()
end = end_pattern.search(cast(bytes, data), scraper_start, scraper_end).start()
except AttributeError:
# Is it already patched?
if b'# builtin_freeones_disabled' in data[scraper_start:scraper_end]:
print('Already patched: disabled builtin_freeones')
return True
raise ValueError('Could not find section')
# Replace with spaces (keep LF)
new_data = bytearray(
c if c == 0x0A else 0x20
for c in data[start:end]
)
# Insert a marker
marker = bytearray(b'# builtin_freeones_disabled\n')
new_data[:len(marker)] = marker
orig_size = len(data)
data[start:end] = new_data
new_size = len(data)
if new_size != orig_size:
raise ValueError(f'Sizes differ! {new_size=} != {orig_size=}')
file_path.write_bytes(bytes(data))
print('patched: disabled builtin_freeones')
except Exception:
traceback.print_exc()
print('=== /\\ === Patch error === /\\ ===')
return False
return True
def get_checksum(checksums_url: Optional[str], asset_name: str) -> Optional[str]:
if checksums_url:
try:
resp = requests.get(checksums_url)
data = resp.text.splitlines()
return next(
(cs.split(' ')[0] for cs in data if cs.endswith(asset_name)),
None
)
except StopIteration:
pass
except Exception as error:
print(error)
def sha1_hash(path: Path) -> str:
h = hashlib.sha1()
b = bytearray(128*1024)
mv = memoryview(b)
with path.open('rb', buffering=0) as f:
for n in iter(lambda: f.readinto(mv), 0):
h.update(mv[:n])
return h.hexdigest()
def main(args = sys.argv[1:]):
if args:
tag_name = args[0]
else:
tag_name = 'latest_develop'
print(f'getting asset `{tag_name}`')
asset, checksums_url, version_str = get_asset(tag_name)
if not asset:
print('Failed getting asset data')
return
if STASH_EXE.is_file():
date_fmt = '%Y-%m-%d %H:%M:%S'
now = datetime.now().astimezone().replace(microsecond=0)
parsed_date_current = datetime.fromtimestamp(STASH_EXE.stat().st_mtime, tz=now.tzinfo).replace(microsecond=0)
parsed_date_new = datetime.fromisoformat(asset["updated_at"].rstrip('Z')).replace(tzinfo=timezone.utc).astimezone()
has_new_version = parsed_date_new > parsed_date_current
try:
current_version = get_current_version(STASH_EXE)
asset_version = version_str.partition(':')[0] # "v0.3.0-80-gabf0281: Latest development build"
has_new_version = LegacyVersion(asset_version) > LegacyVersion(current_version) # type: ignore
except Exception:
current_version = None
asset_version = None
version_check = current_version and asset_version
if version_check:
print()
print(f'Current version: {current_version}')
print(f'Remote version: {asset_version}')
print()
print(f'Asset last updated {human_timedelta(now - parsed_date_new)} ago (at {parsed_date_new.strftime(date_fmt)})')
if has_new_version:
if version_check:
print(f'New version {asset_version} available (current: {current_version})')
else:
print(f'New version available (current: {parsed_date_current.strftime(date_fmt)})')
else:
if version_check:
print(f'Up-to-date (current: {current_version})')
else:
print(f'Up-to-date (current: {parsed_date_current.strftime(date_fmt)})')
return True
# print('Downloading asset')
try:
download_asset(asset, str(STASH_EXE_NEW))
except Exception:
print('Failed downloading asset')
raise
checksum = get_checksum(checksums_url, asset['name'])
if checksum:
actual_checksum = sha1_hash(STASH_EXE_NEW)
if actual_checksum != checksum:
print('Asset failed checksum validation, not replacing executable!')
return False
else:
print('Asset passed checksum validation.')
else:
print('Cannot verify checksum')
print('applying patches...')
patch_disable_freeones(STASH_EXE_NEW)
print('replacing executable...')
STASH_EXE_NEW.replace(STASH_EXE)
return True
if __name__ == '__main__':
if 'patch' in sys.argv[1:]:
patch_disable_freeones(STASH_EXE)
exit(0)
result = None
try:
result = main()
finally:
if result:
# print('Closing window in 5 seconds.')
# sleep(5)
exit(0)
input('Press ENTER to continue...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment