Skip to content

Instantly share code, notes, and snippets.

@parity3
Created November 26, 2019 21:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parity3/ebaa2c6ec661ddcec2ade772a490b790 to your computer and use it in GitHub Desktop.
Save parity3/ebaa2c6ec661ddcec2ade772a490b790 to your computer and use it in GitHub Desktop.
example of de-chunking a http-spec based chunked transfer encoding via a coroutine (entrypoint function: coro_te_with_packets)
import cStringIO
import logging
log = logging.getLogger('log')
class te_lengthfound_exception(StandardError):
def __init__(self, length, extension):
self.length=length
self.extension=extension
class te_trailer_exception(StandardError):
def __init__(self, trailer, extension):
self.trailer=trailer
self.extension=extension
def coro_TE_readstart(data,packetnum,write,getvalue,seek,truncate,readlines,trailer_ok=None,extension_ok=None):
# starting with data, read until the first line, and parse the number before it
# if it's a termination header, caller should catch a StopIteration
# otherwise the length is wrapped in a raised te_lengthfound_exception
# in either case, the buffer will contain the remaining data
# needs to start with an empty buffer
# packetnum is provided for debug / info purposes
while True:
write(data)
seek(0)
lines = readlines()
if not lines:
data = yield
if not data:
raise EOFError("%s - did not receive starting length: %s" % (packetnum, getvalue()))
continue
l = lines[0]
if not l.endswith(b'\r\n'):
data = yield
if not data:
raise EOFError("%s - did not receive starting length: %s" % (packetnum, getvalue()))
continue
lsplit = l.split(';', 1)
len_str = lsplit[0]
try:
length = int(len_str, 16)
except ValueError:
raise RuntimeError("%s - received: %s instead of a length: %s" % (packetnum, repr(getvalue()), repr(len_str)))
seek(0)
write(''.join(lines[1:]))
truncate()
extension = None if len(lsplit) == 1 else lsplit[1]
if length == 0:
while True:
seek(0)
lines = readlines()
try:
ind = lines.index(b'\r\n')
except ValueError:
while True:
data = yield
if not data:
raise EOFError("%s - did not receive TRAILER empty line: %s" % (packetnum, getvalue()))
write(data)
if b'\r\n' in data:
break
continue
remain = ''.join(lines[ind + 1:])
seek(0)
write(remain)
truncate()
if ind:
if trailer_ok is None:
log.info('%s - throwing out TRAILER data: %s', packetnum, lines[:ind])
else:
raise te_trailer_exception(lines[:ind], extension)
if extension_ok and extension:
raise te_trailer_exception([],extension)
# log.info('TRAILER ended, remaining: %s', remain)
return
else:
raise te_lengthfound_exception(length, extension)
class te_extradata(StandardError):
def __init__(self,extradata):
self.extradata=extradata
def coro_copy_data(packetnum,num_bytes_to_read,write, data=''):
# will raise te_extradata when num_bytes_to_read is reached
pos = 0
if data:
len_data = len(data)
pos += len_data
if pos > num_bytes_to_read:
overage = pos-num_bytes_to_read
to_ind = len_data - overage
leftovers = data[to_ind:]
write(data[:to_ind])
raise te_extradata(leftovers)
elif pos == num_bytes_to_read:
write(data)
raise te_extradata('')
else:
write(data)
while True:
data = yield
if not data:
raise EOFError("%s: EOF reached, pos: %s < %s"%(packetnum,pos,num_bytes_to_read))
len_data = len(data)
pos += len_data
if pos > num_bytes_to_read:
overage = pos-num_bytes_to_read
to_ind = len_data - overage
leftovers = data[to_ind:]
write(data[:to_ind])
raise te_extradata(leftovers)
elif pos == num_bytes_to_read:
write(data)
raise te_extradata('')
else:
write(data)
class Extension:
def __init__(self, packet_s, extension):
self.packet_s = packet_s
self.extension = extension
def coro_te_packets(trailer_container=None,ignore_extensions=True,buf=None):
# yields None until a verified full packet is available on the wire
if buf is None:
buf = cStringIO.StringIO()
getvalue = buf.getvalue
write = buf.write
truncate = buf.truncate
seek = buf.seek
readlines = buf.readlines
packetnum=0
data = ''
packets = []
while True: # each loop represents 1 TE chunk
# get the chunked TE length
c = coro_TE_readstart(data,packetnum,write,getvalue,seek,truncate,readlines, trailer_ok=trailer_container)
try:
next(c)
while True:
if packets:
p = packets
packets = []
c.send((yield p))
del p
else:
c.send((yield))
except StopIteration:
if packets:
yield packets
return
except te_lengthfound_exception as e:
length = e.length
extension = e.extension
except te_trailer_exception as e:
if packets:
yield packets
trailer_container.extend(e.trailer)
return
# write to the buffer
data = getvalue()
truncate(0)
c = coro_copy_data(packetnum, length, write, data)
try:
next(c)
while True:
if packets:
p = packets
packets = []
c.send((yield p))
del p
else:
c.send((yield))
except te_extradata as e:
data = e.extradata
packet_s = getvalue()
truncate(0)
# read final TE packet footer CRLF
c = coro_copy_data(packetnum,2,write,data)
try:
next(c)
while True:
if packets:
p = packets
packets = []
c.send((yield p))
del p
else:
c.send((yield))
except te_extradata as e:
data = e.extradata
newline = getvalue()
if newline != b'\r\n':
raise RuntimeError("%s - final CRLF not found. Received: %s instead"%(packetnum,repr(newline+data)))
truncate(0)
# finally write the meta and notify
packetnum += 1
if ignore_extensions or extension is None:
packets.append(packet_s)
else:
packets.append(Extension(packet_s,extension))
def coro_te_with_packets(packets, trailer_container=None,ignore_extensions=True,buf=None):
# yields None until a verified full packet is available on the wire
if buf is None:
buf = cStringIO.StringIO()
getvalue = buf.getvalue
write = buf.write
truncate = buf.truncate
seek = buf.seek
readlines = buf.readlines
packetnum=0
data = ''
while True: # each loop represents 1 TE chunk
# get the chunked TE length
c = coro_TE_readstart(data,packetnum,write,getvalue,seek,truncate,readlines, trailer_ok=trailer_container)
try:
next(c)
while True:
c.send((yield))
except StopIteration:
return
except te_lengthfound_exception as e:
length = e.length
extension = e.extension
except te_trailer_exception as e:
if trailer_container is not None:
trailer_container.extend(e.trailer)
return
# write to the buffer
data = getvalue()
truncate(0)
c = coro_copy_data(packetnum, length, write, data)
try:
next(c)
while True:
c.send((yield))
except te_extradata as e:
data = e.extradata
packet_s = getvalue()
truncate(0)
# read final TE packet footer CRLF
c = coro_copy_data(packetnum,2,write,data)
try:
next(c)
while True:
c.send((yield))
except te_extradata as e:
data = e.extradata
newline = getvalue()
if newline != b'\r\n':
raise RuntimeError("%s - final CRLF not found. Received: %s instead"%(packetnum,repr(newline+data)))
truncate(0)
# finally write the meta and notify
packetnum += 1
if ignore_extensions or extension is None:
packets.append(packet_s)
else:
packets.append(Extension(packet_s,extension))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment