Skip to content

Instantly share code, notes, and snippets.

Created October 25, 2012 05:01
Show Gist options
  • Save anonymous/3950498 to your computer and use it in GitHub Desktop.
Save anonymous/3950498 to your computer and use it in GitHub Desktop.
class ChunkedReader(object):
    """Reader for a chunk-framed body pulled from an ``unreader``.

    The framing handled here is: a chunk size (parsed by
    ``parse_chunk_size``), that many units of payload, then a ``'\\r\\n'``
    terminator; a chunk size of zero (or less) ends the body.

    NOTE(review): ``parse_chunk_size``, ``NoMoreData``,
    ``ChunkMissingTerminator`` and ``StringIO`` are not defined in this
    excerpt; they are expected to be provided elsewhere in the module/class.
    """

    def __init__(self, req, unreader):
        """Store ``req`` and prepare a lazy parser over ``unreader``.

        ``unreader`` must expose a ``read()`` method that returns the next
        piece of raw data, or an empty value when no more data is available.
        """
        self.req = req
        # parse_chunked is a generator: nothing is consumed from `unreader`
        # until the first call to read().
        self.parser = self.parse_chunked(unreader)
        self.buf = StringIO()

    def read(self, size):
        """Return up to ``size`` units of decoded body data.

        Payload pieces are pulled from the parser until at least ``size``
        units are buffered or the parser is exhausted.  Fewer than ``size``
        units (possibly an empty string) are returned once the body has
        ended.  Surplus data stays buffered for the next call.

        Raises:
            ValueError: if ``size`` is negative.
        """
        if size < 0:
            raise ValueError("Size must be positive.")

        if self.parser:
            while self.buf.tell() < size:
                try:
                    # Builtin next() works on both Python 2.6+ and Python 3.
                    self.buf.write(next(self.parser))
                except StopIteration:
                    # Body fully parsed; later calls only drain the buffer.
                    self.parser = None
                    break

        data = self.buf.getvalue()
        ret, rest = data[:size], data[size:]
        # Start a fresh buffer holding only the unread remainder so that
        # tell() keeps reporting the amount of pending data.
        self.buf = StringIO()
        self.buf.write(rest)
        return ret

    def parse_chunked(self, unreader):
        """Yield the payload pieces of every chunk read from ``unreader``.

        Raises:
            NoMoreData: if ``unreader`` runs dry in the middle of a chunk.
            ChunkMissingTerminator: if a chunk's payload is not followed by
                ``'\\r\\n'`` (including when the stream ends first).
        """
        (size, rest) = self.parse_chunk_size(unreader)
        while size > 0:
            # The chunk is larger than what is buffered: emit what we have
            # and keep reading until the tail of the chunk is in `rest`.
            while size > len(rest):
                size -= len(rest)
                yield rest
                rest = unreader.read()
                if not rest:
                    raise NoMoreData()
            yield rest[:size]
            # Remove \r\n after chunk
            rest = rest[size:]
            while len(rest) < 2:
                more = unreader.read()
                if not more:
                    # End of stream: stop instead of spinning forever on
                    # empty reads; the check below reports the bad framing.
                    break
                rest += more
            if rest[:2] != '\r\n':
                raise ChunkMissingTerminator(rest[:2])
            (size, rest) = self.parse_chunk_size(unreader, data=rest[2:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment