Skip to content

Instantly share code, notes, and snippets.

@Neo-Zhixing
Created June 20, 2020 20:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Neo-Zhixing/df551b5b1d6539cd3127b83e7fd3a373 to your computer and use it in GitHub Desktop.
Save Neo-Zhixing/df551b5b1d6539cd3127b83e7fd3a373 to your computer and use it in GitHub Desktop.
Cytoid ParseLevel
import os
import sys
import tempfile
import json
import zipfile
import hashlib
from cerberus import Validator
from flask import jsonify, make_response, abort as flask_abort
from google.cloud import storage
import subprocess
from checksum import cytus_1_chart_checksum, cytus_2_chart_checksum
def abort(status, message):
flask_abort(make_response(jsonify(message=message), status))
bucket = storage.Client().bucket('assets.cytoid.io')
def resolve_level(request):
if request.method != 'POST':
abort(405)
if request.headers.get('Content-Type') != 'application/json':
abort(415, 'Only json body is accepted')
payload = request.get_json(silent=True)
if not payload:
abort(400, 'can not parse json')
filepath = payload.get('packagePath')
bundlepath = payload.get('bundlePath')
if not filepath or not bundlepath:
abort(403, 'packagePath and bundlePath required')
return jsonify(parseLevelFile(filepath, bundlepath))
def schema_version_newest(field, value, error):
if value != 2:
error(field, "You have to updated your level.json to the latest schema.")
schema_validator = Validator({
'id': {
'type': 'string',
'regex': r'^[a-z0-9_]+([-_.][a-z0-9_]+)+$',
'maxlength': 64,
'minlength': 3,
'required': True,
},
'schema_version': {
'type': 'integer',
'required': True,
'check_with': schema_version_newest,
},
'version': {
'type': 'integer',
'required': True,
'min': 1,
'max': 999,
},
"title": {
'type': 'string',
'required': True,
'empty': False,
},
"title_localized": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
"artist": {
'type': 'string',
'required': True,
'empty': False,
},
"artist_localized": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
"artist_source": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
"illustrator": {'type': 'string', 'required': True, 'empty': False},
"illustrator_source": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
"storyboarder": {'type': 'string', 'required': False, 'nullable': True, 'empty': False},
"charter": {'type': 'string', 'required': True, 'empty': False},
"music": {
'type': 'dict',
'schema': {
'path': {
'type': 'string',
'required': True,
}
}
},
"music_preview": {
'type': 'dict',
'schema': {
'path': {
'type': 'string',
'required': True,
}
}
},
"background": {
'type': 'dict',
'schema': {
'path': {
'type': 'string',
'required': True,
}
}
},
"charts": {
'type': 'list',
'minlength': 1,
'required': True,
'schema': {
'type': 'dict',
'schema': {
'type': {
'type': 'string',
'allowed': ['easy', 'hard', 'extreme'],
'required': True,
},
'name': {
'type': 'string',
'required': False,
'empty': False,
},
'difficulty': {
'type': 'integer',
'required': True,
'min': 0,
},
'path': {
'type': 'string',
'required': True,
}
}
}
}
})
schema_validator.allow_unknown = True
def countNotes(charts, zipFile):
if len(charts) < 1:
abort('400', 'at least one chart required!')
for chart in charts:
chartPath = chart.get('path', None)
with zipFile.open(chartPath) as chartFile: # open up the chart file to read
sha = hashlib.sha256()
# Calculate Hash
while True:
file_buffer = chartFile.read(65536)
if not len(file_buffer):
break
sha.update(file_buffer)
chart['hash'] = sha.hexdigest()
# Calculate Hash for cytus 2
try:
chartFile.seek(0)
chartdata = json.load(chartFile)
if 'note_list' not in chartdata:
abort(400, 'one or more charts is invalid')
chart['notesCount'] = len(chartdata['note_list'])
chart['checksum'] = cytus_2_chart_checksum(chartdata)
# Calculate Hash for cytus 1
except json.JSONDecodeError:
chartFile.seek(0)
count = 0
lines = [a.decode('utf-8') for a in chartFile.readlines()]
for line in lines:
if line.startswith('NOTE'):
count += 1
chart['notesCount'] = count
chart['checksum'] = cytus_1_chart_checksum(lines)
def getDuration(musicPath):
try:
result = subprocess.check_output([
'ffprobe',
'-v',
'error',
'-show_entries',
'format=duration',
'-of',
'default=noprint_wrappers=1:nokey=1',
musicPath
], stderr=subprocess.PIPE)
result = float(result)
return result
except subprocess.CalledProcessError as e:
print('Error when obtaining music duration with exit code {}'.format(e.returncode), file=sys.stderr)
print(e.output.decode(sys.getfilesystemencoding()), file=sys.stderr)
print(e.stderr.decode(sys.getfilesystemencoding()), file=sys.stderr)
abort(500, 'Unknown error when obtaining music duration')
def getMusicPreview(musicPath, targetPath, duration):
result = subprocess.run([
'ffmpeg',
'-ss', # Seek to given time position in seconds
str(0 if duration < 60 else 15),
'-t', # Restrict the transcoded/captured video sequence to the duration specified in seconds.
'30',
'-i', # input file
musicPath,
'-acodec', # output file
'copy',
targetPath
])
if result.returncode != 0:
abort(500, 'Unknown error during music preview generation')
def formatErrors(errors):
results = []
for key in errors:
error_items = errors[key]
error_strs = []
for item in error_items:
if isinstance(item, dict):
error_strs.append('(' + formatErrors(item) + ')')
else:
error_strs.append(item)
results.append(str(key) + ': ' + ', '.join(error_strs))
return '; '.join(results)
def parseLevelFile(filepath, bundlepath):
fileblob = bucket.get_blob(filepath)
if not fileblob:
abort(404, 'the package does not exist')
return
# Temp File to download level file into
filedata = {}
try:
with tempfile.TemporaryDirectory() as temp_dir, \
tempfile.TemporaryFile() as temp_file:
fileblob.download_to_file(temp_file)
filedata['size'] = temp_file.tell()
with zipfile.ZipFile(temp_file, 'r') as zipFile, \
zipFile.open('level.json') as meta_file:
metadata = json.load(meta_file)
if not schema_validator.validate(metadata):
results = []
for key in schema_validator.errors:
results.append(key + ': ' + ', '.join(schema_validator.errors[key]))
abort(400, '; '.join(results))
filedata['metadata'] = metadata
fileblob.content_disposition = 'attachment; filename="' + metadata['id'] + '.cytoidlevel"'
fileblob.cache_control = 'public, max-age=31536000'
fileblob.patch()
countNotes(metadata['charts'], zipFile)
zipFile.extract(metadata['background']['path'], path=temp_dir)
backgroundPath = os.path.join(temp_dir, metadata['background']['path'])
backgroundBlob = bucket.blob(
os.path.join(bundlepath, metadata['background']['path'])
)
backgroundBlob.upload_from_filename(backgroundPath)
backgroundBlob.cache_control = 'public, max-age=31536000'
backgroundBlob.patch()
os.remove(backgroundPath)
zipFile.extract(metadata['music']['path'], path=temp_dir)
musicPath = os.path.join(temp_dir, metadata['music']['path'])
duration = getDuration(musicPath)
filedata['duration'] = duration
musicBlob = bucket.blob(
os.path.join(bundlepath, metadata['music']['path'])
)
musicBlob.upload_from_filename(musicPath)
musicBlob.cache_control = 'public, max-age=31536000'
musicBlob.patch()
if 'music_preview' in metadata and 'path' in metadata['music_preview'] and \
metadata['music_preview']['path'] != metadata['music']['path']:
zipFile.extract(metadata['music_preview']['path'], path=temp_dir)
previewPath = os.path.join(temp_dir, metadata['music_preview']['path'])
previewBlob = bucket.blob(
os.path.join(bundlepath, metadata['music_preview']['path'])
)
previewBlob.upload_from_filename(previewPath)
previewBlob.cache_control = 'public, max-age=31536000'
previewBlob.patch()
os.remove(previewPath)
elif duration < 30:
# use original
metadata['music_preview'] = {
"path": metadata['music']['path']
}
else:
# generate new
extensionName = musicPath.split('.')[-1]
metadata['music_preview'] = {
"path": 'gen_preview.' + extensionName
}
previewPath = os.path.join(temp_dir, metadata['music_preview']['path'])
getMusicPreview(musicPath, previewPath, duration)
previewBlob = bucket.blob(
os.path.join(bundlepath, metadata['music_preview']['path'])
)
previewBlob.upload_from_filename(previewPath)
previewBlob.cache_control = 'public, max-age=31536000'
previewBlob.patch()
os.remove(previewPath)
os.remove(musicPath)
except FileNotFoundError as error:
print(error)
abort(400, 'file specified in metadata not found')
except zipfile.BadZipFile as error:
print(error)
abort(400, 'Zip package invalid')
except zipfile.LargeZipFile as error:
print(error)
abort(413, 'zip package too large')
except KeyError as error:
print(error)
abort(400, 'cant find the specified file in the zip package')
except json.decoder.JSONDecodeError as error:
print(error)
abort(400, 'Can not parse level.json. Invalid file format.')
return filedata
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment