Last active
June 1, 2023 12:27
-
-
Save alyoshenka/eefe9ad7b53b275c895f0dbe696694ec to your computer and use it in GitHub Desktop.
Project Neo AWS Resources
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
# MQTT topic on which the handler below publishes the outcome of a delete request.
response_topic = 'cmd/neo/schedule/del/res' | |
# Module-level boto3 clients used by the functions below.
# NOTE(review): `region` is not defined anywhere in the visible source; it has
# to be supplied before these statements can run — confirm where it comes from.
client = boto3.client('iot-data', region_name=region) | |
step_function = boto3.client('stepfunctions', region_name=region) | |
dynamodb = boto3.client('dynamodb', region_name=region) | |
lambda_ = boto3.client('lambda', region_name=region) | |
def delete_dynamodb(scheduleID):
    """Delete the item with the given ID from the ScheduledOperations table.

    Args:
        scheduleID: Value of the table's ``scheduleID`` string key.

    Returns:
        ``True`` when an item was found and deleted. Otherwise a string that
        explains why nothing was deleted: either a "not found" message or the
        text of the exception raised by the request. Because a non-empty
        string is truthy, callers must compare the result against ``True``.
    """
    try:
        # DeleteItem succeeds even when the key does not exist. Asking for the
        # old item back makes the outcome observable: 'Attributes' is only
        # present in the response when something was actually removed.
        result = dynamodb.delete_item(
            TableName='ScheduledOperations',
            Key={'scheduleID': {'S': scheduleID}},
            ReturnValues='ALL_OLD',
        )
    except Exception as err:
        return str(err)
    if not result.get('Attributes'):
        return f'No item found with scheduleID {scheduleID}'
    return True
def delete_stepfunction(stepFunctionName):
    """Stop the MQTTWaitThenSendBack execution that has the given name.

    Despite the function name, nothing is deleted: the execution ARN is built
    from ``region``, ``accountID`` and the execution name, and a stop request
    is sent for it.

    Args:
        stepFunctionName: Name of the execution to stop.

    Returns:
        ``True`` when the stop request raised no error, otherwise the text of
        the exception that was raised.
    """
    try:
        execution_arn = f'arn:aws:states:{region}:{accountID}:execution:MQTTWaitThenSendBack:{stepFunctionName}'
        step_function.stop_execution(executionArn=execution_arn)
    except Exception as failure:
        return str(failure)
    else:
        return True
def lambda_handler(event, context):
    """Handle a request to remove a scheduled operation.

    Steps, in order:
      1. Delete the schedule's row via ``delete_dynamodb``.
      2. Stop the matching execution via ``delete_stepfunction``.
      3. Invoke the PublishScheduledOperationsDBContents lambda (errors are
         only printed).
      4. Publish the combined outcome of steps 1 and 2 on ``response_topic``.

    Args:
        event: Expected to provide ``scheduleID`` and ``stepFunctionName``.
            A key that cannot be read is reported in the published response
            under ``error`` instead of aborting the handler.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and a fixed JSON body.
    """
    # Step 1: database row.
    schedule_id = None
    try:
        schedule_id = event['scheduleID']
        db_outcome = {"scheduleID": schedule_id, "success?": delete_dynamodb(schedule_id)}
    except Exception as err:
        db_outcome = {"scheduleID": schedule_id, "error": str(err)}

    # Step 2: running execution.
    execution_name = None
    try:
        execution_name = event['stepFunctionName']
        sfn_outcome = {"stepFunctionName": execution_name, "success?": delete_stepfunction(execution_name)}
    except Exception as err:
        sfn_outcome = {"stepFunctionName": execution_name, "error": str(err)}

    # Step 3: ask the publisher lambda to push the updated table contents.
    try:
        lambda_.invoke(FunctionName=f'arn:aws:lambda:{region}:{accountID}:function:PublishScheduledOperationsDBContents')
    except Exception as err:
        print('Error invoking push changes lambda:', err)

    # Step 4: report both outcomes to the requester.
    summary = json.dumps({"DynamoDBDelete": db_outcome, "StepFunctionDelete": sfn_outcome})
    publish_result = client.publish(topic=response_topic, qos=1, payload=summary)
    print(publish_result)

    return {'statusCode': 200, 'body': json.dumps('Published to topic')}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
from datetime import datetime | |
from dateutil import parser | |
# Module-level boto3 handles: a DynamoDB resource (used below through Table
# objects) and an IoT data-plane client (used below to publish MQTT messages).
# NOTE(review): `region` is not defined anywhere in the visible source — confirm
# where it comes from.
dynamodb = boto3.resource('dynamodb', region_name=region) | |
iot_data = boto3.client('iot-data', region_name=region) | |
def date_in_future(iso_str):
    """Report whether an ISO 8601 timestamp lies strictly after the current time.

    The timestamp is parsed and compared as a timezone-aware instant, so
    values written with a UTC offset other than ``Z`` are ordered correctly.

    Args:
        iso_str: Timestamp string such as ``2023-06-01T12:27:00.000Z``. A
            ``Z`` suffix is read as UTC; a value that carries no UTC offset at
            all is assumed to be UTC.

    Returns:
        ``True`` if the instant is in the future. ``False`` if it is now or in
        the past, or if ``iso_str`` is not a parseable timestamp string.
    """
    # Local import keeps this block self-contained: the file header only
    # imports the ``datetime`` class, not ``timezone``.
    from datetime import datetime, timezone

    try:
        moment = datetime.fromisoformat(iso_str.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError) as err:
        print('Invalid date:', iso_str, err)
        return False

    if moment.tzinfo is None:
        # Naive and aware datetimes cannot be compared; treat naive as UTC.
        moment = moment.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    is_future = now < moment
    print(now.isoformat(), '<', iso_str, '=', is_future)
    return is_future
def lambda_handler(event, context):
    """Publish all pending scheduled operations and purge the expired ones.

    Reads every item of the ScheduledOperations table. Items whose
    ``executeAt`` is still in the future are kept (with their ``operation``
    JSON string decoded when possible); all others are deleted from the table.
    The kept items are then published on ``dt/neo/scheduled/res``.

    Args:
        event: Lambda event; not used.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and a fixed JSON body.
    """
    table = dynamodb.Table('ScheduledOperations')

    # A single Scan call returns at most one page of results, so keep
    # following LastEvaluatedKey until the whole table has been read.
    page = table.scan()
    items = list(page['Items'])
    while 'LastEvaluatedKey' in page:
        page = table.scan(ExclusiveStartKey=page['LastEvaluatedKey'])
        items.extend(page['Items'])

    # Collect survivors in a new list. Removing from `items` while iterating
    # over it skips the element that follows each removal, so some expired
    # items were never examined.
    pending = []
    for item in items:
        try:
            in_future = date_in_future(item['executeAt'])
        except Exception as err:
            # Best effort: an item whose timestamp cannot be read is logged
            # and still published, and must not block the remaining items.
            print(err)
            pending.append(item)
            continue

        if in_future:
            try:
                item['operation'] = json.loads(item['operation'])
            except (KeyError, TypeError, ValueError) as err:
                # Publish the item with its raw `operation` value.
                print('Error loading json:', err)
            pending.append(item)
        else:
            try:
                table.delete_item(Key={'scheduleID': item['scheduleID']})
                print('Deleted item')
            except Exception as err:
                print('Error deleting item:', err)

    iot_data.publish(
        topic='dt/neo/scheduled/res',
        qos=1,
        payload=json.dumps({'scheduledOperations': pending}),
    )
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!'),
    }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
# Module-level IoT data-plane client used by the handler below to publish MQTT messages.
# NOTE(review): `region` is not defined anywhere in the visible source — confirm
# where it comes from.
client = boto3.client('iot-data', region_name=region) | |
def lambda_handler(event, context):
    """Publish the event's ``operation`` as JSON on the requested MQTT topic.

    Args:
        event: Expected to provide ``operation`` and, optionally,
            ``publishTopic``. When the topic cannot be read the message goes
            to ``SendScheduledOperation``.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and a fixed JSON body. Any failure
        while reading, encoding or publishing the operation is reported on
        the ``SendScheduledOperationError`` topic instead of being raised.
    """
    print('received event:', event)

    # Resolve the destination topic, falling back to the default one.
    try:
        destination = event['publishTopic']
    except Exception as err:
        print('no "publishTopic" in event:', err)
        destination = 'SendScheduledOperation'

    try:
        scheduled_operation = event['operation']
        client.publish(topic=destination, qos=1, payload=json.dumps(scheduled_operation))
    except Exception as err:
        print('Unable to parse operation JSON:', err)
        error_message = json.dumps({"Error": str(err)})
        client.publish(topic='SendScheduledOperationError', qos=1, payload=error_message)

    return {'statusCode': 200, 'body': json.dumps('PublishToMQTT was successful')}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
import uuid | |
from datetime import datetime | |
from dateutil import parser | |
# Module-level boto3 clients used by the functions below: IoT data plane
# (MQTT publish), Step Functions, DynamoDB and Lambda.
# NOTE(review): `region` is not defined anywhere in the visible source — confirm
# where it comes from.
iot_data = boto3.client('iot-data', region_name=region) | |
step_function = boto3.client('stepfunctions', region_name=region) | |
dynamodb = boto3.client('dynamodb', region_name=region) | |
lambda_ = boto3.client('lambda', region_name=region) | |
def date_in_future(iso_str):
    """Report whether an ISO 8601 timestamp lies strictly after the current time.

    The timestamp is parsed and compared as a timezone-aware instant, so
    values written with a UTC offset other than ``Z`` are ordered correctly.

    Args:
        iso_str: Timestamp string such as ``2023-06-01T12:27:00.000Z``. A
            ``Z`` suffix is read as UTC; a value that carries no UTC offset at
            all is assumed to be UTC.

    Returns:
        ``True`` if the instant is in the future. ``False`` if it is now or in
        the past, or if ``iso_str`` is not a parseable timestamp string.
    """
    # Local import keeps this block self-contained: the file header only
    # imports the ``datetime`` class, not ``timezone``.
    from datetime import datetime, timezone

    try:
        moment = datetime.fromisoformat(iso_str.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError) as err:
        print('Invalid date:', iso_str, err)
        return False

    if moment.tzinfo is None:
        # Naive and aware datetimes cannot be compared; treat naive as UTC.
        moment = moment.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    is_future = now < moment
    print(now.isoformat(), '<', iso_str, '=', is_future)
    return is_future
def validate_payload(payload):
    """Check that a schedule request carries an ``executeAt`` time in the future.

    Args:
        payload: The received request; tested for the ``executeAt`` and
            ``responseTopic`` keys.

    Returns:
        A ``(ok, report)`` tuple. ``ok`` is ``True`` only when ``executeAt``
        is present and ``date_in_future`` accepts it. ``report`` is a dict of
        flags: ``success?``, ``executeAt?``, ``responseTopic?`` and
        ``inFuture?`` are decided here, while ``putItem?`` and
        ``stepFunction?`` always start out ``False``.
    """
    has_execute_at = 'executeAt' in payload
    report = {
        'success?': False,
        'executeAt?': has_execute_at,
        'responseTopic?': 'responseTopic' in payload,
        'inFuture?': False,
        'putItem?': False,
        'stepFunction?': False,
    }

    # Guard clause: reject requests without a usable future timestamp.
    if not (has_execute_at and date_in_future(payload['executeAt'])):
        return False, report

    report['inFuture?'] = True
    report['success?'] = True
    return True, report
def create_step_function_payload(event):
    """Extract the fields that are passed on as the step function's input.

    Args:
        event: The received request; must provide ``executeAt``,
            ``publishTopic`` and ``operation``.

    Returns:
        A new dict holding exactly those three entries, or ``None`` when any
        of them cannot be read from ``event`` (the error is printed).
    """
    wanted_keys = ('executeAt', 'publishTopic', 'operation')
    try:
        extracted = {key: event[key] for key in wanted_keys}
        print('Payload:', extracted)
    except Exception as problem:
        print('Error creating step function payload:', problem)
        return None
    else:
        return extracted
def try_get_operation_friendly_name(event):
    """Return the operation's friendly name with all spaces removed.

    This function never raises; every problem maps to a placeholder name.

    Args:
        event: The received request; the name is looked up at
            ``event['operation']['friendlyName']``.

    Returns:
        The friendly name without spaces, or one of the placeholders:
        ``'NoOperation'`` when ``event`` has no ``operation`` key,
        ``'NoFriendlyName'`` when the operation has no ``friendlyName``, and
        ``'BadFriendlyName'`` when the event, operation or name has a type
        that cannot be inspected or cleaned (for example ``operation`` is
        ``None`` or the name is not a string).
    """
    try:
        if 'operation' not in event:
            return 'NoOperation'
        operation = event['operation']
        # The membership test raises TypeError for values such as None or
        # numbers, so it must sit inside the try block like the lookup below.
        if 'friendlyName' not in operation:
            return 'NoFriendlyName'
        return operation['friendlyName'].replace(' ', '')
    except (AttributeError, IndexError, KeyError, TypeError):
        return 'BadFriendlyName'
def lambda_handler(event, context):
    """Receive a schedule request, store it, and start its step function.

    The request is first checked with ``validate_payload``. For a valid
    request the handler, in order:
      1. Writes a new item to the ScheduledOperations table.
      2. Invokes the PublishScheduledOperationsDBContents lambda.
      3. Starts an MQTTWaitThenSendBack execution with the payload built by
         ``create_step_function_payload``.
    Each step is best effort: a failure is printed and the matching flag in
    the response stays ``False``. In every case the response flags are then
    published on the event's ``responseTopic``, or on a fallback topic that
    differs between the success and failure paths.

    Args:
        event: The schedule request. The keys read here are ``executeAt``,
            ``publishTopic``, ``operation`` and ``responseTopic``.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and a fixed JSON body.
    """
    print('received event:', event)

    is_valid, report = validate_payload(event)

    # The execution name is derived up front, whether or not validation passed.
    execution_name = try_get_operation_friendly_name(event) + '_' + str(uuid.uuid1())
    execution_name = execution_name.replace(':', '-')

    if not is_valid:
        print('failure:', report)
        fallback_topic = 'ReceiveScheduledOperation'
    else:
        # 1. Record the schedule in the database.
        try:
            new_item = {
                'scheduleID': {'S': str(uuid.uuid1())},
                'stepFunctionName': {'S': execution_name},
                'executeAt': {'S': event['executeAt']},
                'publishTopic': {'S': event['publishTopic']},
                'operation': {'S': json.dumps(event['operation'])},
            }
            dynamodb.put_item(TableName='ScheduledOperations', Item=new_item)
            report['putItem?'] = True
        except Exception as err:
            print('Error adding item:', err)

        # 2. Have the updated table contents pushed out.
        try:
            lambda_.invoke(FunctionName=f'arn:aws:lambda:{region}:{accountID}:function:PublishScheduledOperationsDBContents')
        except Exception as err:
            print('Error invoking push changes lambda:', err)

        # 3. Start the waiting step function.
        try:
            sfn_input = create_step_function_payload(event)
            if sfn_input is not None:
                step_function.start_execution(
                    stateMachineArn=f'arn:aws:states:{region}:{accountID}:stateMachine:MQTTWaitThenSendBack',
                    name=execution_name,
                    input=json.dumps(sfn_input),
                )
                report['stepFunction?'] = True
        except Exception as err:
            print('Error starting step function:', err)

        fallback_topic = 'ReceiverLambdaResponseTopic'

    # Report the outcome flags to the requester.
    reply_topic = event['responseTopic'] if 'responseTopic' in event else fallback_topic
    iot_data.publish(topic=reply_topic, qos=1, payload=json.dumps(report))

    return {'statusCode': 200, 'body': json.dumps('ReceiveScheduleFunction was successful')}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"Comment": "Waits until the executeAt timestamp of the input, then invokes the PublishToMQTT Lambda with the execution input", | |
"StartAt": "Wait Until Time", | |
"States": { | |
"Wait Until Time": { | |
"Type": "Wait", | |
"TimestampPath": "$.executeAt", | |
"Next": "Invoke PublishToMQTT" | |
}, | |
"Invoke PublishToMQTT": { | |
"Type": "Task", | |
"Resource": "arn:aws:states:::lambda:invoke", | |
"OutputPath": "$.Payload", | |
"Parameters": { | |
"Payload.$": "$", | |
"FunctionName": "arn:aws:lambda:region:accountID:function:PublishToMQTT:$LATEST" | |
}, | |
"Retry": [ | |
{ | |
"ErrorEquals": [ | |
"Lambda.ServiceException", | |
"Lambda.AWSLambdaException", | |
"Lambda.SdkClientException", | |
"Lambda.TooManyRequestsException" | |
], | |
"IntervalSeconds": 2, | |
"MaxAttempts": 6, | |
"BackoffRate": 2 | |
} | |
], | |
"End": true | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment