Skip to content

Instantly share code, notes, and snippets.

@alyoshenka
Last active June 1, 2023 12:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alyoshenka/eefe9ad7b53b275c895f0dbe696694ec to your computer and use it in GitHub Desktop.
Save alyoshenka/eefe9ad7b53b275c895f0dbe696694ec to your computer and use it in GitHub Desktop.
Project Neo AWS Resources
import json
import boto3
response_topic = 'cmd/neo/schedule/del/res'
client = boto3.client('iot-data', region_name=region)
step_function = boto3.client('stepfunctions', region_name=region)
dynamodb = boto3.client('dynamodb', region_name=region)
lambda_ = boto3.client('lambda', region_name=region)
def delete_dynamodb(scheduleID):
"""Deletes an item with the given ID from the ScheduledOperations table, returns success"""
try:
dynamodb.delete_item(TableName='ScheduledOperations', Key={'scheduleID': {'S': scheduleID}})
# Todo: how to know if it was successful?
return True
except Exception as err:
return str(err)
def delete_stepfunction(stepFunctionName):
"""Deletes a step function with the given name, returns success"""
try:
arn = f'arn:aws:states:{region}:{accountID}:execution:MQTTWaitThenSendBack:{stepFunctionName}'
step_function.stop_execution(executionArn=arn)
return True
except Exception as err:
return str(err)
def lambda_handler(event, context):
scheduleID = None
stepFunctionName = None
try:
scheduleID = event['scheduleID']
dyn = { "scheduleID": scheduleID, "success?": delete_dynamodb(scheduleID) }
except Exception as err:
dyn = { "scheduleID": scheduleID, "error": str(err) }
try:
stepFunctionName = event['stepFunctionName']
stp = { "stepFunctionName": stepFunctionName, "success?": delete_stepfunction(stepFunctionName) }
except Exception as err:
stp = { "stepFunctionName": stepFunctionName, "error": str(err) }
# Publish new Operations Database
try:
lambda_.invoke(FunctionName=f'arn:aws:lambda:{region}:{accountID}:function:PublishScheduledOperationsDBContents')
except Exception as err:
print('Error invoking push changes lambda:', err)
obj = { "DynamoDBDelete": dyn, "StepFunctionDelete": stp }
# Publish success/failure response
response = client.publish(
topic=response_topic,
qos=1,
payload=json.dumps(obj)
)
print(response)
return {
'statusCode': 200,
'body': json.dumps('Published to topic')
}
import json
import boto3
from datetime import datetime
from dateutil import parser
dynamodb = boto3.resource('dynamodb', region_name=region)
iot_data = boto3.client('iot-data', region_name=region)
def date_in_future(iso_str):
try:
datetime.fromisoformat(iso_str.replace('Z', '+00:00'))
now_str = datetime.utcnow().isoformat()[:-3] + 'Z'
print(now_str, '<', iso_str, '=', now_str < iso_str)
return now_str < iso_str
except Exception as err:
print('Invalid date:', iso_str, err)
return False
def lambda_handler(event, context):
table = dynamodb.Table('ScheduledOperations')
items = table.scan()['Items']
for item in items:
try:
if date_in_future(item['executeAt']):
try:
item['operation'] = json.loads(item['operation'])
except Exception as err:
print('Error loading json:', err)
else:
try:
items.remove(item)
table.delete_item(Key={
'scheduleID': item['scheduleID']
})
print('Deleted item')
except Exception as err:
print('Error deleting item:', err)
except Exception as err:
print(err)
iot_data.publish(
topic='dt/neo/scheduled/res',
qos=1,
payload=json.dumps({'scheduledOperations': items})
)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
import json
import boto3
client = boto3.client('iot-data', region_name=region)
def lambda_handler(event, context):
print('received event:', event)
try:
topic = event['publishTopic']
except Exception as err:
print('no "publishTopic" in event:', err)
topic='SendScheduledOperation'
try:
operation = event['operation']
response = client.publish(
topic=topic,
qos=1,
payload=json.dumps(operation)
)
except Exception as err:
print('Unable to parse operation JSON:', err)
response = client.publish(
topic='SendScheduledOperationError',
qos=1,
payload=json.dumps({"Error": str(err)})
)
return {
'statusCode': 200,
'body': json.dumps('PublishToMQTT was successful')
}
import json
import boto3
import uuid
from datetime import datetime
from dateutil import parser
iot_data = boto3.client('iot-data', region_name=region)
step_function = boto3.client('stepfunctions', region_name=region)
dynamodb = boto3.client('dynamodb', region_name=region)
lambda_ = boto3.client('lambda', region_name=region)
def date_in_future(iso_str):
try:
datetime.fromisoformat(iso_str.replace('Z', '+00:00'))
now_str = datetime.utcnow().isoformat()[:-3] + 'Z'
print(now_str, '<', iso_str, '=', now_str < iso_str)
return now_str < iso_str
except Exception as err:
print('Invalid date:', iso_str, err)
return False
def validate_payload(payload):
"""Validates that the received payload has all the required fields for the scheduler;
returns True(success)/False(failure) and a response message"""
response = { 'success?': False }
response['executeAt?'] = 'executeAt' in payload
response['responseTopic?'] = 'responseTopic' in payload
response['inFuture?'] = False
response['putItem?'] = False
response['stepFunction?'] = False
if 'executeAt' in payload:
response['executeAt?'] = True
if date_in_future(payload['executeAt']):
response['inFuture?'] = True
response['success?'] = True
return True, response
return False, response
def create_step_function_payload(event):
payload = {}
try:
payload['executeAt'] = event['executeAt']
payload['publishTopic'] = event['publishTopic']
payload['operation'] = event['operation']
print('Payload:', payload)
return payload
except Exception as err:
print('Error creating step function payload:', err)
return None
def try_get_operation_friendly_name(event):
if 'operation' not in event:
return 'NoOperation'
if 'friendlyName' not in event['operation']:
return 'NoFriendlyName'
try:
return (event['operation']['friendlyName']).replace(' ', '')
except Exception as err:
return 'BadFriendlyName'
def lambda_handler(event, context):
# receive the event
print('received event:', event)
success, response = validate_payload(event)
step_function_name = try_get_operation_friendly_name(event) + '_' + str(uuid.uuid1())
step_function_name = step_function_name.replace(':', '-')
if success:
# Add schedule to the database
try:
dynamodb.put_item(TableName='ScheduledOperations', Item={
'scheduleID':{'S': str(uuid.uuid1())},
'stepFunctionName':{'S': step_function_name},
'executeAt':{'S': event['executeAt']},
'publishTopic':{'S': event['publishTopic']},
'operation':{'S': json.dumps(event['operation'])},
})
response['putItem?'] = True
except Exception as err:
print('Error adding item:', err)
# Send updated database items
try:
lambda_.invoke(FunctionName=f'arn:aws:lambda:{region}:{accountID}:function:PublishScheduledOperationsDBContents')
except Exception as err:
print('Error invoking push changes lambda:', err)
# Start the step function
try:
payload = create_step_function_payload(event)
if payload is not None:
step_function.start_execution(
stateMachineArn=f'arn:aws:states:{region}:{accountID}:stateMachine:MQTTWaitThenSendBack',
name=step_function_name,
input=json.dumps(payload)
)
response['stepFunction?'] = True
except Exception as err:
print('Error starting step function:', err)
# Send a response
topic = event['responseTopic'] if 'responseTopic' in event else 'ReceiverLambdaResponseTopic'
iot_data.publish(
topic=topic,
qos=1,
payload=json.dumps(response)
)
else:
print('failure:', response)
# Send a failure response
topic = event['responseTopic'] if 'responseTopic' in event else 'ReceiveScheduledOperation'
iot_data.publish(
topic=topic,
qos=1,
payload=json.dumps(response)
)
return {
'statusCode': 200,
'body': json.dumps('ReceiveScheduleFunction was successful')
}
{
"Comment": "A description of my state machine",
"StartAt": "Wait Until Time",
"States": {
"Wait Until Time": {
"Type": "Wait",
"TimestampPath": "$.executeAt",
"Next": "Invoke PublishToMQTT"
},
"Invoke PublishToMQTT": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:region:accountID:function:PublishToMQTT:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment