pandas_s3_streaming.py
Forked from uhho/pandas_s3_streaming.py by @ShahBinoy (November 1, 2020)
Streaming pandas DataFrame to/from S3 with on-the-fly processing and GZIP compression

import gzip
import io

import pandas as pd

def s3_to_pandas(client, bucket, key, header=None):
    # get the object using the boto3 client and wrap the streaming body in a GZIP reader
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # load the decompressed stream directly into a DataFrame
    return pd.read_csv(gz, header=header, dtype=str)

def s3_to_pandas_with_processing(client, bucket, key, header=None):
    # get the object using the boto3 client and wrap the streaming body in a GZIP reader
    obj = client.get_object(Bucket=bucket, Key=key)
    gz = gzip.GzipFile(fileobj=obj['Body'])
    # replace some characters in the incoming stream, then load it into a DataFrame
    lines = "\n".join(line.replace('?', ' ') for line in gz.read().decode('utf-8').split('\n'))
    return pd.read_csv(io.StringIO(lines), header=header, dtype=str)

def pandas_to_s3(df, client, bucket, key):
    # write the DataFrame to an in-memory string stream as CSV
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    # reset stream position
    csv_buffer.seek(0)
    # create a binary stream for the compressed payload
    gz_buffer = io.BytesIO()
    # compress the CSV bytes with GZIP
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        gz_file.write(bytes(csv_buffer.getvalue(), 'utf-8'))
    # upload the compressed payload to S3
    client.put_object(Bucket=bucket, Key=key, Body=gz_buffer.getvalue())
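
A minimal round-trip usage sketch of the helpers above. The bucket name, key, and sample DataFrame are placeholders, and the boto3 client is assumed to pick up credentials from the environment:

import boto3

s3 = boto3.client('s3')
sample = pd.DataFrame({'a': ['1', '2'], 'b': ['x', 'y?']})

# upload as a GZIP-compressed CSV, then stream it back
pandas_to_s3(sample, s3, 'my-bucket', 'data/example.csv.gz')
# header=0 because pandas_to_s3 writes a header row via df.to_csv
restored = s3_to_pandas(s3, 'my-bucket', 'data/example.csv.gz', header=0)

Both directions go through in-memory buffers, so nothing touches local disk; note that pandas_to_s3 holds the entire compressed payload in memory before uploading, which suits moderately sized frames.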