Skip to content

Instantly share code, notes, and snippets.

@akaihola
Last active May 31, 2018 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akaihola/7d8a3ed4da73e3e9c4f0076f18c459e5 to your computer and use it in GitHub Desktop.
Split a non-seekable stream into a header reader and a content reader
import io
from typing import Tuple
def tee_stream_head(stream: io.IOBase,
                    head_max_lines: int = 3) -> Tuple[io.RawIOBase,
                                                      io.RawIOBase]:
    """Split a stream, enable parallel reading of initial lines

    This helper solves the use case where a limited number of initial lines
    from a non-seekable stream need to be inspected before reading the whole
    stream again in one go.

    Memory is only consumed for the initial lines instead of buffering all
    read data until the end of the stream.

    Example (assuming no compression done by the HTTP server)::

        >>> response = requests.get('http://acme/', stream=True)
        >>> headers, content = tee_stream_head(response.raw)
        >>> first_line = next(headers)
        >>> second_line = next(headers)
        >>> all_lines = content.readlines()

    :param stream: The stream to read
    :param head_max_lines: The number of initial lines to keep in a buffer
    :return: Two readers for the stream. The first one only can read the
        initial lines, and the second one all the way until the end of the
        stream. The same initial lines can be read from both readers
        independently.
    """
    head_line_count = 0  # newlines seen in the data buffered so far
    head = io.BytesIO()  # buffer of initial data, shared by both readers

    class HeadBufferedReader(io.RawIOBase):
        """Reader which replays the shared ``head`` buffer before the stream

        If ``head_only`` is true, an attempt to read past the buffered
        initial lines raises ``ValueError`` instead of consuming more of the
        underlying stream.
        """

        def __init__(self, head_only: bool):
            super().__init__()
            # This reader's logical position from the start of the stream.
            self.read_pos = 0
            self.head_only = head_only

        def readable(self) -> bool:
            return True

        def readinto(self, buf: memoryview) -> int:
            """Read data from the stream or stored initial lines into a buffer

            If the other reader has already read some initial lines into the
            `head` buffer, and the read pointer isn't beyond the initial
            lines, re-use that data. Otherwise read data from the stream and
            insert it into the `head` buffer for up to `head_max_lines` lines
            of data.

            :param buf: The buffer to fill with read data
            :return: The number of bytes read into the buffer, or ``None``
                when a non-blocking stream has no data available right now
            :raises ValueError: for a head-only reader trying to read beyond
                the first `head_max_lines` lines
            """
            nonlocal head_line_count
            buf_len = len(buf)
            head_length = head.tell()
            if self.read_pos < head_length:
                # There's unread data in the head buffer; replay it.
                head.seek(self.read_pos)
                chunk = head.read(buf_len)
                # Restore the write position so later buffering appends.
                head.seek(0, io.SEEK_END)
            else:
                if self.head_only and head_line_count >= head_max_lines:
                    # Check if we've read beyond the head.
                    raise ValueError(f"Can't read beyond the first "
                                     f"{head_max_lines} lines")
                # Read data from the actual stream.
                chunk = stream.read(buf_len)
                if chunk is None:
                    # RawIOBase contract: a non-blocking stream with no data
                    # currently available is signalled by returning None.
                    return None
                if head_line_count < head_max_lines:
                    # Still within the head: buffer the chunk for the other
                    # reader and keep counting newlines.
                    head.write(chunk)
                    head_line_count += chunk.count(b'\n')
            chunk_len = len(chunk)
            buf[:chunk_len] = chunk
            self.read_pos += chunk_len
            return chunk_len

    header_reader = HeadBufferedReader(head_only=True)
    all_content_reader = HeadBufferedReader(head_only=False)
    return header_reader, all_content_reader
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment