Skip to content

Instantly share code, notes, and snippets.

@praveen001
Last active January 26, 2020 13:48
Show Gist options
  • Save praveen001/1b045d1c31cd9c72e4e6638e9f883f83 to your computer and use it in GitHub Desktop.
Save praveen001/1b045d1c31cd9c72e4e6638e9f883f83 to your computer and use it in GitHub Desktop.
Serverless WebSockets with API Gateway and Golang Lambda
// authError is a minimal string-backed error type. Authorizer uses it to report
// failures to its caller instead of terminating the process.
type authError string

// Error implements the error interface.
func (e authError) Error() string { return string(e) }

// Authorizer is a custom API authorizer. It reads the JWT from the "token"
// query string parameter, verifies it against the key set fetched from the
// Cognito user pool JWKS endpoint, and on success returns an Allow policy for
// request.MethodArn with the token claims attached as the authorizer context.
//
// Any failure is returned as an error rather than exiting the process. Token
// problems are reported with the exact message "Unauthorized".
// NOTE(review): API Gateway is documented to map that exact message to a 401
// response, which is why the string is capitalized — confirm against the
// deployed authorizer configuration.
func Authorizer(request APIGatewayWebsocketProxyRequest) (events.APIGatewayCustomAuthorizerResponse, error) {
	const unauthorized = authError("Unauthorized")

	token := request.QueryStringParameters["token"]
	if token == "" {
		// Nothing to verify; skip the network round trip for the key set.
		log.Println("Unauthorized: no token supplied")
		return events.APIGatewayCustomAuthorizerResponse{}, unauthorized
	}

	// Fetch all keys
	jwkSet, err := jwk.Fetch("https://cognito-idp.ap-south-1.amazonaws.com/ap-south-1_vvx4f42sK/.well-known/jwks.json")
	if err != nil {
		log.Println("Unable to fetch keys", err.Error())
		return events.APIGatewayCustomAuthorizerResponse{}, err
	}

	// Verify
	t, err := jwt.Parse(token, func(tok *jwt.Token) (interface{}, error) {
		// The header comes from the caller, so "kid" may be absent or not a string.
		kid, ok := tok.Header["kid"].(string)
		if !ok {
			return nil, authError("token header has no kid")
		}
		keys := jwkSet.LookupKeyID(kid)
		if len(keys) == 0 {
			return nil, authError("no key matches token kid")
		}
		return keys[0].Materialize()
	})
	if err != nil {
		log.Println("Unauthorized", err.Error())
		return events.APIGatewayCustomAuthorizerResponse{}, unauthorized
	}
	if !t.Valid {
		log.Println("Unauthorized")
		return events.APIGatewayCustomAuthorizerResponse{}, unauthorized
	}

	claims, ok := t.Claims.(jwt.MapClaims)
	if !ok {
		log.Println("Unauthorized: unexpected claims type")
		return events.APIGatewayCustomAuthorizerResponse{}, unauthorized
	}

	// NOTE(review): the claims map is passed through unchanged as the context;
	// API Gateway may reject context values that are arrays or objects — confirm
	// the Cognito claims in use are all scalar.
	return events.APIGatewayCustomAuthorizerResponse{
		PrincipalID: "me",
		PolicyDocument: events.APIGatewayCustomAuthorizerPolicy{
			Version: "2012-10-17",
			Statement: []events.IAMPolicyStatement{
				{
					Action:   []string{"execute-api:*"},
					Effect:   "Allow",
					Resource: []string{request.MethodArn},
				},
			},
		},
		Context: claims,
	}, nil
}
// Connect handles the $connect request.
//
// When the request carries no authorizer context it delegates to Authorizer and
// returns its result, so the same function also handles authorization.
// Otherwise it reads the "cognito:username" entry from the authorizer context
// and stores the (username, connection ID) pair via StoreSocket.
//
// It returns a 200 response on success, a 401 response when the authorizer
// context does not contain a usable username, and a non-nil error when the
// socket could not be stored.
func Connect(request APIGatewayWebsocketProxyRequest) (interface{}, error) {
	if request.RequestContext.Authorizer == nil {
		return Authorizer(request)
	}

	// The authorizer context is decoded from JSON into interface{}, so both the
	// map shape and the string value have to be checked rather than assumed.
	claims, ok := request.RequestContext.Authorizer.(map[string]interface{})
	if !ok {
		log.Println("Unexpected authorizer context type")
		return events.APIGatewayProxyResponse{StatusCode: 401}, nil
	}
	id, ok := claims["cognito:username"].(string)
	if !ok || id == "" {
		log.Println("Authorizer context has no cognito:username")
		return events.APIGatewayProxyResponse{StatusCode: 401}, nil
	}

	// StoreSocket logs the failure details; just propagate the error here.
	if err := StoreSocket(id, request.RequestContext.ConnectionID); err != nil {
		return nil, err
	}

	return events.APIGatewayProxyResponse{
		StatusCode: 200,
	}, nil
}

// StoreSocket stores the (id, connectionID) pair as an item in the
// "GoChatSockets" DynamoDB table.
//
// It returns the marshalling or PutItem error, if any, after logging it; it
// does not terminate the process.
func StoreSocket(id, connectionID string) error {
	m := models.UserSocket{
		ID:           id,
		ConnectionID: connectionID,
	}

	av, err := dynamodbattribute.MarshalMap(m)
	if err != nil {
		log.Println("Unable to marshal user socket map", err.Error())
		return err
	}

	input := &dynamodb.PutItemInput{
		TableName: aws.String("GoChatSockets"),
		Item:      av,
	}

	db := dynamodb.New(GetSession())
	if _, err = db.PutItem(input); err != nil {
		log.Println("INSERT ERROR", err.Error())
		return err
	}
	return nil
}
// Disconnect handles the $disconnect request. It reads the "cognito:username"
// entry from the authorizer context and removes the matching
// (username, connection ID) pair via RemoveSocket.
//
// It always returns a 200 response. When the authorizer context is missing or
// has no username, it logs that and removes nothing instead of panicking.
func Disconnect(request APIGatewayWebsocketProxyRequest) (interface{}, error) {
	// A failed assertion leaves claims nil; indexing a nil map is safe and
	// makes the second assertion fail as well.
	claims, _ := request.RequestContext.Authorizer.(map[string]interface{})
	id, ok := claims["cognito:username"].(string)
	if !ok {
		log.Println("Authorizer context has no cognito:username; nothing to remove")
		return events.APIGatewayProxyResponse{
			StatusCode: 200,
		}, nil
	}

	RemoveSocket(id, request.RequestContext.ConnectionID)

	return events.APIGatewayProxyResponse{
		StatusCode: 200,
	}, nil
}
// RemoveSocket deletes the item keyed by (id, connectionID) from the
// "GoChatSockets" DynamoDB table.
//
// The function has no error return, so a failed delete is logged and the
// function returns normally; it does not terminate the process.
func RemoveSocket(id, connectionID string) {
	input := &dynamodb.DeleteItemInput{
		TableName: aws.String("GoChatSockets"),
		Key: map[string]*dynamodb.AttributeValue{
			"connectionId": {
				S: aws.String(connectionID),
			},
			"id": {
				S: aws.String(id),
			},
		},
	}

	db := dynamodb.New(GetSession())
	if _, err := db.DeleteItem(input); err != nil {
		log.Println("Unable to remove user socket map", err.Error())
	}
}
// Handler is the single entry point for every web socket request. It inspects
// the route key of the request and forwards the request to Connect for
// "$connect", to Disconnect for "$disconnect", and to Default for any other
// route.
func Handler(request APIGatewayWebsocketProxyRequest) (interface{}, error) {
	route := request.RequestContext.RouteKey

	if route == "$connect" {
		return Connect(request)
	}
	if route == "$disconnect" {
		return Disconnect(request)
	}
	return Default(request)
}
func main() {
lambda.Start(Handler)
}
// GetSession builds and returns a fresh AWS session configured for the
// "ap-south-1" region. If the session cannot be created, the failure is logged
// with log.Fatalln, which terminates the process.
func GetSession() *session.Session {
	cfg := &aws.Config{Region: aws.String("ap-south-1")}

	awsSession, err := session.NewSession(cfg)
	if err == nil {
		return awsSession
	}

	log.Fatalln("Unable to create AWS session", err.Error())
	return awsSession
}
// MessageAction is the top-level JSON envelope of a message: a "type" string
// and a "payload" object.
type MessageAction struct {
	Type    string         `json:"type"`
	Payload MessagePayload `json:"payload"`
}

// MessagePayload is the "payload" object of a MessageAction; it wraps a single
// "message" object.
type MessagePayload struct {
	Message MessageWithInfo `json:"message"`
}

// MessageWithInfo is the "message" object of a MessagePayload: the "to" and
// "from" strings plus a "message" value of arbitrary JSON type.
type MessageWithInfo struct {
	To      string      `json:"to"`
	From    string      `json:"from"`
	Message interface{} `json:"message"`
}
// Default handles every route other than $connect and $disconnect. It decodes
// the request body as a MessageAction, queries the "GoChatSockets" table for
// the items whose "id" equals the message's "to" field, and posts the
// re-encoded message to each connection ID found.
//
// It returns a 400 response when the body cannot be decoded or names no
// recipient, a non-nil error when encoding, the query, or decoding of the query
// result fails, and a 200 response otherwise. A failure to post to an
// individual connection is logged and does not stop delivery to the others.
func Default(request APIGatewayWebsocketProxyRequest) (interface{}, error) {
	log.Println(request.Body)

	b := MessageAction{}
	if err := json.NewDecoder(strings.NewReader(request.Body)).Decode(&b); err != nil {
		log.Println("Unable to decode body", err.Error())
		return events.APIGatewayProxyResponse{
			StatusCode: 400,
		}, nil
	}
	// Without a recipient the key condition below would use an empty value.
	if b.Payload.Message.To == "" {
		log.Println("Message has no recipient")
		return events.APIGatewayProxyResponse{
			StatusCode: 400,
		}, nil
	}

	data, err := json.Marshal(b)
	if err != nil {
		log.Println("Unable to encode message", err.Error())
		return nil, err
	}

	sess := GetSession()
	db := dynamodb.New(sess)

	queryInput := &dynamodb.QueryInput{
		TableName: aws.String("GoChatSockets"),
		ExpressionAttributeNames: map[string]*string{
			"#id": aws.String("id"),
		},
		ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
			":id": {
				S: aws.String(b.Payload.Message.To),
			},
		},
		KeyConditionExpression: aws.String("#id=:id"),
	}

	output, err := db.Query(queryInput)
	if err != nil {
		log.Println("Unable to find connection ID", err.Error())
		return nil, err
	}

	var userSocks []models.UserSocket
	if err := dynamodbattribute.UnmarshalListOfMaps(output.Items, &userSocks); err != nil {
		log.Println("Unable to unmarshal user sockets", err.Error())
		return nil, err
	}

	// The client does not depend on the recipient, so build it once.
	apigateway := apigatewaymanagementapi.New(sess, aws.NewConfig().WithEndpoint("21gooh1x39.execute-api.ap-south-1.amazonaws.com/GoChatWebSocketTest"))

	for _, userSock := range userSocks {
		input := &apigatewaymanagementapi.PostToConnectionInput{
			ConnectionId: aws.String(userSock.ConnectionID),
			Data:         data,
		}
		if _, err := apigateway.PostToConnection(input); err != nil {
			log.Println("ERROR", err.Error())
		}
	}

	return events.APIGatewayProxyResponse{
		StatusCode: 200,
	}, nil
}
// GetSession returns a newly created AWS session for the "ap-south-1" region.
// A failure to create the session is reported through log.Fatalln, which ends
// the process.
func GetSession() *session.Session {
	var cfg aws.Config
	cfg.Region = aws.String("ap-south-1")

	s, err := session.NewSession(&cfg)
	if err != nil {
		log.Fatalln("Unable to create AWS session", err.Error())
	}

	return s
}
// APIGatewayWebsocketProxyRequest is the event payload decoded from the JSON
// that the API Gateway proxy delivers to this function. Each field maps to the
// JSON key named in its struct tag.
type APIGatewayWebsocketProxyRequest struct {
	MethodArn string `json:"methodArn"`

	// Resource is the resource path defined in API Gateway.
	Resource string `json:"resource"`
	// Path is the url path for the caller.
	Path       string `json:"path"`
	HTTPMethod string `json:"httpMethod"`

	Headers                         map[string]string   `json:"headers"`
	MultiValueHeaders               map[string][]string `json:"multiValueHeaders"`
	QueryStringParameters           map[string]string   `json:"queryStringParameters"`
	MultiValueQueryStringParameters map[string][]string `json:"multiValueQueryStringParameters"`
	PathParameters                  map[string]string   `json:"pathParameters"`
	StageVariables                  map[string]string   `json:"stageVariables"`

	RequestContext APIGatewayWebsocketProxyRequestContext `json:"requestContext"`

	Body            string `json:"body"`
	IsBase64Encoded bool   `json:"isBase64Encoded,omitempty"`
}
// APIGatewayWebsocketProxyRequestContext is the "requestContext" object of an
// APIGatewayWebsocketProxyRequest. It carries the information that identifies
// the AWS account and resources invoking the Lambda function, the caller's
// identity, and the web socket specific fields such as the connection ID and
// route key.
type APIGatewayWebsocketProxyRequestContext struct {
	AccountID    string                        `json:"accountId"`
	ResourceID   string                        `json:"resourceId"`
	Stage        string                        `json:"stage"`
	RequestID    string                        `json:"requestId"`
	Identity     events.APIGatewayRequestIdentity `json:"identity"`
	ResourcePath string                        `json:"resourcePath"`

	// Authorizer is left untyped; whatever JSON value arrives under
	// "authorizer" is decoded as-is.
	Authorizer interface{} `json:"authorizer"`
	HTTPMethod string      `json:"httpMethod"`

	// APIID is the API Gateway rest API Id.
	APIID string `json:"apiId"`

	ConnectedAt        int64       `json:"connectedAt"`
	ConnectionID       string      `json:"connectionId"`
	DomainName         string      `json:"domainName"`
	Error              string      `json:"error"`
	EventType          string      `json:"eventType"`
	ExtendedRequestId  string      `json:"extendedRequestId"`
	IntegrationLatency string      `json:"integrationLatency"`
	MessageDirection   string      `json:"messageDirection"`
	MessageID          interface{} `json:"messageId"`
	RequestTime        string      `json:"requestTime"`
	RequestTimeEpoch   int64       `json:"requestTimeEpoch"`
	RouteKey           string      `json:"routeKey"`
	Status             string      `json:"status"`
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment