Created
November 26, 2019 21:45
-
-
Save parity3/ebaa2c6ec661ddcec2ade772a490b790 to your computer and use it in GitHub Desktop.
Example of de-chunking an HTTP chunked transfer encoding (as defined by the HTTP spec) with a coroutine (entry-point function: coro_te_with_packets)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cStringIO | |
import logging | |
log = logging.getLogger('log')

try:
    _TEBaseError = StandardError  # Python 2
except NameError:  # Python 3 folded StandardError into Exception
    _TEBaseError = Exception


class te_lengthfound_exception(_TEBaseError):
    """Raised by coro_TE_readstart once a non-zero chunk-size line is parsed.

    ``length`` is the chunk size as an int.  ``extension`` is the raw text
    after the first ``;`` of the size line (still ending in CRLF), or None
    when the line had no extension.
    """

    def __init__(self, length, extension):
        self.length = length
        self.extension = extension


class te_trailer_exception(_TEBaseError):
    """Raised by coro_TE_readstart at the end of the message to report trailer data.

    ``trailer`` is the list of trailer lines (line terminators included),
    possibly empty.  ``extension`` is the extension text of the terminating
    zero-size line, or None.
    """

    def __init__(self, trailer, extension):
        self.trailer = trailer
        self.extension = extension


def coro_TE_readstart(data, packetnum, write, getvalue, seek, truncate, readlines,
                      trailer_ok=None, extension_ok=None):
    """Parse one chunk-size line and, for the last chunk, the trailer.

    Coroutine: prime with ``next()``.  It first consumes ``data``, then waits
    for more wire data via ``send()`` until it can decide.  ``write``,
    ``getvalue``, ``seek``, ``truncate`` and ``readlines`` are bound methods
    of one byte buffer, which must be empty and positioned at 0 on entry.
    When a te_* exception is raised or the coroutine returns, the buffer
    holds exactly the bytes that followed what was consumed.

    Outcomes:
      * non-zero size: raises te_lengthfound_exception(length, extension).
      * zero size (last chunk): reads through the empty line that ends the
        trailer, then returns, so the driver's ``send()`` raises
        StopIteration.  Instead of returning, it raises te_trailer_exception
        if ``trailer_ok`` is not None and trailer lines were present, or if
        ``extension_ok`` is true and the zero-size line had an extension.
        With ``trailer_ok`` None, trailer lines are logged and discarded.

    ``packetnum`` is used only in log and error messages.

    :raises RuntimeError: if the size field is not plain hexadecimal.  A
        sign, a ``0x`` prefix or underscores are rejected, even though
        ``int(x, 16)`` would accept them.
    :raises EOFError: if a falsy value is sent before the line or trailer
        is complete.
    """
    hex_digits = b'0123456789abcdefABCDEF'
    while True:
        write(data)
        seek(0)
        lines = readlines()
        # Need a complete, CRLF-terminated first line before parsing.
        # NOTE(review): a size line ending in a bare LF can never become
        # CRLF-terminated, so it is only reported (as EOFError) at EOF.
        if not lines or not lines[0].endswith(b'\r\n'):
            data = yield
            if not data:
                raise EOFError("%s - did not receive starting length: %s" % (packetnum, getvalue()))
            continue
        size_and_ext = lines[0].split(b';', 1)
        len_str = size_and_ext[0]
        size_token = len_str.strip()
        # chunk-size is 1*HEXDIG.  Check explicitly: int(x, 16) also takes a
        # sign, so "-5" would otherwise become a negative length.
        # strip(hex_digits) leaves something iff a non-hex character exists.
        if not size_token or size_token.strip(hex_digits):
            raise RuntimeError("%s - received: %s instead of a length: %s" % (packetnum, repr(getvalue()), repr(len_str)))
        length = int(size_token, 16)
        # Keep only the bytes that followed the size line.
        seek(0)
        write(b''.join(lines[1:]))
        truncate()
        extension = None if len(size_and_ext) == 1 else size_and_ext[1]
        if length:
            raise te_lengthfound_exception(length, extension)
        break

    # Last chunk: consume the trailer up to and including its empty line.
    while True:
        seek(0)
        lines = readlines()
        try:
            ind = lines.index(b'\r\n')
        except ValueError:
            # Accumulate data and rescan only once a newline arrives.  Test
            # for LF rather than CRLF: the CR and LF may arrive in different
            # pieces, and the scan above would then see the completed line.
            while True:
                data = yield
                if not data:
                    raise EOFError("%s - did not receive TRAILER empty line: %s" % (packetnum, getvalue()))
                write(data)
                if b'\n' in data:
                    break
            continue
        remain = b''.join(lines[ind + 1:])
        seek(0)
        write(remain)
        truncate()
        if ind:
            if trailer_ok is None:
                log.info('%s - throwing out TRAILER data: %s', packetnum, lines[:ind])
            else:
                raise te_trailer_exception(lines[:ind], extension)
        if extension_ok and extension:
            raise te_trailer_exception([], extension)
        return
try:
    _TEBaseError = StandardError  # Python 2
except NameError:  # Python 3 folded StandardError into Exception
    _TEBaseError = Exception


class te_extradata(_TEBaseError):
    """Raised by coro_copy_data once the requested number of bytes was written.

    ``extradata`` holds whatever was received beyond the requested count
    (empty when the count was hit exactly).
    """

    def __init__(self, extradata):
        self.extradata = extradata


def coro_copy_data(packetnum, num_bytes_to_read, write, data=''):
    """Copy exactly ``num_bytes_to_read`` bytes to ``write``, then stop.

    Coroutine: prime with ``next()``, then ``send()`` each newly received
    piece of data.  ``data`` is an optional first piece, consumed before the
    first yield.  Completion is signalled by raising te_extradata as soon as
    the count is reached.  This can already happen during the priming
    ``next()`` when ``data`` alone suffices, or when zero bytes are requested.

    :param packetnum: used only in error messages.
    :raises te_extradata: when the requested bytes have been written; carries
        the surplus input.
    :raises EOFError: if a falsy value (EOF) is sent before that.
    :raises ValueError: if ``num_bytes_to_read`` is negative.
    """
    if num_bytes_to_read < 0:
        raise ValueError("%s: negative byte count: %s" % (packetnum, num_bytes_to_read))
    pos = 0  # bytes written so far
    while True:
        if data:
            needed = num_bytes_to_read - pos
            if len(data) >= needed:
                # Write only what was asked for; hand the rest back.
                write(data[:needed])
                raise te_extradata(data[needed:])
            write(data)
            pos += len(data)
        elif pos >= num_bytes_to_read:
            # Zero-byte request with no initial data: already done, so
            # don't wait for (or demand) more input.
            raise te_extradata(data)
        data = yield
        if not data:
            raise EOFError("%s: EOF reached, pos: %s < %s" % (packetnum, pos, num_bytes_to_read))
class Extension:
    """A de-chunked payload paired with the chunk extension from its size line.

    ``packet_s`` is the chunk payload.  ``extension`` is the raw text after
    the first ``;`` of the chunk-size line.  As produced by coro_te_packets /
    coro_te_with_packets, it still ends in CRLF.
    """

    def __init__(self, packet_s, extension):
        self.packet_s = packet_s
        self.extension = extension

    def __repr__(self):
        # __class__ rather than type(): on Python 2 this is an old-style
        # class, for which type(self).__name__ is just 'instance'.
        return '%s(packet_s=%r, extension=%r)' % (self.__class__.__name__, self.packet_s, self.extension)
def coro_te_packets(trailer_container=None, ignore_extensions=True, buf=None):
    """De-chunk an HTTP chunked transfer-encoded stream, yielding completed chunks.

    Coroutine protocol: prime with ``next()``, then ``send()`` each piece of
    data received from the wire.  Sending a falsy value signals EOF; an
    EOFError is raised if the message is incomplete at that point.

    Each ``send()`` returns either None (no chunk completed yet) or a
    non-empty list of the chunks completed since the last list.  A chunk is
    the payload, or ``Extension(payload, extension)`` when
    ``ignore_extensions`` is false and the chunk-size line had an extension.

    After the terminating zero-size chunk and its trailer, any pending chunks
    are yielded one last time, and the value sent back for that yield is
    ignored.  The generator then finishes.

    :param trailer_container: list that is extended with the trailer lines
        (terminators included).  When None, trailer lines are logged and
        discarded.
    :param ignore_extensions: when true, chunk extensions are dropped.
    :param buf: scratch buffer, which must be empty.  Defaults to a new
        ``cStringIO.StringIO``.  Bytes after the message end are left in it.
    :raises RuntimeError: on a malformed chunk-size line or a chunk not
        followed by CRLF.
    :raises EOFError: when EOF is signalled mid-message.
    """
    if buf is None:
        buf = cStringIO.StringIO()
    getvalue = buf.getvalue
    write = buf.write
    truncate = buf.truncate
    seek = buf.seek
    readlines = buf.readlines

    def clear():
        # Empty *and* rewind the buffer.  truncate(0) alone only works with
        # cStringIO, which moves the position back to 0.  io.BytesIO and
        # io.StringIO keep the old position, so the next write() would be
        # padded with NUL bytes.
        seek(0)
        truncate()

    packetnum = 0
    data = b''
    packets = []  # completed chunks not yet handed to the caller
    while True:  # each iteration consumes one chunk
        # 1. chunk-size line (or the end of the message)
        c = coro_TE_readstart(data, packetnum, write, getvalue, seek, truncate, readlines, trailer_ok=trailer_container)
        try:
            next(c)
            while True:
                if packets:
                    p = packets
                    packets = []
                    c.send((yield p))
                    del p
                else:
                    c.send((yield))
        except StopIteration:
            if packets:
                yield packets
            return
        except te_lengthfound_exception as e:
            length = e.length
            extension = e.extension
        except te_trailer_exception as e:
            if packets:
                yield packets
            if trailer_container is not None:
                trailer_container.extend(e.trailer)
            return

        # 2. chunk payload; the buffer now holds only post-size-line bytes
        data = getvalue()
        clear()
        c = coro_copy_data(packetnum, length, write, data)
        try:
            next(c)
            while True:
                if packets:
                    p = packets
                    packets = []
                    c.send((yield p))
                    del p
                else:
                    c.send((yield))
        except te_extradata as e:
            data = e.extradata
        packet_s = getvalue()
        clear()

        # 3. CRLF that terminates every chunk's payload
        c = coro_copy_data(packetnum, 2, write, data)
        try:
            next(c)
            while True:
                if packets:
                    p = packets
                    packets = []
                    c.send((yield p))
                    del p
                else:
                    c.send((yield))
        except te_extradata as e:
            data = e.extradata
        newline = getvalue()
        if newline != b'\r\n':
            raise RuntimeError("%s - final CRLF not found. Received: %s instead" % (packetnum, repr(newline + data)))
        clear()

        # 4. record the finished chunk; it is delivered at the next yield
        packetnum += 1
        if ignore_extensions or extension is None:
            packets.append(packet_s)
        else:
            packets.append(Extension(packet_s, extension))
def coro_te_with_packets(packets, trailer_container=None, ignore_extensions=True, buf=None):
    """De-chunk an HTTP chunked transfer-encoded stream into ``packets``.

    Coroutine protocol: prime with ``next()``, then ``send()`` each piece of
    data received from the wire.  Sending a falsy value signals EOF; an
    EOFError is raised if the message is incomplete at that point.

    Every yield produces None.  Each completed chunk is instead appended to
    the caller-supplied ``packets`` (only its ``append`` method is used) as
    soon as its terminating CRLF has been read.  A chunk is the payload, or
    ``Extension(payload, extension)`` when ``ignore_extensions`` is false
    and the chunk-size line had an extension.

    The generator finishes after the terminating zero-size chunk and its
    trailer, so the caller's ``send()`` raises StopIteration.

    :param trailer_container: list that is extended with the trailer lines
        (terminators included).  When None, trailer lines are logged and
        discarded.
    :param ignore_extensions: when true, chunk extensions are dropped.
    :param buf: scratch buffer, which must be empty.  Defaults to a new
        ``cStringIO.StringIO``.  Bytes after the message end are left in it.
    :raises RuntimeError: on a malformed chunk-size line or a chunk not
        followed by CRLF.
    :raises EOFError: when EOF is signalled mid-message.
    """
    if buf is None:
        buf = cStringIO.StringIO()
    getvalue = buf.getvalue
    write = buf.write
    truncate = buf.truncate
    seek = buf.seek
    readlines = buf.readlines

    def clear():
        # Empty *and* rewind the buffer.  truncate(0) alone only works with
        # cStringIO, which moves the position back to 0.  io.BytesIO and
        # io.StringIO keep the old position, so the next write() would be
        # padded with NUL bytes.
        seek(0)
        truncate()

    packetnum = 0
    data = b''
    while True:  # each iteration consumes one chunk
        # 1. chunk-size line (or the end of the message)
        c = coro_TE_readstart(data, packetnum, write, getvalue, seek, truncate, readlines, trailer_ok=trailer_container)
        try:
            next(c)
            while True:
                c.send((yield))
        except StopIteration:
            return
        except te_lengthfound_exception as e:
            length = e.length
            extension = e.extension
        except te_trailer_exception as e:
            if trailer_container is not None:
                trailer_container.extend(e.trailer)
            return

        # 2. chunk payload; the buffer now holds only post-size-line bytes
        data = getvalue()
        clear()
        c = coro_copy_data(packetnum, length, write, data)
        try:
            next(c)
            while True:
                c.send((yield))
        except te_extradata as e:
            data = e.extradata
        packet_s = getvalue()
        clear()

        # 3. CRLF that terminates every chunk's payload
        c = coro_copy_data(packetnum, 2, write, data)
        try:
            next(c)
            while True:
                c.send((yield))
        except te_extradata as e:
            data = e.extradata
        newline = getvalue()
        if newline != b'\r\n':
            raise RuntimeError("%s - final CRLF not found. Received: %s instead" % (packetnum, repr(newline + data)))
        clear()

        # 4. hand the finished chunk to the caller
        packetnum += 1
        if ignore_extensions or extension is None:
            packets.append(packet_s)
        else:
            packets.append(Extension(packet_s, extension))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment