Last active
January 26, 2020 13:48
-
-
Save praveen001/1b045d1c31cd9c72e4e6638e9f883f83 to your computer and use it in GitHub Desktop.
Serverless WebSockets with API Gateway and Golang Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Connect will receive the $connect request | |
// It will handle the authorization also | |
func Connect(request APIGatewayWebsocketProxyRequest) (interface{}, error) { | |
if request.RequestContext.Authorizer == nil { | |
return Authorizer(request) | |
} | |
id := request.RequestContext.Authorizer.(map[string]interface{})["cognito:username"].(string) | |
connectionID := request.RequestContext.ConnectionID | |
StoreSocket(id, connectionID) | |
return events.APIGatewayProxyResponse{ | |
StatusCode: 200, | |
}, nil | |
} | |
// StoreSocket will store the id,connectionid map in dynamodb | |
func StoreSocket(id, connectionID string) error { | |
m := models.UserSocket{ | |
ID: id, | |
ConnectionID: connectionID, | |
} | |
av, err := dynamodbattribute.MarshalMap(m) | |
if err != nil { | |
log.Fatalln("Unable to marshal user socket map", err.Error()) | |
} | |
input := &dynamodb.PutItemInput{ | |
TableName: aws.String("GoChatSockets"), | |
Item: av, | |
} | |
sess := GetSession() | |
db := dynamodb.New(sess) | |
_, err = db.PutItem(input) | |
if err != nil { | |
log.Fatal("INSERT ERROR", err.Error()) | |
} | |
return nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Disconnect will receive the $disconnect requests | |
func Disconnect(request APIGatewayWebsocketProxyRequest) (interface{}, error) { | |
id := request.RequestContext.Authorizer.(map[string]interface{})["cognito:username"].(string) | |
connectionID := request.RequestContext.ConnectionID | |
RemoveSocket(id, connectionID) | |
return events.APIGatewayProxyResponse{ | |
StatusCode: 200, | |
}, nil | |
} | |
// RemoveSocket will remove the id,connectionId socket from dynamodb | |
func RemoveSocket(id, connectionID string) { | |
input := &dynamodb.DeleteItemInput{ | |
TableName: aws.String("GoChatSockets"), | |
Key: map[string]*dynamodb.AttributeValue{ | |
"connectionId": &dynamodb.AttributeValue{ | |
S: aws.String(connectionID), | |
}, | |
"id": &dynamodb.AttributeValue{ | |
S: aws.String(id), | |
}, | |
}, | |
} | |
db := dynamodb.New(GetSession()) | |
_, err := db.DeleteItem(input) | |
if err != nil { | |
log.Fatalln("Unable to remove user socket map", err.Error()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Handler is the base handler that will receive all web socket request | |
func Handler(request APIGatewayWebsocketProxyRequest) (interface{}, error) { | |
switch request.RequestContext.RouteKey { | |
case "$connect": | |
return Connect(request) | |
case "$disconnect": | |
return Disconnect(request) | |
default: | |
return Default(request) | |
} | |
} | |
func main() { | |
lambda.Start(Handler) | |
} | |
// GetSession creates a new aws session and returns it | |
func GetSession() *session.Session { | |
sess, err := session.NewSession(&aws.Config{ | |
Region: aws.String("ap-south-1"), | |
}) | |
if err != nil { | |
log.Fatalln("Unable to create AWS session", err.Error()) | |
} | |
return sess | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type MessageAction struct { | |
Type string `json:"type"` | |
Payload MessagePayload `json:"payload"` | |
} | |
// MessagePayload .. | |
type MessagePayload struct { | |
Message MessageWithInfo `json:"message"` | |
} | |
// MessageWithInfo is a message together with its addressing fields.
type MessageWithInfo struct {
	// To is the "to" JSON key. Default uses it as the "id" value when
	// querying the sockets table for the recipient's connections.
	To string `json:"to"`

	// From is the "from" JSON key (presumably the sender's id; nothing in
	// this file reads it).
	From string `json:"from"`

	// Message is the "message" JSON key. It accepts any JSON value and is
	// passed through as decoded.
	Message interface{} `json:"message"`
}
// Default .. | |
func Default(request APIGatewayWebsocketProxyRequest) (interface{}, error) { | |
log.Println(request.Body) | |
b := MessageAction{} | |
log.Println(b, b.Payload, b.Payload.Message) | |
if err := json.NewDecoder(strings.NewReader(request.Body)).Decode(&b); err != nil { | |
log.Println("Unable to decode body", err.Error()) | |
} | |
data, _ := json.Marshal(b) | |
sess := GetSession() | |
db := dynamodb.New(sess) | |
queryInput := &dynamodb.QueryInput{ | |
TableName: aws.String("GoChatSockets"), | |
ExpressionAttributeNames: map[string]*string{ | |
"#id": aws.String("id"), | |
}, | |
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{ | |
":id": &dynamodb.AttributeValue{ | |
S: aws.String(b.Payload.Message.To), | |
}, | |
}, | |
KeyConditionExpression: aws.String("#id=:id"), | |
} | |
output, err := db.Query(queryInput) | |
if err != nil { | |
log.Println("Unable to find connection ID", err.Error()) | |
return nil, err | |
} | |
userSocks := make([]models.UserSocket, *output.Count) | |
dynamodbattribute.UnmarshalListOfMaps(output.Items, &userSocks) | |
for _, userSock := range userSocks { | |
input := &apigatewaymanagementapi.PostToConnectionInput{ | |
ConnectionId: aws.String(userSock.ConnectionID), | |
Data: data, | |
} | |
apigateway := apigatewaymanagementapi.New(sess, aws.NewConfig().WithEndpoint("21gooh1x39.execute-api.ap-south-1.amazonaws.com/GoChatWebSocketTest")) | |
_, err = apigateway.PostToConnection(input) | |
if err != nil { | |
log.Println("ERROR", err.Error()) | |
} | |
} | |
return events.APIGatewayProxyResponse{ | |
StatusCode: 200, | |
}, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// GetSession creates a new aws session and returns it | |
func GetSession() *session.Session { | |
sess, err := session.NewSession(&aws.Config{ | |
Region: aws.String("ap-south-1"), | |
}) | |
if err != nil { | |
log.Fatalln("Unable to create AWS session", err.Error()) | |
} | |
return sess | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// APIGatewayWebsocketProxyRequest contains data coming from the API Gateway proxy | |
type APIGatewayWebsocketProxyRequest struct { | |
MethodArn string `json:"methodArn"` | |
Resource string `json:"resource"` // The resource path defined in API Gateway | |
Path string `json:"path"` // The url path for the caller | |
HTTPMethod string `json:"httpMethod"` | |
Headers map[string]string `json:"headers"` | |
MultiValueHeaders map[string][]string `json:"multiValueHeaders"` | |
QueryStringParameters map[string]string `json:"queryStringParameters"` | |
MultiValueQueryStringParameters map[string][]string `json:"multiValueQueryStringParameters"` | |
PathParameters map[string]string `json:"pathParameters"` | |
StageVariables map[string]string `json:"stageVariables"` | |
RequestContext APIGatewayWebsocketProxyRequestContext `json:"requestContext"` | |
Body string `json:"body"` | |
IsBase64Encoded bool `json:"isBase64Encoded,omitempty"` | |
} | |
// APIGatewayWebsocketProxyRequestContext contains the information to identify | |
// the AWS account and resources invoking the Lambda function. It also includes | |
// Cognito identity information for the caller. | |
type APIGatewayWebsocketProxyRequestContext struct { | |
AccountID string `json:"accountId"` | |
ResourceID string `json:"resourceId"` | |
Stage string `json:"stage"` | |
RequestID string `json:"requestId"` | |
Identity events.APIGatewayRequestIdentity `json:"identity"` | |
ResourcePath string `json:"resourcePath"` | |
Authorizer interface{} `json:"authorizer"` | |
HTTPMethod string `json:"httpMethod"` | |
APIID string `json:"apiId"` // The API Gateway rest API Id | |
ConnectedAt int64 `json:"connectedAt"` | |
ConnectionID string `json:"connectionId"` | |
DomainName string `json:"domainName"` | |
Error string `json:"error"` | |
EventType string `json:"eventType"` | |
ExtendedRequestId string `json:"extendedRequestId"` | |
IntegrationLatency string `json:"integrationLatency"` | |
MessageDirection string `json:"messageDirection"` | |
MessageID interface{} `json:"messageId"` | |
RequestTime string `json:"requestTime"` | |
RequestTimeEpoch int64 `json:"requestTimeEpoch"` | |
RouteKey string `json:"routeKey"` | |
Status string `json:"status"` | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment