Skip to content

Instantly share code, notes, and snippets.

@garystafford
Last active March 29, 2023 13:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save garystafford/f23f1fa6747eb6b2ca5bd6918c96d07c to your computer and use it in GitHub Desktop.
Save garystafford/f23f1fa6747eb6b2ca5bd6918c96d07c to your computer and use it in GitHub Desktop.
Example of Python script created with the assistance of GitHub Copilot.
"""
Purpose: Creates an Amazon DynamoDB table, adds an item to the table,
gets that item from the table, and finally deletes the table
Author(s): Gary A. Stafford and GitHub Copilot
Created: 2023-03-26
Usage: python3 github_copilot_test.py [table_name]  (table_name defaults to 'users')
Tests: pytest github_copilot_test.py -v
"""
import boto3
from botocore.exceptions import ClientError
import logging
from logging import StreamHandler, Formatter
import sys
import unittest
from moto import mock_dynamodb
class DynamoUtilities:
    """Convenience wrapper around a boto3 DynamoDB resource.

    Every operation logs its outcome and signals failure through its return
    value (``False`` or ``None``) rather than letting ``ClientError`` escape.
    """

    def __init__(self):
        self.dynamodb = boto3.resource("dynamodb")
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # The logger is looked up by module name, so all instances share it.
        # Attach the stdout handler only once; otherwise every additional
        # instance would cause each message to be printed one more time.
        if not self.logger.handlers:
            handler = StreamHandler(stream=sys.stdout)
            handler.setFormatter(
                Formatter(fmt="[%(asctime)s: %(levelname)s] %(message)s")
            )
            self.logger.addHandler(handler)

    def create_table(self, table_name):
        """Create a table and block until it exists.

        The table uses ``username`` (string) as partition key, ``last_name``
        (string) as sort key, and provisioned throughput of 5 read and
        5 write capacity units.

        Args:
            table_name: the name of the table to create.

        Returns:
            True if the table was created, False if it already exists or the
            request failed with a ClientError.
        """
        try:
            self.dynamodb.create_table(
                TableName=table_name,
                KeySchema=[
                    {"AttributeName": "username", "KeyType": "HASH"},
                    {"AttributeName": "last_name", "KeyType": "RANGE"},
                ],
                AttributeDefinitions=[
                    {"AttributeName": "username", "AttributeType": "S"},
                    {"AttributeName": "last_name", "AttributeType": "S"},
                ],
                ProvisionedThroughput={
                    "ReadCapacityUnits": 5,
                    "WriteCapacityUnits": 5,
                },
            )
            # Wait for the table before reporting success so callers can use
            # it immediately.
            waiter = self.dynamodb.meta.client.get_waiter("table_exists")
            waiter.wait(TableName=table_name)
            self.logger.info("Table %s created", table_name)
            return True
        except self.dynamodb.meta.client.exceptions.ResourceInUseException:
            self.logger.warning("Table %s already exists", table_name)
            return False
        except ClientError as e:
            self.logger.error(e)
            return False

    def delete_table(self, table_name):
        """Delete a table and block until it is gone.

        Args:
            table_name: the name of the table to delete.

        Returns:
            True if the table was deleted, False if the table was not found
            or the request failed with a ClientError.
        """
        try:
            table = self.dynamodb.Table(table_name)
            table.delete()
            waiter = self.dynamodb.meta.client.get_waiter("table_not_exists")
            waiter.wait(TableName=table_name)
            self.logger.info("Table %s deleted", table_name)
            return True
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            self.logger.warning("Table %s not found", table_name)
            return False
        except ClientError as e:
            self.logger.error(e)
            return False

    def add_item(self, table_name, item):
        """Put an item into a table.

        Args:
            table_name: the name of the table.
            item: the item to add, as a dict of attribute names to values.

        Returns:
            The put_item response, or None if the table was not found or the
            request failed with a ClientError.
        """
        try:
            table = self.dynamodb.Table(table_name)
            response = table.put_item(Item=item)
            self.logger.info("Added item: %s to Table %s", item, table_name)
            return response
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            self.logger.error("Table %s not found", table_name)
            return None
        except ClientError as e:
            self.logger.error(e)
            return None

    def get_item(self, table_name, key):
        """Fetch a single item from a table by its primary key.

        Args:
            table_name: the name of the DynamoDB table.
            key: the primary key of the item, as a dict of key attribute
                names to values.

        Returns:
            The stored item, or None if no item has that key, the table was
            not found, or the request failed with a ClientError.
        """
        try:
            table = self.dynamodb.Table(table_name)
            response = table.get_item(Key=key)
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            self.logger.error("Table %s not found", table_name)
            return None
        except ClientError as e:
            self.logger.error(e)
            return None

        # The response carries no "Item" entry when nothing matches the key;
        # report that as None instead of raising KeyError.
        item = response.get("Item")
        if item is None:
            self.logger.warning(
                "No item with key %s in Table %s", key, table_name
            )
            return None
        self.logger.info("Got item: %s from Table %s", item, table_name)
        return item
class DynamoUtilitiesTests(unittest.TestCase):
    """Unit tests for DynamoUtilities, run under moto's mocked DynamoDB."""

    def setUp(self):
        # NOTE(review): the helper (and its boto3 resource) is built here,
        # before the per-test @mock_dynamodb decorator starts - confirm moto
        # intercepts resources created ahead of the mock.
        self.table_name = "users"
        self.dynamo_utilities = DynamoUtilities()

    @mock_dynamodb
    def test_create_table(self):
        """create_table makes the table show up in list_tables."""
        self.dynamo_utilities.create_table(self.table_name)

        client = boto3.client("dynamodb")
        existing_tables = client.list_tables()["TableNames"]
        self.assertIn(self.table_name, existing_tables)

    @mock_dynamodb
    def test_add_item(self):
        """An item written with add_item can be read back with get_item."""
        self.dynamo_utilities.create_table(self.table_name)

        new_user = dict(
            username="test_user",
            last_name="test_last_name",
            first_name="test_first_name",
            email="test_email",
            age=20,
            account_type="standard_user",
            last_login="2020-01-01 00:00:00",
            active=True,
        )
        self.dynamo_utilities.add_item(self.table_name, new_user)

        lookup_key = dict(username="test_user", last_name="test_last_name")
        fetched = self.dynamo_utilities.get_item(self.table_name, lookup_key)
        self.assertIsNotNone(fetched)

    @mock_dynamodb
    def test_delete_table(self):
        """delete_table removes a table that was created directly via boto3."""
        key_schema = [
            {"AttributeName": "username", "KeyType": "HASH"},
            {"AttributeName": "last_name", "KeyType": "RANGE"},
        ]
        attribute_definitions = [
            {"AttributeName": "username", "AttributeType": "S"},
            {"AttributeName": "last_name", "AttributeType": "S"},
        ]
        throughput = {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}

        client = boto3.client("dynamodb")
        client.create_table(
            TableName=self.table_name,
            KeySchema=key_schema,
            AttributeDefinitions=attribute_definitions,
            ProvisionedThroughput=throughput,
        )
        # Precondition: the table is there before the helper deletes it.
        self.assertIn(self.table_name, client.list_tables()["TableNames"])

        self.dynamo_utilities.delete_table(self.table_name)

        self.assertNotIn(self.table_name, client.list_tables()["TableNames"])
def main():
    """Run the demo: create a table, add an item, read it back, drop the table.

    The table name is taken from the first command-line argument and
    defaults to "users". If the table cannot be created (it already exists
    or the request failed), nothing else is done, so a table this script
    did not create is never written to or deleted.
    """
    table_name = sys.argv[1] if len(sys.argv) > 1 else "users"

    dynamo_utilities = DynamoUtilities()
    # Log through the helper's configured logger. A module-level
    # logging.info() call would be dropped at the root logger's default
    # WARNING level and would implicitly install a root handler that
    # duplicates every later message.
    dynamo_utilities.logger.info("Table name: %s", table_name)

    # create a new table; stop here if we do not own a freshly created one
    if not dynamo_utilities.create_table(table_name):
        return

    try:
        # add a new item to the table we created
        item = {
            "username": "janedoe",
            "first_name": "Jane",
            "last_name": "Doe",
            "email": "jdoe@example.com",
            "age": 20,
            "account_type": "standard_user",
            "last_login": "2020-01-01 00:00:00",
            "active": True,
        }
        dynamo_utilities.add_item(table_name, item)

        # get the item we just added to the table
        key = {"username": "janedoe", "last_name": "Doe"}
        dynamo_utilities.get_item(table_name, key)
    finally:
        # delete the table we created, even if a step above raised
        dynamo_utilities.delete_table(table_name)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment