Skip to content

Instantly share code, notes, and snippets.

@diva-D
Created April 27, 2022 22:02
Show Gist options
  • Save diva-D/5445e03b207671c3878093984d237e0c to your computer and use it in GitHub Desktop.
Save diva-D/5445e03b207671c3878093984d237e0c to your computer and use it in GitHub Desktop.
This example also shows how to use the mock with the "Flow" framework, replacing the automatically created database instance with the mocked one.
import os
import boto3
import pytest
from moto import mock_dynamodb2
from flow import Flow, FlowDynamoDB
# Module-level Flow, constructed at import time and therefore outside any
# moto mock. create_table() builds a replacement Flow once the mock is active.
# NOTE(review): presumably this stands in for the instance an application
# module would create automatically -- confirm against the real module.
flow = Flow(name="test", database=FlowDynamoDB())
def create_table():
    """Create the mocked DynamoDB table and install a fresh Flow on ``product_flow``.

    Call this from inside a function decorated with ``mock_dynamodb2``: the
    new ``FlowDynamoDB`` instance and the boto3 resource both have to be
    created while the mock is active.

    Returns:
        The table object returned by ``dynamodb.create_table``.

    Raises:
        RuntimeError: If the ``DYNAMODB_TABLE_NAME`` environment variable is
            unset or empty.
    """
    # Fail fast with a readable message instead of letting a ``None`` table
    # name surface as a parameter-validation error deep inside botocore.
    table_name = os.getenv("DYNAMODB_TABLE_NAME")
    if not table_name:
        raise RuntimeError(
            "DYNAMODB_TABLE_NAME environment variable must be set "
            "before creating the mocked table"
        )

    throughput = {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}

    # Build the Flow here, under the active mock, so its database is the
    # mocked version rather than one created at import time.
    mocked_flow = Flow(name="test", database=FlowDynamoDB())
    # Swap the module's flow for the mocked one.
    # NOTE(review): ``product_flow`` is not imported in the visible part of
    # this file -- confirm the module under test is imported elsewhere.
    product_flow.flow = mocked_flow

    dynamodb = boto3.resource("dynamodb", region_name="ap-southeast-2")
    return dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {"AttributeName": "_id", "KeyType": "HASH"},
            {"AttributeName": "_sort", "KeyType": "RANGE"},
        ],
        AttributeDefinitions=[
            {"AttributeName": "_id", "AttributeType": "S"},
            {"AttributeName": "_sort", "AttributeType": "S"},
            {"AttributeName": "_flowSortIndex", "AttributeType": "S"},
        ],
        GlobalSecondaryIndexes=[
            {
                "IndexName": "_flowSortIndex",
                "KeySchema": [
                    {"AttributeName": "_flowSortIndex", "KeyType": "HASH"},
                ],
                "Projection": {"ProjectionType": "ALL"},
                # A provisioned-capacity table needs throughput on each
                # global secondary index in the real CreateTable API; keep
                # the mocked schema valid against it.
                "ProvisionedThroughput": throughput,
            }
        ],
        ProvisionedThroughput=throughput,
    )
# moto's decorator keeps DynamoDB mocked for the whole test body
@mock_dynamodb2
def test_dynamodb():
    """Build the mocked table via ``create_table`` while the mock is active."""
    mocked_table = create_table()
    ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment