Skip to content

Instantly share code, notes, and snippets.

@kakarukeys
Last active October 21, 2021 05:56
Show Gist options
  • Save kakarukeys/2ec6e692001f668e822820649e93349a to your computer and use it in GitHub Desktop.
stream from gz
import io
import json
import time
from gzip import GzipFile

import pandas as pd
# https://stackoverflow.com/a/20260030/496852
def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a
    read-only input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's
    io module).  For efficiency, the stream is buffered.

    :param iterable: any iterable yielding ``bytes`` chunks.  Empty chunks
        are skipped rather than being mistaken for end-of-stream.
    :param buffer_size: buffer size handed to ``io.BufferedReader``.
    :return: a readable, buffered binary stream.
    """
    # The original called next() on the iterable directly, which fails with
    # TypeError for non-iterator iterables (e.g. a list); normalise first.
    iterator = iter(iterable)

    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b""

        def readable(self):
            return True

        def readinto(self, b):
            try:
                limit = len(b)  # we may return at most this many bytes
                chunk = self.leftover
                # A b"" chunk must not be returned: readinto() returning 0
                # signals EOF to BufferedReader and would truncate the
                # stream.  Keep pulling until we have data (or真 EOF via
                # StopIteration below).
                while not chunk:
                    chunk = next(iterator)
                output, self.leftover = chunk[:limit], chunk[limit:]
                b[:len(output)] = output
                return len(output)
            except StopIteration:
                return 0  # indicate EOF

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
def convert_to_line_delimited(fobj):
    """Re-chunk a pretty-printed JSON array read line-by-line from *fobj*
    into a stream of line-delimited JSON documents (one ``bytes`` object
    per array element).

    Assumes ``json.dump(..., indent=...)``-style formatting: ``[`` and ``]``
    on their own lines, elements separated by a line containing only ``},``.

    :param fobj: binary file-like object, iterable line by line.
    :yield: each array element compacted onto a single ``bytes`` line.
    """
    buffer = b""
    for line in fobj:
        # The array brackets themselves carry no data.
        if line in (b"[\n", b"]\n"):
            continue
        if line == b"},\n":
            # Separator line: the buffered element is complete; re-attach
            # the closing brace that the separator line carried.
            yield buffer.lstrip() + b"}\n"
            buffer = b""
        else:
            buffer += line.rstrip()
    # Final element (its closing "}" has no trailing comma, so it was
    # accumulated into the buffer).  Guard against empty input: the
    # original unconditionally yielded a spurious b"" here.
    if buffer:
        yield buffer.lstrip()
# Alternative: wrap the re-assembled line-delimited JSON as a stream and let
# pandas consume it lazily in chunks:
#
# with GzipFile("a.gz") as gzf:
#     json_stream = iterable_to_stream(convert_to_line_delimited(gzf))
#     for d in pd.read_json(json_stream, lines=True, chunksize=1):
#         print(d)
#         time.sleep(1)

# Stream each JSON document out of the gzipped array one at a time.
# `json` is imported at the top of the file (the original called
# json.loads without ever importing json -> NameError at runtime).
with GzipFile("a.gz") as gzf:
    for doc in convert_to_line_delimited(gzf):
        time.sleep(1)  # throttle, to demonstrate the streaming behaviour
        print(json.loads(doc))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment