Skip to content

Instantly share code, notes, and snippets.

@ajorg
Last active October 25, 2023 02:02
Show Gist options
  • Save ajorg/bb14202ad505126898ca91dcdbc05aa6 to your computer and use it in GitHub Desktop.
Save ajorg/bb14202ad505126898ca91dcdbc05aa6 to your computer and use it in GitHub Desktop.
Wire copies something off the Internet into S3, in a Lambda function
# Copyright Andrew Jorgensen
# SPDX-License-Identifier: MIT
"""Copies data from a URL sources to S3 objects"""
import hashlib
import json
import os
from base64 import b64encode
from decimal import Decimal
from urllib.parse import urlparse
from urllib.request import Request, urlopen
import boto3
from botocore.exceptions import ClientError
NAME = "Wire"
VERSION = "1.0"
SOURCE = "https://gist.github.com/ajorg/bb14202ad505126898ca91dcdbc05aa6"
HEADERS = {"User-Agent": os.environ.get("USER_AGENT", f"{NAME}/{VERSION} ({SOURCE})")}
OBJECTS_WEEKLY = json.loads(os.environ.get("OBJECTS_WEEKLY", "{}"))
OBJECTS_DAILY = json.loads(os.environ.get("OBJECTS_DAILY", "{}"))
OBJECTS_HOURLY = json.loads(os.environ.get("OBJECTS_HOURLY", "{}"))
CLIENT = boto3.client("s3")
def wire(destination, url):
"""Copies data from a URL source to an S3 object"""
print(f"Wiring {url} to {destination}")
parsed_d = urlparse(destination)
key = parsed_d.path.lstrip("/")
bucket = parsed_d.netloc
if key.endswith(".csv"):
content_type = "text/csv"
elif key.endswith(".json"):
content_type = "application/json"
elif key.endswith(".zip"):
content_type = "application/zip"
else:
content_type = "application/octet-stream"
request = Request(url, headers=HEADERS)
with urlopen(request) as response:
if response.status >= 300:
raise ()
data = response.read()
md5 = hashlib.new("md5")
md5.update(data)
size = Decimal(len(data)) / 2**20
print(f"{size:.2f}MiB")
print(md5.hexdigest())
try:
obj = CLIENT.head_object(Bucket=bucket, Key=key)
except ClientError:
obj = {}
if obj.get("ETag") != f'"{md5.hexdigest()}"':
CLIENT.put_object(
Bucket=bucket,
Key=key,
ACL="public-read",
Body=data,
ContentMD5=b64encode(md5.digest()).decode("us-ascii"),
ContentType=content_type,
Metadata={"Location": url},
)
print(f"Put {key}")
else:
print(f"Matched {key}")
def lambda_handler(event, context):
"""AWS Lambda handler function"""
# print(json.dumps(dict(os.environ), sort_keys=True))
print(json.dumps(event, sort_keys=True))
if "resources" not in event:
# Assume raw invocation
objects = event
else:
source = event["resources"][0]
source_name = source.split("/")[1]
if source_name in ("weekly",):
objects = OBJECTS_WEEKLY
elif source_name in ("1pm-UTC", "daily"):
objects = OBJECTS_DAILY
elif source_name in ("hourly",):
objects = OBJECTS_HOURLY
else:
raise ValueError(f"Unknown event source {source}")
print(json.dumps(objects, sort_keys=True))
for destination, url in objects.items():
try:
wire(destination, url)
except Exception as e:
print(e)
if __name__ == "__main__":
from argparse import ArgumentParser
parser = ArgumentParser(prog=NAME)
parser.add_argument("objects")
args = parser.parse_args()
lambda_handler(json.loads(args.objects), None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment