Last active
May 1, 2020 11:41
-
-
Save salrashid123/92adfda447f3c023efdaf6c320603aa1 to your computer and use it in GitHub Desktop.
Cloud Run middleware for CloudEvents (https://github.com/gcpevents/eventsforcloudrun)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	//"net/http/httputil"
	"os"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/gorilla/mux"
	"golang.org/x/net/http2"
	logging "google.golang.org/api/logging/v2"
	pubsub "google.golang.org/api/pubsub/v1"
	// https://godoc.org/google.golang.org/api/servicecontrol/v1#AuditLog
	svc "google.golang.org/api/servicecontrol/v1"
)
// No package-level variables are declared.
var ()

// key is not referenced by any other declaration visible in this file.
// NOTE(review): looks like a leftover; confirm before removing.
type key int

// contextKey is a dedicated string type for values stored in a request
// context, so keys defined here cannot collide with plain string keys.
type contextKey string

// contextEventKey is the context key under which eventsMiddleware stores the
// decoded CloudEvent and from which fronthandler reads it back.
const contextEventKey contextKey = "event"

// Request header names from which the CloudEvent attributes are read.
const (
	ceType        = "Ce-Type"
	ceTime        = "Ce-Time"
	ceSpecversion = "Ce-Specversion"
	ceSource      = "Ce-Source"
	ceId          = "Ce-Id"
	ceDataSchema  = "Ce-Dataschema"
)

// Values of the Ce-Type header that this service knows how to decode.
const (
	pubSubEventType   = "com.google.cloud.pubsub.topic.publish"
	auditLogEventType = "com.google.cloud.auditlog.event"
)

// time.Parse layouts for the Ce-Time header, one per event type.
const (
	// Millisecond precision, e.g. 2020-04-30T17:24:35.933Z
	pubSubTimeLayout = "2006-01-02T15:04:05.000Z"
	// Nanosecond precision, e.g. 2020-04-30T18:54:29.404327175Z
	loggingTimeLayout = "2006-01-02T15:04:05.000000000Z"
)
type wrappedPubSub struct { | |
Subscription string `json:"subscription"` | |
Message pubsub.PubsubMessage `json:"message"` | |
} | |
func eventsMiddleware(h http.Handler) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
// requestDump, err := httputil.DumpRequest(r, true) | |
// if err != nil { | |
// http.Error(w, err.Error(), http.StatusInternalServerError) | |
// return | |
// } | |
// fmt.Println(string(requestDump)) | |
b, err := ioutil.ReadAll(r.Body) | |
defer r.Body.Close() | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
event := cloudevents.NewEvent() | |
event.SetSource(r.Header.Get(ceSource)) | |
event.SetType(r.Header.Get(ceType)) | |
event.SetID(r.Header.Get(ceId)) | |
event.SetSpecVersion(r.Header.Get(ceSpecversion)) | |
var data interface{} | |
var t time.Time | |
switch r.Header.Get(ceType) { | |
case pubSubEventType: | |
// Time format: 2020-04-30T17:24:35.933Z | |
t, err = time.Parse(pubSubTimeLayout, r.Header.Get(ceTime)) | |
if err != nil { | |
http.Error(w, "Unable to Parse Ce-Time header", http.StatusInternalServerError) | |
return | |
} | |
pubsubData := &wrappedPubSub{} | |
err = json.Unmarshal(b, pubsubData) | |
if err != nil { | |
http.Error(w, err.Error(), 500) | |
return | |
} | |
data = pubsubData | |
case auditLogEventType: | |
// Time format: 2020-04-30T18:54:29.404327175Z | |
t, err = time.Parse(loggingTimeLayout, r.Header.Get(ceTime)) | |
if err != nil { | |
http.Error(w, "Unable to Parse Ce-Time header", http.StatusInternalServerError) | |
return | |
} | |
fmt.Printf("Data Schema: %s", r.Header.Get(ceDataSchema)) | |
event.SetDataSchema(r.Header.Get(ceDataSchema)) | |
loggingData := &logging.LogEntry{} | |
err = json.Unmarshal(b, loggingData) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
audit := &svc.AuditLog{} | |
err := json.Unmarshal(loggingData.ProtoPayload, audit) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
data = audit | |
default: | |
http.Error(w, err.Error(), 500) | |
return | |
} | |
event.SetTime(t) | |
err = event.SetData(cloudevents.ApplicationJSON, data) | |
if err != nil { | |
fmt.Printf("Error setting Data..%v", err) | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
ctx := context.WithValue(r.Context(), contextEventKey, event) | |
h.ServeHTTP(w, r.WithContext(ctx)) | |
}) | |
} | |
func fronthandler(w http.ResponseWriter, r *http.Request) { | |
fmt.Println("/ called") | |
val := r.Context().Value(contextEventKey).(cloudevents.Event) | |
fmt.Printf("Got eventID: %x \n", val.ID()) | |
fmt.Printf("Got eventType: %s\n", val.Type()) | |
// you don't have to get an event type here, | |
// if you get only pubsub, just get/set that into the context | |
switch val.Type() { | |
case pubSubEventType: | |
pubsubData := &wrappedPubSub{} | |
if err := val.DataAs(pubsubData); err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
decoded, err := base64.StdEncoding.DecodeString(pubsubData.Message.Data) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
fmt.Printf("Pubsub Message %s", decoded) | |
case auditLogEventType: | |
audit := &svc.AuditLog{} | |
if err := val.DataAs(audit); err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
fmt.Printf("Audit ResourceName %s", audit.ResourceName) | |
default: | |
http.Error(w, "Unknown Event Type", 500) | |
return | |
} | |
fmt.Fprint(w, "ok") | |
} | |
func main() { | |
router := mux.NewRouter() | |
router.Methods(http.MethodPost).Path("/").HandlerFunc(fronthandler) | |
var server *http.Server | |
server = &http.Server{ | |
Addr: ":8080", | |
Handler: eventsMiddleware(router), | |
} | |
http2.ConfigureServer(server, &http2.Server{}) | |
fmt.Println("Starting Server..") | |
err := server.ListenAndServe() | |
fmt.Printf("Unable to start Server %v", err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment