Skip to content

Instantly share code, notes, and snippets.

@peacing
Last active August 19, 2020 04:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peacing/ced3469e77a7a78f2fbc49f5fb45182a to your computer and use it in GitHub Desktop.
Save peacing/ced3469e77a7a78f2fbc49f5fb45182a to your computer and use it in GitHub Desktop.
import boto3
import json
import pytest
from moto import mock_dynamodb2
from lambda_function import *
@pytest.fixture
def test_sqs_event():
    """Return a minimal SQS-style event containing one record.

    The record's ``body`` is a JSON-encoded string with the fields
    ``user_id``, ``amount`` and ``updated_at``.
    """
    # Built as a plain dict first so the JSON payload is easy to read;
    # a distinct local name avoids shadowing the fixture itself.
    payload = {
        "user_id": "B9B3022F98Fjvjs83AB8/80C185D8",
        "amount": 10,
        "updated_at": "2020-03-03T00:50:47",
    }
    return {"Records": [{"body": json.dumps(payload)}]}
class MockBotoSession:
    """Stand-in for the ``boto3.session`` module used by these tests.

    Exposes a nested ``Session`` class whose ``client()`` returns a stub
    object with a canned ``list_crawlers()`` response.
    """

    class Session:
        """Fake session that hands out stub clients."""

        def client(self, service_name, region_name=None):
            """Return a stub client; both arguments are accepted and ignored.

            ``region_name`` defaults to ``None`` so that callers which omit
            the region work against the mock as well.
            """

            class Client:
                """Stub client exposing only ``list_crawlers``."""

                def list_crawlers(self):
                    """Return a fixed response naming two crawlers."""
                    return {'CrawlerNames': ['crawler_1', 'crawler_2']}

            return Client()
@pytest.fixture(autouse=True)
def set_up(monkeypatch):
    """Create the mocked ``user_spending`` table and stub ``boto3.session``.

    Runs automatically for every test (``autouse=True``). The moto mock is
    entered as a context manager rather than applied as a decorator: this
    fixture is a generator, and the mock has to stay active across the
    ``yield`` so the test body runs against the mocked DynamoDB.
    """
    with mock_dynamodb2():
        dynamodb = boto3.resource('dynamodb', 'us-east-1')
        dynamodb.create_table(
            TableName='user_spending',
            KeySchema=[{'AttributeName': 'user_id', 'KeyType': 'HASH'}],
            AttributeDefinitions=[
                {'AttributeName': 'user_id', 'AttributeType': 'S'},
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5,
            },
        )
        # Replace boto3.session with the local stub; monkeypatch undoes this
        # automatically when the test finishes.
        monkeypatch.setattr(boto3, 'session', MockBotoSession)
        yield
def test_lambda_handler(test_sqs_event, set_up):
    """Invoke the handler with the sample event and check the stored item."""
    lambda_handler(event=test_sqs_event, context={})

    # Look the item up by the user_id that was actually sent in the event, so
    # the key cannot drift from the fixture's payload.
    # NOTE(review): assumes the handler stores user_id unchanged — confirm.
    sent = json.loads(test_sqs_event['Records'][0]['body'])

    # NOTE(review): `dynamodb` and `table_name` are not defined in this file;
    # presumably they come from `from lambda_function import *` — confirm.
    table = dynamodb.Table(table_name)
    result = table.get_item(Key={'user_id': sent['user_id']})
    item = result['Item']

    assert item['amount'] == 10
    assert 'updated_at' in item
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment