Last active
March 19, 2024 03:04
-
-
Save ajkerrigan/1151733a4b92a03129aaf8f39d957444 to your computer and use it in GitHub Desktop.
Use S3 Select the ugly way
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Search files in S3 similar to grep. | |
Use S3 Select with intentionally broad CSV parameters to make | |
it behave more like naive grep. This enables it to be used more | |
effectively on data which is malformed or unpredictably structured | |
without requiring a pre-processing step. | |
Reference: https://aws.amazon.com/blogs/aws/s3-glacier-select/ | |
""" | |
import gzip | |
import logging | |
import re | |
from collections import ChainMap | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from io import BytesIO | |
import boto3 | |
import click | |
from botocore.exceptions import ClientError, EventStreamError | |
# Parameters shared by every S3 Select call.
# The CSV serialization settings are deliberately degenerate: with no field
# delimiter and no quote characters, S3 Select treats each newline-terminated
# line as a single opaque column (referenced as s._1 in the SQL expression),
# which makes it behave like naive grep instead of a CSV parser.
COMMON_SELECT_PARAMS = {
    "ExpressionType": "SQL",
    "InputSerialization": {
        # Objects are expected to be gzip-compressed (see the regex fallback,
        # which also gunzips).
        "CompressionType": "GZIP",
        "CSV": {
            "FileHeaderInfo": "IGNORE",
            "RecordDelimiter": "\n",
            "FieldDelimiter": "",
            "QuoteCharacter": "",
            "QuoteEscapeCharacter": "",
            "AllowQuotedRecordDelimiter": False,
        },
    },
    "OutputSerialization": {
        # Mirror the input settings so matched lines are emitted verbatim.
        "CSV": {
            "RecordDelimiter": "\n",
            "FieldDelimiter": "",
            "QuoteCharacter": "",
            "QuoteEscapeCharacter": "",
            "QuoteFields": "ASNEEDED",
        }
    },
}
@click.command()
@click.option("--source-bucket", help="Search objects in this bucket")
@click.option("--prefix", help="Search objects under this prefix within the bucket")
@click.option("--start-after", default="", help="Start object listing at this key")
@click.option(
    "--thread-count", default=None, type=int, help="Use this many worker threads"
)
@click.option(
    "--output-file",
    default="-",
    type=click.File("wb"),
    help="Write to this output file (default: stdout)",
)
@click.option("--pattern", type=str, help="Search pattern used to match lines with a SQL WHERE/LIKE clause")
def search_objects(source_bucket, prefix, start_after, thread_count, output_file, pattern):
    """
    Search objects in an S3 bucket for content matching the specified pattern.

    Matching lines from every object are gzip-compressed and written to
    --output-file (stdout by default). Results appear in worker-completion
    order, not listing order, exactly as before.
    """
    client = boto3.client("s3")
    list_params = {
        "Bucket": source_bucket,
        "StartAfter": start_after,
        "Prefix": prefix,
        "MaxKeys": 1000,
    }
    logging.getLogger().setLevel(logging.INFO)
    # boto/botocore log verbosely at INFO; keep only their errors.
    logging.getLogger("boto").setLevel(logging.ERROR)
    logging.getLogger("botocore").setLevel(logging.ERROR)
    paginator = client.get_paginator("list_objects_v2")
    # Stream each page's matches straight into the gzip output as its worker
    # finishes, instead of accumulating every result in an in-memory BytesIO
    # and compressing at the end. Output content is identical; peak memory is
    # bounded by one page's results rather than the whole search.
    with gzip.open(output_file, "wb") as gzout, ThreadPoolExecutor(
        max_workers=thread_count
    ) as executor:
        futures = [
            executor.submit(
                search_page, page.get("Contents", []), source_bucket, pattern
            )
            for page in paginator.paginate(**list_params)
        ]
        for future in as_completed(futures):
            gzout.write(future.result())
def search_page(page, source_bucket, pattern):
    """
    Search a collection of S3 objects for the specified pattern.

    Use S3 Select where possible, and fall back to streaming regex
    in case of failures.

    :param page: object summaries (dicts with a "Key") from one
        list_objects_v2 page
    :param source_bucket: bucket containing the objects
    :param pattern: LIKE-style pattern ('%' wildcards) to match lines against
    :return: all matching lines from the page's objects, as bytes
    """
    logger = logging.getLogger()
    # boto3 clients are created per worker call; each thread gets its own.
    s3_client = boto3.client("s3")
    buf = BytesIO()
    # Double any single quotes so the pattern cannot terminate (or corrupt)
    # the quoted SQL string literal it is interpolated into below.
    sql_pattern = pattern.replace("'", "''")
    for obj in page:
        key = obj.get("Key")
        logger.info("Searching key: %s", key)
        select_params = ChainMap(
            {
                "Bucket": source_bucket,
                "Key": key,
                # s._1 is the entire line: the shared input serialization
                # defines no field delimiter, so each record is one column.
                "Expression": f"Select * from S3Object s where s._1 like '%{sql_pattern}%'",
            },
            COMMON_SELECT_PARAMS,
        )
        try:
            search_with_s3select(s3_client, select_params, buf)
        except (ClientError, EventStreamError):
            logger.exception("S3 select search failed for key: %s", key)
            logger.info("Falling back to regex search...")
            try:
                # The regex fallback receives the raw pattern; it does its
                # own wildcard translation.
                search_with_regex(s3_client, source_bucket, key, pattern, buf)
            except ClientError:
                logger.exception(
                    "Regex search failed for key: %s", key)
    return buf.getvalue()
def search_with_regex(s3_client, source_bucket, key, pattern, buf):
    """
    Stream a gzipped S3 object and write lines matching *pattern* to *buf*.

    Fallback for objects S3 Select cannot handle. *pattern* uses LIKE-style
    '%' wildcards; every other character is matched literally (previously,
    regex metacharacters such as '.' leaked through and matched as regex).
    """
    resp = s3_client.get_object(Bucket=source_bucket, Key=key)
    # Escape regex metacharacters so the LIKE pattern is literal, translate
    # '%' wildcards to '.*', and compile once instead of per line.
    regex = re.compile(re.escape(pattern).replace("%", ".*").encode("utf-8"))
    # Pass the body file-object itself rather than the private _raw_stream
    # attribute; gzip only needs .read() for decompression.
    # NOTE(review): assumes the body behaves file-like here — confirm against
    # the installed botocore's StreamingBody.
    with gzip.open(resp["Body"], "rb") as gzbody:
        for line in gzbody:
            if regex.search(line):
                buf.write(line)
def search_with_s3select(s3_client, select_params, buf):
    """
    Run a single S3 Select query and append matched records to *buf*.

    Record payloads from the response event stream are written verbatim;
    scan statistics events are logged at INFO level.
    """
    logger = logging.getLogger()
    response = s3_client.select_object_content(**select_params)
    for event in response["Payload"]:
        if "Records" in event:
            buf.write(event["Records"]["Payload"])
        elif "Stats" in event:
            details = event["Stats"]["Details"]
            logger.info(
                "Stats details bytesScanned: %s", details["BytesScanned"]
            )
            logger.info(
                "Stats details bytesProcessed: %s",
                details["BytesProcessed"],
            )
if __name__ == "__main__":
    # Configure root logging before click dispatches to search_objects().
    logging.basicConfig(level=logging.INFO)
    search_objects()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment