Skip to content

Instantly share code, notes, and snippets.

@daniel-j
Last active March 17, 2020 17:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniel-j/e2f49507a77cc96b9719c4111860ced4 to your computer and use it in GitHub Desktop.
Save daniel-j/e2f49507a77cc96b9719c4111860ced4 to your computer and use it in GitHub Desktop.
Linux updater for My Little Karaoke song and theme packages
#!/usr/bin/env bash
# Launcher wrapper: offer an update check first, then start the game.

# LD_LIBRARY_PATH is cleared for the updater process only.
# NOTE(review): presumably so the system Python/GTK libraries are used rather
# than ones inherited from the caller's environment -- confirm.
LD_LIBRARY_PATH="" ./mlk-updater.py --check

# Forward every argument unchanged. "$@" must be quoted, otherwise arguments
# containing spaces or glob characters are re-split by the shell.
./launch.sh "$@"
#!/usr/bin/env python3
import os
import tarfile
import json
import shutil
import argparse
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Soup', '2.4')
from gi.repository import Gtk, Soup, GLib, GObject
# Command-line interface of the updater.
parser = argparse.ArgumentParser(description='My Little Karaoke Updater')
parser.add_argument(
    '--data-dir',
    help='Path to UltraStar Deluxe data directory',
)
parser.add_argument(
    '--update',
    action='store_true',
    help='Don\'t ask about updating',
)
parser.add_argument(
    '--check',
    action='store_true',
    help='Ask to check for updates',
)
args = parser.parse_args()

# Directory the packages are installed into; falls back to ./data when
# --data-dir is missing or empty.
DATA_DIR = args.data_dir or './data'
# Partially downloaded packages are kept here until they are extracted.
TEMP_DIR = os.path.join(DATA_DIR, 'temp')
def sizeof_fmt(num, suffix='B'):
    """Format a byte count as a human-readable string with binary prefixes.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    then rendered with one decimal place, e.g. ``1536`` -> ``"1.5 KiB"``.
    Values too large for the ``Zi`` prefix are reported in ``Yi``.

    Based on https://stackoverflow.com/a/1094933
    """
    prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    value = num
    step = 0
    # Spelled as "not (... < ...)" on purpose: this is the exact negation of
    # the "fits in this unit" test, including for values that do not order.
    while step < len(prefixes) and not (abs(value) < 1024.0):
        value /= 1024.0
        step += 1
    if step == len(prefixes):
        return "%.1f %s%s" % (value, 'Yi', suffix)
    return "%3.1f %s%s" % (value, prefixes[step], suffix)
class ProgressBarWindow(Gtk.Window):
    """Updater window: checks, downloads and installs packages.

    Work flow, all driven by callbacks on the GTK main loop:

    1. ``__init__`` builds the UI, loads the local cache and requests the
       package manifest.
    2. ``on_manifest`` parses the manifest; ``check_next_package`` then sends
       one HEAD request per package and queues those that are not recorded
       as complete in the cache.
    3. ``download_next_package`` downloads each queued package into
       ``TEMP_DIR`` (resuming partial files), extracts it and records it in
       the cache.
    """

    def __init__(self):
        """Build the window and start fetching the package manifest."""
        Gtk.Window.__init__(
            self,
            title="My Little Karaoke Updater",
            default_width=500
        )
        self.set_border_width(50)
        self.set_position(Gtk.WindowPosition.CENTER)

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0)
        self.add(vbox)

        self.progressbar_package = Gtk.ProgressBar()
        self.progressbar_package.set_show_text(True)
        self.progressbar_total = Gtk.ProgressBar()
        self.progressbar_total.set_show_text(True)
        vbox.pack_start(self.progressbar_package, False, False, 5)
        vbox.pack_start(self.progressbar_total, False, False, 5)

        self.show_all()
        # The total bar is shown again once the first download starts.
        self.progressbar_total.hide()

        # Entries: [url, filename, filepath, size, bytes already on disk].
        self.queue = []
        # State of the package currently being downloaded (read by on_timeout).
        self.part_filename = ''
        self.downloaded_part = 0
        self.part_size = 0
        # Byte totals over all packages in the manifest.
        self.progress_total = 0
        self.downloaded_total = 0

        self.soup = Soup.SessionAsync()
        logger = Soup.Logger.new(Soup.LoggerLogLevel.HEADERS, -1)
        self.soup.add_feature(logger)

        try:
            with open(os.path.join(DATA_DIR, 'mlkcache.json')) as cache_file:
                cache = json.load(cache_file)
        except (OSError, ValueError):
            # Missing, unreadable or corrupt cache file: start from scratch.
            cache = {}
        # The rest of the class uses the cache as a dict keyed by filename.
        self.cache = cache if isinstance(cache, dict) else {}

        self.progressbar_package.set_fraction(0)
        self.fetch(
            'https://www.mylittlekaraoke.com/store/webinst/linux.webinst',
            self.on_manifest
        )

    def fetch(self, uri, callback=None, method='GET',
              accumulate=True, headers=None, set_range=None):
        """Queue an asynchronous HTTP request on the session.

        Args:
            uri: URL to request.
            callback: Optional function called with the response body bytes
                once the request has finished.
            method: HTTP method name.
            accumulate: When False, the response body is not accumulated on
                the message; the caller is expected to consume chunks itself.
            headers: Optional mapping of extra request headers.
            set_range: Optional ``(start, end)`` pair for a Range header.

        Returns:
            The queued ``Soup.Message``.
        """
        request_headers = {
            'User-Agent': 'mlkupdater/Linux'
        }
        if headers:
            request_headers.update(headers)

        message = Soup.Message.new(method, uri)
        for key, value in request_headers.items():
            message.props.request_headers.append(key, value)
        if set_range:
            message.props.request_headers.set_range(set_range[0], set_range[1])
        if not accumulate:
            message.response_body.set_accumulate(False)

        def on_finished(session, message):
            callback(message.props.response_body_data.get_data())

        print(method, uri)
        self.soup.queue_message(message, on_finished if callback else None)
        return message

    def save_cache(self):
        """Write the package cache to ``mlkcache.json`` inside ``DATA_DIR``."""
        os.makedirs(DATA_DIR, exist_ok=True)
        with open(os.path.join(DATA_DIR, 'mlkcache.json'), 'w') as cache_file:
            json.dump(self.cache, cache_file)

    def on_manifest(self, data):
        """Parse the downloaded manifest and start checking its packages.

        The manifest is read as alternating lines: a package URL followed by
        its size. Blank lines and surrounding whitespace (including carriage
        returns) are ignored; an unpaired trailing line is dropped.

        Args:
            data: Raw response body of the manifest request.
        """
        text = (data or b'').decode('utf-8')
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        pairs = iter(lines)
        # Zipping the iterator with itself yields consecutive (url, size) pairs.
        self.manifest = list(zip(pairs, pairs))
        self.package_iter = iter(self.manifest)
        self.package_counter = 0
        self.progressbar_package.set_text('Fetching package info - 0/' + str(len(self.manifest)))
        self.check_next_package()

    def check_next_package(self):
        """Check the next manifest entry, or start downloading when done.

        Sends a HEAD request for the next package. When the manifest is
        exhausted the cache is saved, the user is asked for confirmation
        (unless ``--update`` was given or nothing is queued) and the download
        phase begins.

        Returns:
            False, so that GLib does not repeat it when used as a timeout
            callback.
        """
        try:
            (url, size) = next(self.package_iter)
        except StopIteration:
            print('Check complete', flush=True)
            self.save_cache()
            if len(self.queue) != 0 and not args.update:
                print(self.queue, flush=True)
                dialog = Gtk.MessageDialog(
                    transient_for=self,
                    flags=0,
                    message_type=Gtk.MessageType.QUESTION,
                    buttons=Gtk.ButtonsType.YES_NO,
                    text="Updates available!"
                )
                dialog.format_secondary_text("My Little Karaoke packages are available to download. Continue?")
                response = dialog.run()
                dialog.destroy()
                if response != Gtk.ResponseType.YES:
                    self.close()
                    return False
            self.queue_iter = iter(self.queue)
            # Refresh the progress bars five times per second from now on.
            GLib.timeout_add(200, self.on_timeout)
            self.download_next_package()
            return False

        size = int(size)
        filename = os.path.basename(url)
        self.progressbar_package.set_text('Fetching package info - ' + str(self.package_counter + 1) + '/' + str(len(self.manifest)))

        def on_get_headers(data):
            headers = head_message.props.response_headers
            content_length = headers.get_content_length()
            if content_length != size:
                # Server and manifest disagree: skip this package and move on
                # after a short pause.
                print('Size mismatch!', url, content_length, size, flush=True)
                GLib.timeout_add_seconds(2, self.check_next_package)
                return

            etag = headers.get('etag')
            cache_info = self.cache.get(filename, {})
            has_latest = (
                (etag is None or cache_info.get('etag') == etag)
                and cache_info.get('size') == content_length
                and cache_info.get('complete')
            )
            if has_latest:
                self.downloaded_total += content_length
            else:
                self.cache[filename] = {
                    'complete': False,
                    'etag': etag,
                    'size': content_length
                }
                filepath = os.path.join(TEMP_DIR, filename)
                downloaded = 0
                if os.path.isfile(filepath):
                    # A partial file from an earlier run will be resumed.
                    downloaded = os.path.getsize(filepath)
                self.queue.append([url, filename, filepath, content_length, downloaded])
                self.downloaded_total += downloaded
            self.progress_total += content_length

            self.package_counter += 1
            # Divide by the package count itself: "count - 1" divided by zero
            # for a single-package manifest and overshot 1.0 otherwise.
            self.progressbar_package.set_fraction(self.package_counter / len(self.manifest))
            # timeout_add_seconds() only takes whole seconds, so the 0.1 s
            # pause is expressed in milliseconds instead.
            GLib.timeout_add(100, self.check_next_package)

        # on_get_headers reads this name when the response arrives, which is
        # after this assignment has run.
        head_message = self.fetch(url, on_get_headers, method='HEAD')
        return False

    def download_next_package(self):
        """Download, extract and record the next queued package.

        When the queue is exhausted the window is closed and the (empty)
        temp directory removed. Before each download the free disk space is
        compared with the bytes still to be fetched, and the user is asked
        whether to continue when it looks insufficient.

        Returns:
            False, so that GLib does not repeat it when used as a timeout
            callback.
        """
        print('Download next')
        try:
            (url, filename, filepath, size, downloaded) = next(self.queue_iter)
        except StopIteration:
            print('Everything is up to date!', flush=True)
            self.close()
            try:
                os.rmdir(TEMP_DIR)
            except OSError:
                # Best effort: the directory may be missing or not empty.
                pass
            return False

        self.part_filename = filename
        self.downloaded_part = downloaded
        self.part_size = size

        free_space = shutil.disk_usage(DATA_DIR).free
        # Remaining download volume plus a 200 KiB safety margin.
        required_space = self.progress_total - self.downloaded_total + 1024 * 200
        if free_space <= required_space:
            print("Not enough free space!", flush=True)
            dialog = Gtk.MessageDialog(
                transient_for=self,
                flags=0,
                message_type=Gtk.MessageType.ERROR,
                buttons=Gtk.ButtonsType.YES_NO,
                text="Not enough free space!"
            )
            dialog.format_secondary_text("You have " + sizeof_fmt(free_space) + " of free space left on your device. The remaining updates require " + sizeof_fmt(required_space) + " free space to complete.\nDo you want to continue downloading anyway?")
            response = dialog.run()
            dialog.destroy()
            if response != Gtk.ResponseType.YES:
                self.close()
                return False

        os.makedirs(TEMP_DIR, exist_ok=True)
        if downloaded > 0:
            part_file = open(filepath, 'a+b')
            print('Resuming download of ' + url, flush=True)
        else:
            part_file = open(filepath, 'w+b')
            print('Downloading ' + url, flush=True)
        self.progressbar_package.set_fraction(0)
        self.progressbar_total.show()

        def on_chunk(message, chunk):
            nonlocal downloaded
            part_file.write(chunk.get_data())
            downloaded += chunk.length
            self.downloaded_part = downloaded
            self.downloaded_total += chunk.length

        def on_complete_refresh(message):
            # Show the final progress, then leave the UI a moment to repaint
            # before the extraction starts.
            self.on_timeout()
            GLib.timeout_add(200, on_complete, message)

        def on_complete(message):
            part_file.close()
            if self._extract_package(filename, filepath):
                os.remove(filepath)
                self.cache[filename]['complete'] = True
                self.save_cache()
            else:
                print('Unknown filetype ' + filename, flush=True)
            self.download_next_package()
            return False

        # Range ends are inclusive byte positions, so passing the file size
        # overshoots by one; -1 asks for everything from the offset onwards.
        msg = self.fetch(url, accumulate=False, set_range=[downloaded, -1])
        msg.connect('got-chunk', on_chunk)
        msg.connect('finished', on_complete_refresh)
        return False

    def _extract_package(self, filename, filepath):
        """Extract a downloaded package into its destination directory.

        The destination is chosen by file extension: ``.mlk`` -> songs,
        ``.mlt`` -> themes, ``.mlu`` -> the data directory itself.

        Args:
            filename: Package file name, used to pick the destination.
            filepath: Path of the downloaded archive.

        Returns:
            True when the archive was extracted, False when the extension is
            unknown or the file is not a tar archive.
        """
        destinations = {
            '.mlk': os.path.join(DATA_DIR, 'songs'),
            '.mlt': os.path.join(DATA_DIR, 'themes'),
            '.mlu': DATA_DIR,  # already includes avatars folder inside
        }
        dest = destinations.get(os.path.splitext(filename)[1])
        if dest is None or not tarfile.is_tarfile(filepath):
            return False

        print('Extracting ' + filepath + ' to ' + dest, flush=True)
        root = os.path.abspath(dest)
        with tarfile.open(filepath, 'r:') as tar:
            for member in tar:
                target = os.path.join(dest, member.name)
                # The archive comes from the network: never write outside the
                # destination ("../" or absolute member names). Link targets
                # are not validated here.
                if os.path.commonpath([root, os.path.abspath(target)]) != root:
                    print('Skipping unsafe path in archive: ' + member.name, flush=True)
                    continue
                try:
                    tar.extract(member, path=dest)
                except OSError:
                    # Something in the way could not be overwritten: remove it
                    # and try once more.
                    os.remove(target)
                    tar.extract(member, path=dest)
                if not member.issym():
                    # os.chmod follows symlinks, so applying a link's mode
                    # would change the file it points to.
                    os.chmod(target, member.mode)
        return True

    @staticmethod
    def _fraction(done, total):
        """Return ``done / total``, or 0.0 when ``total`` is not positive."""
        return done / total if total > 0 else 0.0

    def on_timeout(self):
        """
        Update value on the progress bar
        """
        # Sizes can legitimately be zero (before the first download, or for an
        # empty package), so the ratios are computed defensively.
        part_fraction = self._fraction(self.downloaded_part, self.part_size)
        total_fraction = self._fraction(self.downloaded_total, self.progress_total)

        self.progressbar_package.set_fraction(part_fraction)
        self.progressbar_package.set_text('Downloading ' + self.part_filename + ' - ' + '{0:.2f}'.format(part_fraction * 100) + ' % ' + sizeof_fmt(self.part_size))
        self.progressbar_total.set_fraction(total_fraction)
        self.progressbar_total.set_text('Total progress - ' + '{0:.2f}'.format(total_fraction * 100) + ' % ' + sizeof_fmt(self.progress_total))
        # As this is a timeout function, return True so that it
        # continues to get called
        return True
def start_update():
    """Open the updater window and run the GTK main loop.

    The window's "delete-event" is wired to ``Gtk.main_quit``, so this call
    returns once the window has been closed.
    """
    window = ProgressBarWindow()
    window.connect("delete-event", Gtk.main_quit)
    Gtk.main()
# Entry point. With --check the user is first asked whether to look for
# updates at all; without it the update check starts immediately.
should_start = True
if args.check:
    # Keyword arguments instead of positional ones: positional GObject
    # constructor arguments are deprecated in PyGObject. The dialog has no
    # parent window, so none is passed.
    dialog = Gtk.MessageDialog(
        flags=0,
        message_type=Gtk.MessageType.QUESTION,
        buttons=Gtk.ButtonsType.YES_NO,
        text="Check for updates",
    )
    dialog.format_secondary_text("Do you want to check for My Little Karaoke updates?")
    response = dialog.run()
    dialog.destroy()
    should_start = response == Gtk.ResponseType.YES

if should_start:
    start_update()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment