@gabcoyne
Last active March 7, 2022 20:35
Flow Starter Lambda
import logging
import os

import boto3
import prefect
from prefect.run_configs.kubernetes import KubernetesRun

logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client("s3")
ssm_client = boto3.client("ssm", "us-east-1")


def decrypt_parameter(parameter_name: str) -> str:
    """Fetch an SSM SecureString parameter and return its decrypted value."""
    parameter = ssm_client.get_parameter(Name=parameter_name, WithDecryption=True)
    return parameter["Parameter"]["Value"]


PREFECT_AUTH_TOKEN = decrypt_parameter(f"/{os.getenv('STAGE')}/prefect/token")
PREFECT_VERSION_GROUP_ID = decrypt_parameter(
    f"/{os.getenv('STAGE')}/prefect/my_important_flow/version_group_id"
)
prefect_client = prefect.Client(api_key=PREFECT_AUTH_TOKEN)


def trigger_flow_run(
    bucket_name: str,
    s3_key: str,
    memory_request: int,
) -> dict:
    """Create a flow run for the configured version group, sized to the input."""
    return prefect_client.create_flow_run(
        run_name=f"{s3_key.split('/')[-1]} Processing Run",
        version_group_id=PREFECT_VERSION_GROUP_ID,
        parameters=dict(
            bucket_name=bucket_name,
            s3_key=s3_key,
        ),
        run_config=KubernetesRun(memory_request=memory_request),
    )


def get_memory_required(s3_key: str, s3_bucket: str) -> int:
    """Sum the sizes of all objects under the key prefix; fall back to a
    default if listing fails."""
    default_memory_required = 100000
    try:
        memory_required = 0
        for obj in s3_client.list_objects(Bucket=s3_bucket, Prefix=s3_key)["Contents"]:
            memory_required += obj["Size"]
        return memory_required
    except Exception as ex:
        logger.error(ex)
        return default_memory_required


def run(event, context):
    """Lambda entrypoint, invoked by S3 ObjectCreated notifications."""
    s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
    s3_key = event["Records"][0]["s3"]["object"]["key"]
    logger.info(f"Received message via object {s3_key} in bucket {s3_bucket}")
    prefect_response = trigger_flow_run(
        bucket_name=s3_bucket,
        s3_key=s3_key,
        memory_request=get_memory_required(s3_key=s3_key, s3_bucket=s3_bucket),
    )
    logger.info(f"Flow Run ID: {prefect_response}")
    return {"success": True}
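For reference, the handler only reads two fields from the S3 notification payload. A minimal sketch of that event shape and the extraction logic, with invented bucket and key names for illustration:

```python
# Hypothetical S3 ObjectCreated event, trimmed to the fields run() reads.
event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "a-bucket.dev.prefect.io"},
                "object": {"key": "raw_files/orders.json"},
            }
        }
    ]
}

# Extract the bucket and key exactly as the handler does.
s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
s3_key = event["Records"][0]["s3"]["object"]["key"]

# The flow run name is derived from the last path component of the key.
run_name = f"{s3_key.split('/')[-1]} Processing Run"
print(s3_bucket, s3_key, run_name)
```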
[tool.poetry]
name = "flowrun-lambda"
version = "0.1.0"
description = ""
authors = ["George Coyne <@gabcoyne>"]

[tool.poetry.dependencies]
python = ">=3.8,<3.10"
prefect = "^0.15.3"

[tool.poetry.dev-dependencies]
black = "^21.7b0"
boto3 = "^1.18.57"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
flow-starter-lambda:$ sls deploy --stage dev --verbose
Running "serverless" from node_modules
Deploying flow-starter-lambda to stage dev (us-east-1)
Packaging
Generating requirements.txt from "pyproject.toml"
Parsed requirements.txt from pyproject.toml in /Users/gcoyne/src/personal/blogs/flow-starter-lambda/.serverless/requirements.txt
Installing requirements from "/Users/gcoyne/Library/Caches/serverless-python-requirements/3caa8622b77aad6da640d49705c6a65a8d4cd4b59ec3b6084e33747ba08c1dc7_x86_64_slspyc/requirements.txt"
Docker Image: lambci/lambda:build-python3.8
Using download cache directory /Users/gcoyne/Library/Caches/serverless-python-requirements/downloadCacheslspyc
Running docker run --rm -v /Users/gcoyne/Library/Caches/serverless-python-requirements/3caa8622b77aad6da640d49705c6a65a8d4cd4b59ec3b6084e33747ba08c1dc7_x86_64_slspyc\:/var/task\:z -v /Users/gcoyne/Library/Caches/serverless-python-requirements/downloadCacheslspyc\:/var/useDownloadCache\:z -u 0 lambci/lambda\:build-python3.8 python3.8 -m pip install -t /var/task/ -r /var/task/requirements.txt --cache-dir /var/useDownloadCache...
Excluding development dependencies for service package
Packaging Python Requirements Lambda Layer
Found cached Python Requirements Lambda Layer file
Retrieving CloudFormation stack
Creating CloudFormation stack
Creating new change set
Waiting for new change set to be created
Executing created change set
UPDATE_IN_PROGRESS - AWS::CloudFormation::Stack - flow-starter-lambda-dev
CREATE_IN_PROGRESS - AWS::Logs::LogGroup - FlowDashstarterLogGroup
CREATE_IN_PROGRESS - AWS::Lambda::LayerVersion - PythonRequirementsLambdaLayer
CREATE_IN_PROGRESS - AWS::IAM::Role - IamRoleLambdaExecution
CREATE_IN_PROGRESS - AWS::Logs::LogGroup - FlowDashstarterLogGroup
CREATE_COMPLETE - AWS::Logs::LogGroup - FlowDashstarterLogGroup
CREATE_IN_PROGRESS - AWS::IAM::Role - IamRoleLambdaExecution
CREATE_IN_PROGRESS - AWS::Lambda::LayerVersion - PythonRequirementsLambdaLayer
CREATE_COMPLETE - AWS::Lambda::LayerVersion - PythonRequirementsLambdaLayer
CREATE_COMPLETE - AWS::IAM::Role - IamRoleLambdaExecution
CREATE_IN_PROGRESS - AWS::Lambda::Function - FlowDashstarterLambdaFunction
CREATE_IN_PROGRESS - AWS::Lambda::Function - FlowDashstarterLambdaFunction
CREATE_COMPLETE - AWS::Lambda::Function - FlowDashstarterLambdaFunction
CREATE_IN_PROGRESS - AWS::Lambda::Version - FlowDashstarterLambdaVersiongXk3W8oi9HiW3uxM34a5ajB8lzssnmRgpXJv2AGsg
CREATE_IN_PROGRESS - AWS::Lambda::Permission - FlowDashstarterLambdaPermissionAbucketdevprefectioS3
CREATE_IN_PROGRESS - AWS::Lambda::Permission - FlowDashstarterLambdaPermissionAbucketdevprefectioS3
CREATE_IN_PROGRESS - AWS::Lambda::Version - FlowDashstarterLambdaVersiongXk3W8oi9HiW3uxM34a5ajB8lzssnmRgpXJv2AGsg
CREATE_COMPLETE - AWS::Lambda::Version - FlowDashstarterLambdaVersiongXk3W8oi9HiW3uxM34a5ajB8lzssnmRgpXJv2AGsg
CREATE_COMPLETE - AWS::Lambda::Permission - FlowDashstarterLambdaPermissionAbucketdevprefectioS3
CREATE_IN_PROGRESS - AWS::S3::Bucket - S3BucketAbucketdevprefectio
CREATE_IN_PROGRESS - AWS::S3::Bucket - S3BucketAbucketdevprefectio
CREATE_COMPLETE - AWS::S3::Bucket - S3BucketAbucketdevprefectio
UPDATE_COMPLETE_CLEANUP_IN_PROGRESS - AWS::CloudFormation::Stack - flow-starter-lambda-dev
UPDATE_COMPLETE - AWS::CloudFormation::Stack - flow-starter-lambda-dev
Retrieving CloudFormation stack
Removing old service artifacts from S3
✔ Service deployed to stack flow-starter-lambda-dev (160s)
functions:
  flow-starter: flow-starter-lambda-dev-flow-starter (174 kB)
layers:
  pythonRequirements: arn:aws:lambda:us-east-1:304383062342:layer:flow-starter-lambda-dev-python-requirements:4
Stack Outputs:
  PythonRequirementsLambdaLayerS3Key: serverless/flow-starter-lambda/dev/1646683776199-2022-03-07T20:09:36.199Z/pythonRequirements.zip
  FlowDashstarterLambdaFunctionQualifiedArn: arn:aws:lambda:us-east-1:304383062342:function:flow-starter-lambda-dev-flow-starter:4
  PythonRequirementsLambdaLayerQualifiedArn: arn:aws:lambda:us-east-1:304383062342:layer:flow-starter-lambda-dev-python-requirements:4
  PythonRequirementsLambdaLayerHash: be18630601aca7e9cd949744d5ac6b8a1037773c
  ServerlessDeploymentBucketName: flow-starter-lambda-dev-serverlessdeploymentbucke-lb2eoir4130u
service: flow-starter-lambda
frameworkVersion: "3"

provider:
  name: aws
  runtime: python3.8
  lambdaHashingVersion: 20201221
  iamRoleStatements:
    - Effect: Allow
      Action:
        - ssm:GetParameter
      Resource:
        Fn::Join:
          - ""
          - - "arn:aws:ssm:"
            - Ref: AWS::Region
            - ":"
            - Ref: AWS::AccountId
            - ":parameter/*"
    - Effect: "Allow"
      Action:
        - "*"
      Resource:
        - arn:aws:s3:::${self:custom.messageBucket}/*
        - arn:aws:s3:::${self:custom.messageBucket}

functions:
  flow-starter:
    handler: handler.run
    layers:
      - { Ref: PythonRequirementsLambdaLayer }
    events:
      - s3:
          bucket: ${self:custom.messageBucket}
          event: s3:ObjectCreated:*
          rules:
            - prefix: raw_files/
            - suffix: .json
    environment:
      STAGE: ${opt:stage, 'dev'}
      PREFECT_API_URL: https://api.prefect.io/graphql
      PREFECT_VERSION_GROUP_ID: 1bd38827-3c57-417b-b310-a9fe34083b6a

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux
    layer: true
  messageBucket: a-bucket.${opt:stage, 'dev'}.prefect.io