Skip to content

Instantly share code, notes, and snippets.

@DavidDeCoding
Created August 16, 2022 19:34
Show Gist options
  • Save DavidDeCoding/24dfc77673ece32ff388c04b4a069c2f to your computer and use it in GitHub Desktop.
Save DavidDeCoding/24dfc77673ece32ff388c04b4a069c2f to your computer and use it in GitHub Desktop.
Whiteboarding - on_message.py
import boto3
import os
import json
WHITEBOARDING_TABLE = os.environ['WHITEBOARDING_TABLE']
db_client = boto3.resource('dynamodb')
table = db_client.Table(WHITEBOARDING_TABLE)
def handler(event, context):
print(f"Event: {event}, Context: {context}")
domain = event["requestContext"]["domainName"]
stage = event["requestContext"]["stage"]
endpoint_url = f"https://{domain}/{stage}"
client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint_url)
connection_id = event["requestContext"]["connectionId"]
body = json.loads(event["body"])
response = table.get_item(
Key={'whiteboardId': "123"}
)
print(response)
connections = []
if "Item" in response:
for user in response['Item']['users']:
if user == connection_id:
connections.append(user)
continue
try:
client.post_to_connection(
ConnectionId=user,
Data=body["message"]
)
connections.append(user)
except:
print(f"user update failed: {user}")
table.update_item(
Key={'whiteboardId': "123"},
ExpressionAttributeNames={'#users': 'users'},
ExpressionAttributeValues={
':users': connections
},
UpdateExpression="SET #users = :users"
)
return {"statusCode": 200}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment