Last active
March 7, 2022 20:35
-
-
Save gabcoyne/6f6265b863a8f41a04a1ec2309f52ef8 to your computer and use it in GitHub Desktop.
Flow Starter Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import os | |
import typing | |
from datetime import datetime | |
import boto3 | |
import prefect | |
from prefect.run_configs.kubernetes import KubernetesRun | |
logging.basicConfig() | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
s3_client = boto3.client("s3") | |
ssm_client = boto3.client("ssm", "us-east-1") | |
def decrypt_parameter(parameter_name: str): | |
parameter = ssm_client.get_parameter(Name=parameter_name, WithDecryption=True) | |
return parameter["Parameter"]["Value"] | |
PREFECT_AUTH_TOKEN = decrypt_parameter(f"/{os.getenv('STAGE')}/prefect/token") | |
PREFECT_VERSION_GROUP_ID = decrypt_parameter( | |
f"/{os.getenv('STAGE')}/prefect/my_important_flow/version_group_id" | |
) | |
prefect_client = prefect.Client(api_key=PREFECT_AUTH_TOKEN) | |
def trigger_flow_run( | |
bucket_name: str, | |
s3_key: list, | |
memory_request: int, | |
) -> dict: | |
return prefect_client.create_flow_run( | |
run_name=f"{s3_key.split('/')[-1]} Processing Run", | |
version_group_id=PREFECT_VERSION_GROUP_ID, | |
parameters=dict( | |
bucket_name=bucket_name, | |
s3_key=s3_key, | |
), | |
run_config=KubernetesRun(memory_request=memory_request), | |
) | |
def get_memory_required(s3_key: str, s3_bucket: str): | |
default_memory_required = 100000 | |
try: | |
memory_required = 0 | |
bucket = s3_key["loc"].replace("s3://", "").split("/")[0] | |
key = s3_key["loc"].replace(f"s3://{bucket}/", "") | |
print(s3_bucket, key) | |
for key in s3_client.list_objects(Bucket=s3_bucket, Prefix=key)["Contents"]: | |
memory_required = memory_required + key["Size"] | |
return memory_required | |
except Exception as ex: | |
print(ex) | |
return default_memory_required | |
def run(event, context): | |
s3_bucket = event["Records"][0]["s3"]["bucket"]["name"] | |
s3_key = event["Records"][0]["s3"]["object"]["key"] | |
logger.info(f"Received message via object {s3_key} in bucket {s3_bucket}") | |
prefect_response = trigger_flow_run( | |
bucket_name=s3_bucket, | |
s3_key=s3_key, | |
memory_request=get_memory_required(s3_key=s3_key, s3_bucket=s3_bucket), | |
) | |
logger.info(f"Flow Run ID: {prefect_response}") | |
return {"success": True} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[tool.poetry] | |
name = "flowrun-lambda" | |
version = "0.1.0" | |
description = "" | |
authors = ["George Coyne <@gabcoyne>"] | |
[tool.poetry.dependencies] | |
python = ">=3.8,<3.10" | |
prefect = "^0.15.3" | |
[tool.poetry.dev-dependencies] | |
black = "^21.7b0" | |
boto3 = "^1.18.57" | |
[build-system] | |
requires = ["poetry-core>=1.0.0"] | |
build-backend = "poetry.core.masonry.api" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
flow-starter-lambda:$ sls deploy --stage dev --verbose | |
Running "serverless" from node_modules | |
Deploying flow-starter-lambda to stage dev (us-east-1) | |
Packaging | |
Generating requirements.txt from "pyproject.toml" | |
Parsed requirements.txt from pyproject.toml in /Users/gcoyne/src/personal/blogs/flow-starter-lambda/.serverless/requirements.txt | |
Installing requirements from "/Users/gcoyne/Library/Caches/serverless-python-requirements/3caa8622b77aad6da640d49705c6a65a8d4cd4b59ec3b6084e33747ba08c1dc7_x86_64_slspyc/requirements.txt" | |
Docker Image: lambci/lambda:build-python3.8 | |
Using download cache directory /Users/gcoyne/Library/Caches/serverless-python-requirements/downloadCacheslspyc | |
Running docker run --rm -v /Users/gcoyne/Library/Caches/serverless-python-requirements/3caa8622b77aad6da640d49705c6a65a8d4cd4b59ec3b6084e33747ba08c1dc7_x86_64_slspyc\:/var/task\:z -v /Users/gcoyne/Library/Caches/serverless-python-requirements/downloadCacheslspyc\:/var/useDownloadCache\:z -u 0 lambci/lambda\:build-python3.8 python3.8 -m pip install -t /var/task/ -r /var/task/requirements.txt --cache-dir /var/useDownloadCache... | |
Excluding development dependencies for service package | |
Packaging Python Requirements Lambda Layer | |
Found cached Python Requirements Lambda Layer file | |
Retrieving CloudFormation stack | |
Creating CloudFormation stack | |
Creating new change set | |
Waiting for new change set to be created | |
Executing created change set | |
UPDATE_IN_PROGRESS - AWS::CloudFormation::Stack - flow-starter-lambda-dev | |
CREATE_IN_PROGRESS - AWS::Logs::LogGroup - FlowDashstarterLogGroup | |
CREATE_IN_PROGRESS - AWS::Lambda::LayerVersion - PythonRequirementsLambdaLayer | |
CREATE_IN_PROGRESS - AWS::IAM::Role - IamRoleLambdaExecution | |
CREATE_IN_PROGRESS - AWS::Logs::LogGroup - FlowDashstarterLogGroup | |
CREATE_COMPLETE - AWS::Logs::LogGroup - FlowDashstarterLogGroup | |
CREATE_IN_PROGRESS - AWS::IAM::Role - IamRoleLambdaExecution | |
CREATE_IN_PROGRESS - AWS::Lambda::LayerVersion - PythonRequirementsLambdaLayer | |
CREATE_COMPLETE - AWS::Lambda::LayerVersion - PythonRequirementsLambdaLayer | |
CREATE_COMPLETE - AWS::IAM::Role - IamRoleLambdaExecution | |
CREATE_IN_PROGRESS - AWS::Lambda::Function - FlowDashstarterLambdaFunction | |
CREATE_IN_PROGRESS - AWS::Lambda::Function - FlowDashstarterLambdaFunction | |
CREATE_COMPLETE - AWS::Lambda::Function - FlowDashstarterLambdaFunction | |
CREATE_IN_PROGRESS - AWS::Lambda::Version - FlowDashstarterLambdaVersiongXk3W8oi9HiW3uxM34a5ajB8lzssnmRgpXJv2AGsg | |
CREATE_IN_PROGRESS - AWS::Lambda::Permission - FlowDashstarterLambdaPermissionAbucketdevprefectioS3 | |
CREATE_IN_PROGRESS - AWS::Lambda::Permission - FlowDashstarterLambdaPermissionAbucketdevprefectioS3 | |
CREATE_IN_PROGRESS - AWS::Lambda::Version - FlowDashstarterLambdaVersiongXk3W8oi9HiW3uxM34a5ajB8lzssnmRgpXJv2AGsg | |
CREATE_COMPLETE - AWS::Lambda::Version - FlowDashstarterLambdaVersiongXk3W8oi9HiW3uxM34a5ajB8lzssnmRgpXJv2AGsg | |
CREATE_COMPLETE - AWS::Lambda::Permission - FlowDashstarterLambdaPermissionAbucketdevprefectioS3 | |
CREATE_IN_PROGRESS - AWS::S3::Bucket - S3BucketAbucketdevprefectio | |
CREATE_IN_PROGRESS - AWS::S3::Bucket - S3BucketAbucketdevprefectio | |
CREATE_COMPLETE - AWS::S3::Bucket - S3BucketAbucketdevprefectio | |
UPDATE_COMPLETE_CLEANUP_IN_PROGRESS - AWS::CloudFormation::Stack - flow-starter-lambda-dev | |
UPDATE_COMPLETE - AWS::CloudFormation::Stack - flow-starter-lambda-dev | |
Retrieving CloudFormation stack | |
Removing old service artifacts from S3 | |
✔ Service deployed to stack flow-starter-lambda-dev (160s) | |
functions: | |
flow-starter: flow-starter-lambda-dev-flow-starter (174 kB) | |
layers: | |
pythonRequirements: arn:aws:lambda:us-east-1:304383062342:layer:flow-starter-lambda-dev-python-requirements:4 | |
Stack Outputs: | |
PythonRequirementsLambdaLayerS3Key: serverless/flow-starter-lambda/dev/1646683776199-2022-03-07T20:09:36.199Z/pythonRequirements.zip | |
FlowDashstarterLambdaFunctionQualifiedArn: arn:aws:lambda:us-east-1:304383062342:function:flow-starter-lambda-dev-flow-starter:4 | |
PythonRequirementsLambdaLayerQualifiedArn: arn:aws:lambda:us-east-1:304383062342:layer:flow-starter-lambda-dev-python-requirements:4 | |
PythonRequirementsLambdaLayerHash: be18630601aca7e9cd949744d5ac6b8a1037773c | |
ServerlessDeploymentBucketName: flow-starter-lambda-dev-serverlessdeploymentbucke-lb2eoir4130u |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
service: flow-starter-lambda | |
frameworkVersion: "3" | |
provider: | |
name: aws | |
runtime: python3.8 | |
lambdaHashingVersion: 20201221 | |
iamRoleStatements: | |
- Effect: Allow | |
Action: | |
- ssm:GetParameter | |
Resource: | |
Fn::Join: | |
- "" | |
- - "arn:aws:ssm:" | |
- Ref: AWS::Region | |
- ":" | |
- Ref: AWS::AccountId | |
- ":parameter/*" | |
- Effect: "Allow" | |
Action: | |
- "*" | |
Resource: | |
- arn:aws:s3:::${self:custom.messageBucket}/* | |
- arn:aws:s3:::${self:custom.messageBucket} | |
functions: | |
flow-starter: | |
handler: handler.run | |
layers: | |
- { Ref: PythonRequirementsLambdaLayer } | |
events: | |
- s3: | |
bucket: ${self:custom.messageBucket} | |
event: s3:ObjectCreated:* | |
rules: | |
- prefix: raw_files/ | |
- suffix: .json | |
environment: | |
STAGE: ${opt:stage, 'dev'} | |
PREFECT_API_URL: https://api.prefect.io/graphql | |
PREFECT_VERSION_GROUP_ID: 1bd38827-3c57-417b-b310-a9fe34083b6a | |
plugins: | |
- serverless-python-requirements | |
custom: | |
pythonRequirements: | |
dockerizePip: non-linux | |
layer: true | |
messageBucket: a-bucket.${opt:stage, 'dev'}.prefect.io |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment