Skip to content

Instantly share code, notes, and snippets.

@FelixWolf
Created August 22, 2021 21:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save FelixWolf/66e989a1eb1f7fff3be26219c2da561d to your computer and use it in GitHub Desktop.
Save FelixWolf/66e989a1eb1f7fff3be26219c2da561d to your computer and use it in GitHub Desktop.
# Formats produced by piping the J2C data through ImageMagick's `convert`,
# mapped to the Content-type header value sent with each of them.
# NOTE(review): 'image/jpg' and 'image/tga' are not registered MIME types;
# they are kept as-is because clients may already depend on them.
_CONVERTED_TEXTURE_TYPES = {
    "png": "image/png",
    "tga": "image/tga",
    "jpg": "image/jpg",
}


def _textureResponse(Key, contentType, extension, body):
    """Build the (status, headers, body) tuple returned by the texture handler.

    The status is always 200; the headers are the given content type, an
    inline Content-Disposition named "<Key>.<extension>", and whatever
    utilNeverExpireHeaders() contributes.
    """
    return (
        200,
        [
            ('Content-type', contentType),
            ('Content-Disposition', 'inline; filename="{}.{}"'.format(Key, extension))
        ]+utilNeverExpireHeaders(),
        body
    )


def _convertedTexture(Key, Handle, format):
    """Return the texture converted to `format`, using the cache when possible.

    getCache(Key, format) is expected to return (exists, handle): when
    `exists` is true the handle is read and served, otherwise the converted
    bytes are written to it.
    NOTE(review): the cache handle is not closed here, matching the previous
    behaviour -- confirm whether getCache's caller is responsible for that.
    """
    contentType = _CONVERTED_TEXTURE_TYPES[format]
    exists, cHandle = getCache(Key, format)
    if exists:
        return _textureResponse(Key, contentType, format, cHandle.read())

    proc = subprocess.Popen(['convert', 'j2c:-', '{}:-'.format(format)],
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE
    )
    # communicate() feeds stdin and drains stdout concurrently and then waits
    # for the child. Writing all of stdin before reading stdout can deadlock
    # once the pipe buffers fill, and never waiting leaves a zombie process.
    result, _ = proc.communicate(Handle.read())
    if proc.returncode == 0:
        # Only cache successful conversions so a failed run does not leave a
        # broken image in the cache.
        cHandle.write(result)
    # NOTE(review): a failed conversion is still answered with status 200 and
    # whatever `convert` produced, as before -- consider an error response.
    return _textureResponse(Key, contentType, format, result)


def _emptyTextureInfo():
    """Return the JSON payload used when no usable comment data was found."""
    return {
        "has_data": False,
        "agent_id": "",
        "date": 0,
        "size": [0,0],
        "color": [0,0,0,0]
    }


def _readTextureInfo(Handle):
    """Scan the codestream header of `Handle` and decode its comment metadata.

    Marker segments are walked until the SOT marker (0xff90), anything that
    is not a marker, or the end of the data is reached. Text comments (CME,
    0xff64) are parsed with queryStringParser() and merged; at SOT the keys
    "a", "z", "c" and the optional "w"/"h" are turned into the result dict.
    Returns the "no data" payload whenever the metadata is missing or cannot
    be interpreted (with a "debug" message if interpretation raised).
    """
    result = _emptyTextureInfo()
    comments = None
    while True:
        marker_bytes = Handle.read(2)
        if len(marker_bytes) < 2:
            # Truncated stream: stop instead of letting struct.unpack raise.
            break
        marker, marker_type = struct.unpack('BB', marker_bytes)
        if marker != 0xff:
            break

        if marker_type == 0x90:
            # we found the SOT, so we're done.
            if isinstance(comments, bytes):
                # The only comment was unparseable raw data.
                break
            try:
                col = int(comments["c"],base=16)
                # NOTE(review): success uses "original_size" while the
                # fallback payload uses "size"; kept for compatibility.
                # NOTE(review): strptime() yields a naive datetime, so
                # timestamp() interprets it in the server's local timezone.
                result = {
                    "has_data": True,
                    "agent_id": comments["a"],
                    "date": datetime.datetime.strptime(comments["z"], "%Y%m%d%H%M%S").timestamp(),
                    "original_size": [int(comments.get("w", 0)), int(comments.get("h", 0))],
                    "color": [col>>24&0xff,col>>16&0xff,col>>8&0xff,col&0xff]
                }
            except Exception as e:
                # Deliberately broad: any failure to interpret the comments
                # (missing key, bad number/date, no comments at all) is
                # reported to the client instead of failing the request.
                result = _emptyTextureInfo()
                result["debug"] = str(e)
            break

        if marker_type == 0x4f:
            # this is SOC, which does not have a lsoc element
            continue

        size_bytes = Handle.read(2)
        if len(size_bytes) < 2:
            break
        # The stored length includes the two length bytes themselves.
        size = struct.unpack('!H', size_bytes)[0] - 2

        if marker_type != 0x64:
            # Not a comment segment: skip its payload.
            Handle.seek(size, 1)
            continue

        # found a CME. read it, and interpret it.
        rcme_bytes = Handle.read(2)
        if len(rcme_bytes) < 2:
            break
        rcme = struct.unpack('!H', rcme_bytes)[0]
        # Clamp so a malformed length cannot turn into read(-n), which would
        # consume the rest of the stream.
        payload_size = max(size - 2, 0)

        if rcme != 1:
            # Non-text comment: skip its payload so the next marker is read
            # from the right offset, and ignore it rather than letting it
            # clobber metadata that was already parsed.
            Handle.seek(payload_size, 1)
            continue

        raw_comment = Handle.read(payload_size)
        try:
            text_comment = raw_comment.decode()
        except UnicodeDecodeError:
            # Treat undecodable text like any other unparseable comment.
            new_comment = raw_comment
        else:
            new_comment = queryStringParser(text_comment)
            if len(new_comment) == 0:
                new_comment = raw_comment

        if comments is None:
            comments = new_comment
        elif isinstance(comments, list):
            comments.append(new_comment)
        elif isinstance(comments, dict):
            if isinstance(new_comment, dict):
                comments.update(new_comment)
            else:
                comments = [comments, new_comment]
    return result


@addAssetHandler("texture") #Default = j2c
def getTexture(Key, Handle, format="j2c", nocache=False):
    """Serve a texture asset in the requested representation.

    Parameters:
        Key: asset identifier; used for the download filename and as the
            cache key for converted images.
        Handle: readable (and, for "json", seekable) binary stream holding
            the J2C data.
        format: "j2c" (raw data), "png", "tga" or "jpg" (converted with
            ImageMagick's `convert` and cached), or "json" (metadata decoded
            from the codestream comments).
        nocache: accepted for interface compatibility; not used here.

    Returns:
        A (status, headers, body) tuple, or None for an unrecognised format.
    """
    if format == "j2c":
        return _textureResponse(Key, 'image/x-j2c', 'texture', Handle.read())

    if format in _CONVERTED_TEXTURE_TYPES:
        return _convertedTexture(Key, Handle, format)

    if format == "json":
        info = _readTextureInfo(Handle)
        return _textureResponse(Key, 'application/json', 'json', json.dumps(info).encode())

    # NOTE(review): unknown formats fall through without a response, as
    # before -- confirm how the dispatcher treats a None result.
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment