@Ugbot
Created April 26, 2024 09:15
s3 sql — stream a large S3 CSV file in chunks using S3 Select (select_object_content with ScanRange)
import ast
import logging
from collections.abc import Iterator

import boto3
from botocore.exceptions import ClientError
from flask import current_app  # AWS_PROFILE_NAME is read from a Flask app config

logger = logging.getLogger(__name__)


def stream_s3_file(bucket: str, key: str, file_size: int,
                   chunk_bytes: int = 5000) -> Iterator[tuple[dict, ...]]:
    """Streams an S3 file via a generator, one ScanRange chunk at a time.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object key
        file_size (int): Size of the S3 object in bytes
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000

    Yields:
        tuple[dict]: A tuple of dictionaries, one per row of the chunk's content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    # Walk the object in ScanRange windows of at most chunk_bytes
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )
"""
select_object_content() response is an event stream that can be looped to concatenate the overall result set
Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
"""
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        if result_stream:  # a scan range may contain no complete records
            yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)

def s3_file_processing():
    bucket = '<s3-bucket>'
    key = '<s3-key>'
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
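
The driver above calls a get_s3_file_size() helper that the gist does not include. Below is a minimal sketch, assuming the same profile-based boto3 session as stream_s3_file and reusing the imports at the top of the file; the function name, docstring, and error handling are illustrative, not part of the original gist.

def get_s3_file_size(bucket: str, key: str) -> int:
    """Returns the S3 object's size in bytes, or 0 if it can't be fetched."""
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    try:
        # HeadObject returns metadata only; ContentLength is the size in bytes
        return s3_client.head_object(Bucket=bucket, Key=key)['ContentLength']
    except ClientError:
        logger.exception(f'Client error while fetching size of s3://{bucket}/{key}')
        return 0

Returning 0 on error means the while loop in stream_s3_file never starts, so the caller simply processes no chunks instead of raising mid-stream.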