Skip to content

Instantly share code, notes, and snippets.

@vacing
Last active November 27, 2020 09:39
Show Gist options
  • Save vacing/f65d0c45638e902b8a68ffb5c5f557ac to your computer and use it in GitHub Desktop.
Save vacing/f65d0c45638e902b8a68ffb5c5f557ac to your computer and use it in GitHub Desktop.
import sys
import re
from StringIO import StringIO
# modified from: https://github.com/simon-engledew/python-chunks
# ref: https://github.com/jchristn/ChunkDecoder
'''
Usage:
decode_http_chunked.py file_in file_out
Tested on: Python 2.7.5
Input file format (each length is written in hexadecimal):
<length in hex>\r\n
data\r\n
<length in hex>\r\n
data\r\n
....
<length in hex>\r\n
data\r\n
'''
def from_pattern(pattern, type, *args):
    """Build a converter that extracts a token with *pattern* and parses it.

    The returned callable converts its argument to a string, searches it
    with ``pattern`` and passes the first capture group, followed by
    ``*args``, to ``type``. It raises ``ValueError`` when the pattern does
    not occur in the string.
    """
    def coerce(value):
        text = str(value)
        found = pattern.search(text)
        # Guard clause: nothing in the text looks like the wanted token.
        if found is None:
            raise ValueError('unable to coerce "%s" into a %s' % (text, type.__name__))
        captured = found.group(1)
        return type(captured, *args)
    return coerce
# Converters built with from_pattern: each one pulls the first matching
# numeric token out of the stringified input and parses it.
to_int = from_pattern(re.compile('([-+]?[0-9]+)', re.IGNORECASE), int)
to_hex = from_pattern(re.compile('([-+]?[0-9A-F]+)', re.IGNORECASE), int, 16)
# Raw string: "\." is not a valid escape sequence in a plain string literal.
# The regular expression text itself is unchanged.
to_float = from_pattern(re.compile(r'([-+]?[0-9]*\.?[0-9]+)'), float)


def to_megabytes(n):
    """Return *n* multiplied by 1024 * 1024 (megabytes expressed in bytes)."""
    return n * 1024 * 1024
def encode(fileobj, chunk_limit=to_megabytes(0.5)):
    """Generate the chunked encoding of everything readable from *fileobj*.

    The input is read ``int(chunk_limit)`` units at a time. Every non-empty
    read produces two strings: the length in hexadecimal followed by CRLF,
    then the data followed by CRLF. Once a read comes back empty, the
    zero-length terminator chunk is produced and the generator finishes.
    """
    while True:
        data = fileobj.read(int(chunk_limit))
        size = len(data)
        if not size:
            break
        yield '%x\r\n' % size
        yield '%s\r\n' % data
    # An empty read means the input is exhausted: emit the final chunk.
    yield '0\r\n'
    yield '\r\n'
def decode(fileobj, chunk_limit=to_megabytes(1)):
    """Yield the payload of each chunk read from a chunked-encoded *fileobj*.

    Every chunk consists of a size line (hexadecimal length plus line
    terminator), that many units of data, and a trailing CRLF. The payload
    of a zero-length chunk (an empty string) is yielded too, after which
    iteration stops. Iteration also stops when the input ends where a size
    line was expected.

    Side effects: prints "<chunk number>,<size line>" for every chunk and
    "file eof" when the input ends at a chunk boundary.

    Raises:
        OverflowError: a chunk declares a size larger than ``chunk_limit``.
        EOFError: the size line read was empty although more input exists.
        ValueError: the size line holds no hex number, is negative, is too
            long or unterminated; the chunk data is shorter than declared;
            or the CRLF after the data is missing or wrong.
    """
    # Longest acceptable size line: the hex digits of the limit plus CRLF.
    # Reserving room for the terminator matters: with only the digit count
    # as the cap, a size line such as "10000" + CRLF was cut short and the
    # leftover terminator bytes were read as the start of the chunk data.
    max_line = len('%x' % chunk_limit) + 2
    count = 0
    while True:
        count += 1
        # size line: hex length + CRLF
        size_line = fileobj.readline(max_line)
        if not size_line:
            if not fileobj.read(1):
                print("file eof")
                return
            raise EOFError('unexpected blank line')
        # progress output, without the line terminator
        print(str(count) + "," + size_line.rstrip())
        length = to_hex(size_line)
        if length > chunk_limit:
            raise OverflowError('invalid chunk size of "%d" requested, max is "%d"' % (length, chunk_limit))
        if length < 0:
            # read() with a negative size would consume the whole input.
            raise ValueError('invalid negative chunk size "%d"' % length)
        if not size_line.endswith('\n'):
            # The cap was hit (or the input ended) before the terminator, so
            # the data would not start where we expect it to.
            raise ValueError('chunk size line is too long or not terminated')
        # data
        value = fileobj.read(length)
        # Explicit raise instead of assert: asserts vanish under "python -O".
        if len(value) != length:
            raise ValueError('truncated chunk: expected %d, got %d' % (length, len(value)))
        yield value
        # tail: CRLF closing the chunk
        tail = fileobj.read(2)
        if not tail:
            raise ValueError('missing \\r\\n after chunk')
        if tail != '\r\n':
            raise ValueError('unexpected characters "%s" after chunk' % tail)
        # A zero-length chunk is the end-of-stream marker.
        if not length:
            return
'''
input = """25\r\nThis is the data in the first chunk\r\n\r\n8\r\nsequence\r\n0\r\n\r\n"""
decoded = StringIO()
for chunk in decode(StringIO(input)):
decoded.write(chunk)
decoded.seek(0)
print decoded.getvalue()
'''
if __name__ == '__main__':
    # Command-line entry point: decode the chunked file named by the first
    # argument and write the concatenated chunk payloads to the second.
    if len(sys.argv) != 3:
        print("Usage: %s file_in file_out" % sys.argv[0])
        sys.exit(1)
    src_path = sys.argv[1]
    dst_path = sys.argv[2]
    print("dechunk [ %s ] to [ %s ]" % (src_path, dst_path))
    # Both files are opened in binary mode and closed by the with-statement,
    # so no explicit close() call is needed afterwards.
    with open(src_path, "rb") as fin, open(dst_path, "wb") as fout:
        for chunk in decode(fin):
            fout.write(chunk)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment