Skip to content

Instantly share code, notes, and snippets.

@lays147
Created February 3, 2022 23:44
Show Gist options
  • Save lays147/2db91c629bff39a567d3c3664237e45f to your computer and use it in GitHub Desktop.
Save lays147/2db91c629bff39a567d3c3664237e45f to your computer and use it in GitHub Desktop.
boto3 x aws sdk js
import { DynamoDB } from "aws-sdk"
// Lazily created DynamoDB client shared by every caller of getClient().
// Typed explicitly so the compiler can check it against getClient's return type.
let client: DynamoDB | null = null

/**
 * Return the module-wide DynamoDB client, creating it on the first call.
 *
 * Later calls return the same instance that the first call constructed.
 * The client is built with `connectTimeout` and `timeout` both set to 1000
 * in `httpOptions`.
 */
export const getClient = (): DynamoDB => {
  if (!client) {
    client = new DynamoDB({
      httpOptions: {
        connectTimeout: 1000,
        timeout: 1000
      }
    })
  }
  return client
}
def dynamo_db_client(localhost: bool = False):
    """Build a boto3 DynamoDB service resource.

    Despite the function name, the returned object comes from
    ``boto3.resource`` rather than ``boto3.client``.

    :param localhost: when true, pass ``http://localhost:8000`` as the
        ``endpoint_url``; otherwise no endpoint is passed and boto3
        resolves it itself.
    :return: the object produced by ``boto3.resource("dynamodb", ...)``.
    """
    # Same timeout/retry settings regardless of which endpoint is used.
    settings = Config(
        connect_timeout=60,
        read_timeout=60,
        retries={"total_max_attempts": 3},
    )
    resource_kwargs = {"config": settings}
    if localhost:
        resource_kwargs["endpoint_url"] = "http://localhost:8000"
    return boto3.resource("dynamodb", **resource_kwargs)
import boto3
@pytest.fixture(scope="session")
def create_stepfunction():
    """Create a mocked IAM role and Step Functions state machine.

    A random UUID string is used as both the role name and the state
    machine name. The fixture yields no value; it yields while both the
    ``mock_iam`` and ``mock_stepfunctions`` contexts are still open.
    """
    name = str(uuid4())
    assume_role_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sts:AssumeRole"],
                "Resource": "*",
            }
        ],
    }
    with mock_iam():
        role_client = boto3.client("iam")
        role = role_client.create_role(
            RoleName=name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy),
        )
        with mock_stepfunctions():
            sfn_client = boto3.client("stepfunctions")
            # The state machine definition is an empty JSON object.
            sfn_client.create_state_machine(
                name=name,
                definition=json.dumps({}),
                roleArn=role["Role"]["Arn"],
            )
            # Yield inside both ``with`` blocks so the mocks are not torn
            # down until the session-scoped fixture is finalized.
            yield
import json
import logging
from typing import Dict
from uuid import uuid4
import boto3
from botocore.exceptions import ClientError
# Logger for this module, obtained under the module's ``__name__``.
logger = logging.getLogger(__name__)
def trigger_step_function(context: Dict, function_name: str) -> None:
    """
    Trigger a step function

    A random UUID string is used as the execution name.

    :param context: context to be passed to function; it is serialized
        with ``json.dumps`` and sent as the execution input
    :param function_name: value passed to ``start_execution`` as
        ``stateMachineArn``
    :raises Exception: when ``start_execution`` raises a ``ClientError``;
        the original error is attached as the cause
    """
    sfn_client = boto3.client("stepfunctions")
    try:
        sfn_client.start_execution(
            stateMachineArn=function_name,
            input=json.dumps(context),
            name=str(uuid4()),
        )
    except ClientError as err:
        # Log the "Error" section of the service response (None if absent).
        response = err.response.get("Error", None)
        logger.error(response)
        # Keep the generic exception type callers already catch, but give it
        # a message and chain the ClientError so the cause is not lost.
        raise Exception(
            f"Failed to start Step Functions execution for {function_name}"
        ) from err
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment