@ajkerrigan
Last active March 19, 2024 03:04
Use S3 Select the ugly way
#!/usr/bin/env python3
"""
Search files in S3, similar to grep.

Use S3 Select with intentionally broad CSV parameters to make it behave
more like a naive grep. This lets it work effectively on data that is
malformed or unpredictably structured, without requiring a
pre-processing step.

Reference: https://aws.amazon.com/blogs/aws/s3-glacier-select/
"""
import gzip
import logging
import re
from collections import ChainMap
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
import boto3
import click
from botocore.exceptions import ClientError, EventStreamError
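
# The serialization settings below are deliberately loose: with the field
# and quote characters effectively blanked out, S3 Select should treat each
# line as a single opaque record rather than parsing it as CSV, which is
# what lets a LIKE clause approximate grep.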
COMMON_SELECT_PARAMS = {
    "ExpressionType": "SQL",
    "InputSerialization": {
        "CompressionType": "GZIP",
        "CSV": {
            "FileHeaderInfo": "IGNORE",
            "RecordDelimiter": "\n",
            "FieldDelimiter": "",
            "QuoteCharacter": "",
            "QuoteEscapeCharacter": "",
            "AllowQuotedRecordDelimiter": False,
        },
    },
    "OutputSerialization": {
        "CSV": {
            "RecordDelimiter": "\n",
            "FieldDelimiter": "",
            "QuoteCharacter": "",
            "QuoteEscapeCharacter": "",
            "QuoteFields": "ASNEEDED",
        }
    },
}


@click.command()
@click.option("--source-bucket", help="Search objects in this bucket")
@click.option("--prefix", help="Search objects under this prefix within the bucket")
@click.option("--start-after", default="", help="Start object listing at this key")
@click.option(
    "--thread-count", default=None, type=int, help="Use this many worker threads"
)
@click.option(
    "--output-file",
    default="-",
    type=click.File("wb"),
    help="Write to this output file (default: stdout)",
)
@click.option(
    "--pattern",
    type=str,
    help="Search pattern used to match lines with a SQL WHERE/LIKE clause",
)
def search_objects(
    source_bucket, prefix, start_after, thread_count, output_file, pattern
):
    """
    Search objects in an S3 bucket for content matching the specified pattern.
    """
    client = boto3.client("s3")
    outbuffer = BytesIO()
    params = {
        "Bucket": source_bucket,
        "StartAfter": start_after,
        "Prefix": prefix,
        "MaxKeys": 1000,
    }
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger("boto").setLevel(logging.ERROR)
    logging.getLogger("botocore").setLevel(logging.ERROR)
    paginator = client.get_paginator("list_objects_v2")
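    # Fan out one search task per page of object listings. Results are
    # buffered in memory and gzipped once at the end, in whatever order
    # the worker threads finish.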
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        futures = (
            executor.submit(
                search_page, page.get("Contents", []), source_bucket, pattern
            )
            for page in paginator.paginate(**params)
        )
        for future in as_completed(futures):
            outbuffer.write(future.result())
    with gzip.open(output_file, "wb") as gzout:
        gzout.write(outbuffer.getvalue())


def search_page(page, source_bucket, pattern):
    """
    Search a collection of S3 objects for the specified pattern.

    Use S3 Select where possible, and fall back to streaming regex
    in case of failures.
    """
    logger = logging.getLogger()
    s3_client = boto3.client("s3")
    buf = BytesIO()
    for obj in page:
        key = obj.get("Key")
        logger.info("Searching key: %s", key)
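        # Layer per-object values over the shared serialization settings.
        # Since field parsing is effectively disabled, S3 Select exposes
        # each line as a single column (s._1), so the LIKE clause acts as
        # a substring match over the whole line.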
        select_params = ChainMap(
            {
                "Bucket": source_bucket,
                "Key": key,
                "Expression": f"Select * from S3Object s where s._1 like '%{pattern}%'",
            },
            COMMON_SELECT_PARAMS,
        )
        try:
            search_with_s3select(s3_client, select_params, buf)
        except (ClientError, EventStreamError):
            logger.exception("S3 select search failed for key: %s", key)
            logger.info("Falling back to regex search...")
            try:
                search_with_regex(s3_client, source_bucket, key, pattern, buf)
            except ClientError:
                logger.exception("Regex search failed for key: %s", key)
    return buf.getvalue()
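

# Fallback path: stream the object, decompress it locally, and grep each
# line with a regex. LIKE wildcards ("%") are translated to their rough
# regex equivalent (".*").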
def search_with_regex(s3_client, source_bucket, key, pattern, buf):
    resp = s3_client.get_object(Bucket=source_bucket, Key=key)
    with gzip.open(resp["Body"]._raw_stream, "rb") as gzbody:
        matching_lines = (
            line
            for line in gzbody
            if re.search(pattern.replace("%", ".*").encode("utf-8"), line)
        )
        for line in matching_lines:
            buf.write(line)
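

# S3 Select responds with an event stream: "Records" events carry the
# matched bytes, while "Stats" events report how much data was scanned
# and processed server-side.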
def search_with_s3select(s3_client, select_params, buf):
    logger = logging.getLogger()
    resp = s3_client.select_object_content(**select_params)
    for event in resp["Payload"]:
        if "Records" in event:
            records = event["Records"]["Payload"]
            buf.write(records)
        elif "Stats" in event:
            stats = event["Stats"]["Details"]
            logger.info("Stats details bytesScanned: %s", stats["BytesScanned"])
            logger.info("Stats details bytesProcessed: %s", stats["BytesProcessed"])


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    search_objects()
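
# Example invocation (script name, bucket, and prefix are hypothetical):
#   python s3_select_grep.py --source-bucket my-log-bucket --prefix logs/2024/03/ \
#       --pattern ERROR --output-file matches.gz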