Created
June 20, 2020 20:46
-
-
Save Neo-Zhixing/df551b5b1d6539cd3127b83e7fd3a373 to your computer and use it in GitHub Desktop.
Cytoid ParseLevel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import tempfile | |
import json | |
import zipfile | |
import hashlib | |
from cerberus import Validator | |
from flask import jsonify, make_response, abort as flask_abort | |
from google.cloud import storage | |
import subprocess | |
from checksum import cytus_1_chart_checksum, cytus_2_chart_checksum | |
def abort(status, message):
    """Terminate the current request with the given HTTP status and a JSON
    body of the form {"message": <message>}."""
    body = jsonify(message=message)
    flask_abort(make_response(body, status))
bucket = storage.Client().bucket('assets.cytoid.io') | |
def resolve_level(request):
    """HTTP entry point (Cloud Function style).

    Expects a POST with a JSON body containing:
      - 'packagePath': path of the uploaded .cytoidlevel zip in the bucket
      - 'bundlePath':  destination prefix for the extracted assets

    Returns the parsed level data as a JSON response, or aborts with an
    appropriate 4xx status.
    """
    if request.method != 'POST':
        # BUG FIX: abort() takes (status, message); calling it with only a
        # status raised a TypeError instead of producing a 405 response.
        abort(405, 'Only POST requests are accepted')
    content_type = request.headers.get('Content-Type', '')
    # startswith tolerates media-type parameters such as
    # 'application/json; charset=utf-8', which the old strict equality rejected.
    if not content_type.startswith('application/json'):
        abort(415, 'Only json body is accepted')
    payload = request.get_json(silent=True)
    if not payload:
        abort(400, 'can not parse json')
    filepath = payload.get('packagePath')
    bundlepath = payload.get('bundlePath')
    if not filepath or not bundlepath:
        # NOTE(review): 400 would be more conventional than 403 here, but the
        # status is kept for backward compatibility with existing clients.
        abort(403, 'packagePath and bundlePath required')
    return jsonify(parseLevelFile(filepath, bundlepath))
def schema_version_newest(field, value, error):
    """Cerberus ``check_with`` hook: reject any level.json whose
    schema_version is not the current version (2) by reporting an error
    through the supplied ``error(field, message)`` callback."""
    if value != 2:
        # Fixed grammar in the user-facing message
        # ("have to updated" -> "have to update").
        error(field, "You have to update your level.json to the latest schema.")
# Cerberus validator for level.json (schema_version 2). Unknown keys are
# allowed (see allow_unknown below) so newer metadata fields pass through.
schema_validator = Validator({
    'id': {
        'type': 'string',
        # Lowercase alphanumeric/underscore segments separated by '-', '_' or
        # '.', e.g. 'artist.song_name'; at least two segments are required.
        'regex': r'^[a-z0-9_]+([-_.][a-z0-9_]+)+$',
        'maxlength': 64,
        'minlength': 3,
        'required': True,
    },
    'schema_version': {
        'type': 'integer',
        'required': True,
        # Only the newest schema version (2) is accepted.
        'check_with': schema_version_newest,
    },
    'version': {
        'type': 'integer',
        'required': True,
        'min': 1,
        'max': 999,
    },
    "title": {
        'type': 'string',
        'required': True,
        'empty': False,
    },
    # Optional localized / attribution fields: may be absent or null, but if
    # present must be non-empty strings.
    "title_localized": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
    "artist": {
        'type': 'string',
        'required': True,
        'empty': False,
    },
    "artist_localized": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
    "artist_source": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
    "illustrator": {'type': 'string', 'required': True, 'empty': False},
    "illustrator_source": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
    "storyboarder": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
    "charter": {'type': 'string', 'required': True, 'empty': False},
    # Asset references: each is a dict whose 'path' names a file inside the
    # level zip package. NOTE(review): none of these three are marked
    # required, yet parseLevelFile indexes metadata['music'] and
    # metadata['background'] unconditionally — confirm whether they should be
    # 'required': True here.
    "music": {
        'type': 'dict',
        'schema': {
            'path': {
                'type': 'string',
                'required': True,
            }
        }
    },
    "music_preview": {
        'type': 'dict',
        'schema': {
            'path': {
                'type': 'string',
                'required': True,
            }
        }
    },
    "background": {
        'type': 'dict',
        'schema': {
            'path': {
                'type': 'string',
                'required': True,
            }
        }
    },
    "charts": {
        'type': 'list',
        'minlength': 1,
        'required': True,
        'schema': {
            'type': 'dict',
            'schema': {
                # Difficulty tier of the chart.
                'type': {
                    'type': 'string',
                    'allowed': ['easy', 'hard', 'extreme'],
                    'required': True,
                },
                'name': {
                    'type': 'string',
                    'required': False,
                    'empty': False,
                },
                # Numeric difficulty rating; no upper bound is enforced here.
                'difficulty': {
                    'type': 'integer',
                    'required': True,
                    'min': 0,
                },
                # Path of the chart file inside the zip package.
                'path': {
                    'type': 'string',
                    'required': True,
                }
            }
        }
    }
})
# Let unknown top-level keys validate, so extra metadata does not reject a level.
schema_validator.allow_unknown = True
def countNotes(charts, zipFile):
    """Annotate every chart dict in ``charts`` in place, reading each chart
    file from the open ``zipFile`` (a zipfile.ZipFile over the level package).

    Adds to each chart:
      - 'hash':       SHA-256 hex digest of the raw chart file bytes
      - 'notesCount': number of notes in the chart
      - 'checksum':   game-specific checksum (Cytus 2 JSON charts vs
                      Cytus 1 line-based charts)

    Aborts with 400 when the chart list is empty or a Cytus 2 chart lacks a
    'note_list'. Raises KeyError (handled by the caller) when a chart path is
    missing from the zip.
    """
    if len(charts) < 1:
        # BUG FIX: the status must be an int (was the string '400'),
        # consistent with every other abort() call in this module.
        abort(400, 'at least one chart required!')
    for chart in charts:
        chartPath = chart.get('path', None)
        with zipFile.open(chartPath) as chartFile:
            # Stream the file in 64 KiB chunks to compute the hash without
            # loading the whole chart into memory.
            sha = hashlib.sha256()
            while True:
                file_buffer = chartFile.read(65536)
                if not len(file_buffer):
                    break
                sha.update(file_buffer)
            chart['hash'] = sha.hexdigest()
            try:
                # Cytus 2 charts are JSON documents with a 'note_list' array.
                chartFile.seek(0)
                chartdata = json.load(chartFile)
                if 'note_list' not in chartdata:
                    abort(400, 'one or more charts is invalid')
                chart['notesCount'] = len(chartdata['note_list'])
                chart['checksum'] = cytus_2_chart_checksum(chartdata)
            except json.JSONDecodeError:
                # Not JSON: treat as a Cytus 1 chart, a plain-text format
                # where each note is a line beginning with 'NOTE'.
                chartFile.seek(0)
                lines = [a.decode('utf-8') for a in chartFile.readlines()]
                chart['notesCount'] = sum(1 for line in lines if line.startswith('NOTE'))
                chart['checksum'] = cytus_1_chart_checksum(lines)
def getDuration(musicPath):
    """Return the duration in seconds (float) of the audio file at
    ``musicPath``, as reported by ffprobe.

    Aborts with 500 when ffprobe exits non-zero or reports no usable
    duration. Note: if the ffprobe binary itself is missing, the resulting
    FileNotFoundError propagates to the caller.
    """
    try:
        # '-of default=noprint_wrappers=1:nokey=1' makes ffprobe print the
        # bare duration value on stdout, nothing else.
        result = subprocess.check_output([
            'ffprobe',
            '-v',
            'error',
            '-show_entries',
            'format=duration',
            '-of',
            'default=noprint_wrappers=1:nokey=1',
            musicPath
        ], stderr=subprocess.PIPE)
        return float(result)
    except subprocess.CalledProcessError as e:
        print('Error when obtaining music duration with exit code {}'.format(e.returncode), file=sys.stderr)
        print(e.output.decode(sys.getfilesystemencoding()), file=sys.stderr)
        print(e.stderr.decode(sys.getfilesystemencoding()), file=sys.stderr)
        abort(500, 'Unknown error when obtaining music duration')
    except ValueError:
        # BUG FIX: ffprobe can exit 0 yet print nothing (or 'N/A') for
        # streams with no format-level duration; float() then raised an
        # unhandled ValueError, surfacing as an opaque 500.
        print('ffprobe returned no parsable duration for {}'.format(musicPath), file=sys.stderr)
        abort(500, 'Unknown error when obtaining music duration')
def getMusicPreview(musicPath, targetPath, duration):
    """Cut a 30-second preview clip from ``musicPath`` into ``targetPath``.

    Tracks of 60 seconds or longer start the preview at the 15-second mark;
    shorter tracks start at the beginning. The audio stream is copied
    without re-encoding, so the target keeps the source codec/container.
    Aborts with 500 if ffmpeg exits non-zero.
    """
    result = subprocess.run([
        'ffmpeg',
        '-ss',        # seek to the given start position in seconds
        str(0 if duration < 60 else 15),
        '-t',         # limit the output to this many seconds
        '30',
        '-i',         # input file
        musicPath,
        '-acodec',    # copy the audio stream as-is, no re-encode
                      # (fixed comment: this was mislabeled "output file")
        'copy',
        targetPath
    ])
    if result.returncode != 0:
        abort(500, 'Unknown error during music preview generation')
def formatErrors(errors):
    """Flatten a (possibly nested) cerberus error mapping into one string.

    Example::

        {'a': ['x', 'y'], 'b': [{0: ['z']}]}  ->  'a: x, y; b: (0: z)'

    Nested dicts (e.g. per-index errors of a list field) recurse and are
    wrapped in parentheses.
    """
    results = []
    for key, error_items in errors.items():
        error_strs = []
        for item in error_items:
            if isinstance(item, dict):
                error_strs.append('(' + formatErrors(item) + ')')
            else:
                # BUG FIX: coerce non-string leaves (ints, tuples, ...) so
                # ', '.join() cannot raise TypeError.
                error_strs.append(str(item))
        results.append(str(key) + ': ' + ', '.join(error_strs))
    return '; '.join(results)
def parseLevelFile(filepath, bundlepath):
    """Download the level package blob at ``filepath``, validate its
    level.json, compute chart hashes/note counts, and upload the background,
    music and music-preview assets under the ``bundlepath`` prefix.

    Returns a dict with 'size' (package bytes), 'metadata' (the validated,
    augmented level.json) and 'duration' (music length in seconds).
    Aborts with an appropriate 4xx/5xx on any validation or packaging error.
    """
    fileblob = bucket.get_blob(filepath)
    if not fileblob:
        abort(404, 'the package does not exist')
        return  # unreachable (abort raises); kept as a defensive guard
    filedata = {}
    try:
        with tempfile.TemporaryDirectory() as temp_dir, \
                tempfile.TemporaryFile() as temp_file:
            fileblob.download_to_file(temp_file)
            # The file pointer sits at EOF after the download, so tell()
            # yields the package size.
            filedata['size'] = temp_file.tell()
            with zipfile.ZipFile(temp_file, 'r') as zipFile, \
                    zipFile.open('level.json') as meta_file:
                metadata = json.load(meta_file)
                if not schema_validator.validate(metadata):
                    # BUG FIX: use formatErrors() instead of the previous
                    # inline ', '.join(...) over the raw error values, which
                    # raised TypeError whenever a field's errors were nested
                    # dicts (e.g. per-index errors of 'charts').
                    abort(400, formatErrors(schema_validator.errors))
                filedata['metadata'] = metadata
                # Serve the package as a named download, cached for a year.
                fileblob.content_disposition = 'attachment; filename="' + metadata['id'] + '.cytoidlevel"'
                fileblob.cache_control = 'public, max-age=31536000'
                fileblob.patch()
                # Annotates each chart dict with hash/notesCount/checksum.
                countNotes(metadata['charts'], zipFile)

                # --- background image ---
                zipFile.extract(metadata['background']['path'], path=temp_dir)
                backgroundPath = os.path.join(temp_dir, metadata['background']['path'])
                backgroundBlob = bucket.blob(
                    os.path.join(bundlepath, metadata['background']['path'])
                )
                backgroundBlob.upload_from_filename(backgroundPath)
                backgroundBlob.cache_control = 'public, max-age=31536000'
                backgroundBlob.patch()
                # Remove extracted files eagerly to keep peak disk usage low;
                # the TemporaryDirectory would clean them up anyway.
                os.remove(backgroundPath)

                # --- music ---
                zipFile.extract(metadata['music']['path'], path=temp_dir)
                musicPath = os.path.join(temp_dir, metadata['music']['path'])
                duration = getDuration(musicPath)
                filedata['duration'] = duration
                musicBlob = bucket.blob(
                    os.path.join(bundlepath, metadata['music']['path'])
                )
                musicBlob.upload_from_filename(musicPath)
                musicBlob.cache_control = 'public, max-age=31536000'
                musicBlob.patch()

                # --- music preview: provided, reused, or generated ---
                # BUG FIX: metadata.get() guards against an explicit
                # 'music_preview': null, which the old
                # "'path' in metadata['music_preview']" test crashed on.
                provided_preview = metadata.get('music_preview') or {}
                if 'path' in provided_preview and \
                        provided_preview['path'] != metadata['music']['path']:
                    # The package ships its own preview clip; upload it.
                    zipFile.extract(metadata['music_preview']['path'], path=temp_dir)
                    previewPath = os.path.join(temp_dir, metadata['music_preview']['path'])
                    previewBlob = bucket.blob(
                        os.path.join(bundlepath, metadata['music_preview']['path'])
                    )
                    previewBlob.upload_from_filename(previewPath)
                    previewBlob.cache_control = 'public, max-age=31536000'
                    previewBlob.patch()
                    os.remove(previewPath)
                elif duration < 30:
                    # Track is shorter than a preview; reuse the full track.
                    metadata['music_preview'] = {
                        "path": metadata['music']['path']
                    }
                else:
                    # Generate a 30-second preview clip with ffmpeg.
                    extensionName = musicPath.split('.')[-1]
                    metadata['music_preview'] = {
                        "path": 'gen_preview.' + extensionName
                    }
                    previewPath = os.path.join(temp_dir, metadata['music_preview']['path'])
                    getMusicPreview(musicPath, previewPath, duration)
                    previewBlob = bucket.blob(
                        os.path.join(bundlepath, metadata['music_preview']['path'])
                    )
                    previewBlob.upload_from_filename(previewPath)
                    previewBlob.cache_control = 'public, max-age=31536000'
                    previewBlob.patch()
                    os.remove(previewPath)
                os.remove(musicPath)
    except FileNotFoundError as error:
        print(error)
        abort(400, 'file specified in metadata not found')
    except zipfile.BadZipFile as error:
        print(error)
        abort(400, 'Zip package invalid')
    except zipfile.LargeZipFile as error:
        print(error)
        abort(413, 'zip package too large')
    except KeyError as error:
        print(error)
        abort(400, 'cant find the specified file in the zip package')
    except json.decoder.JSONDecodeError as error:
        print(error)
        abort(400, 'Can not parse level.json. Invalid file format.')
    return filedata
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment