Skip to content

Instantly share code, notes, and snippets.

@salrashid123
Last active May 1, 2020 11:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save salrashid123/92adfda447f3c023efdaf6c320603aa1 to your computer and use it in GitHub Desktop.
Save salrashid123/92adfda447f3c023efdaf6c320603aa1 to your computer and use it in GitHub Desktop.
Cloud Run middelware for CloudEvents (https://github.com/gcpevents/eventsforcloudrun)
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
//"net/http/httputil"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/gorilla/mux"
"golang.org/x/net/http2"
logging "google.golang.org/api/logging/v2"
pubsub "google.golang.org/api/pubsub/v1"
// https://godoc.org/google.golang.org/api/servicecontrol/v1#AuditLog
svc "google.golang.org/api/servicecontrol/v1"
)
var ()
type key int
const (
ceType = "Ce-Type"
ceTime = "Ce-Time"
ceSpecversion = "Ce-Specversion"
ceSource = "Ce-Source"
ceId = "Ce-Id"
ceDataSchema = "Ce-Dataschema"
pubSubEventType = "com.google.cloud.pubsub.topic.publish"
auditLogEventType = "com.google.cloud.auditlog.event"
pubSubTimeLayout = "2006-01-02T15:04:05.000Z"
loggingTimeLayout = "2006-01-02T15:04:05.000000000Z"
)
type contextKey string
const contextEventKey contextKey = "event"
type wrappedPubSub struct {
Subscription string `json:"subscription"`
Message pubsub.PubsubMessage `json:"message"`
}
func eventsMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// requestDump, err := httputil.DumpRequest(r, true)
// if err != nil {
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }
// fmt.Println(string(requestDump))
b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
event := cloudevents.NewEvent()
event.SetSource(r.Header.Get(ceSource))
event.SetType(r.Header.Get(ceType))
event.SetID(r.Header.Get(ceId))
event.SetSpecVersion(r.Header.Get(ceSpecversion))
var data interface{}
var t time.Time
switch r.Header.Get(ceType) {
case pubSubEventType:
// Time format: 2020-04-30T17:24:35.933Z
t, err = time.Parse(pubSubTimeLayout, r.Header.Get(ceTime))
if err != nil {
http.Error(w, "Unable to Parse Ce-Time header", http.StatusInternalServerError)
return
}
pubsubData := &wrappedPubSub{}
err = json.Unmarshal(b, pubsubData)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
data = pubsubData
case auditLogEventType:
// Time format: 2020-04-30T18:54:29.404327175Z
t, err = time.Parse(loggingTimeLayout, r.Header.Get(ceTime))
if err != nil {
http.Error(w, "Unable to Parse Ce-Time header", http.StatusInternalServerError)
return
}
fmt.Printf("Data Schema: %s", r.Header.Get(ceDataSchema))
event.SetDataSchema(r.Header.Get(ceDataSchema))
loggingData := &logging.LogEntry{}
err = json.Unmarshal(b, loggingData)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
audit := &svc.AuditLog{}
err := json.Unmarshal(loggingData.ProtoPayload, audit)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data = audit
default:
http.Error(w, err.Error(), 500)
return
}
event.SetTime(t)
err = event.SetData(cloudevents.ApplicationJSON, data)
if err != nil {
fmt.Printf("Error setting Data..%v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ctx := context.WithValue(r.Context(), contextEventKey, event)
h.ServeHTTP(w, r.WithContext(ctx))
})
}
func fronthandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("/ called")
val := r.Context().Value(contextEventKey).(cloudevents.Event)
fmt.Printf("Got eventID: %x \n", val.ID())
fmt.Printf("Got eventType: %s\n", val.Type())
// you don't have to get an event type here,
// if you get only pubsub, just get/set that into the context
switch val.Type() {
case pubSubEventType:
pubsubData := &wrappedPubSub{}
if err := val.DataAs(pubsubData); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
decoded, err := base64.StdEncoding.DecodeString(pubsubData.Message.Data)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Printf("Pubsub Message %s", decoded)
case auditLogEventType:
audit := &svc.AuditLog{}
if err := val.DataAs(audit); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Printf("Audit ResourceName %s", audit.ResourceName)
default:
http.Error(w, "Unknown Event Type", 500)
return
}
fmt.Fprint(w, "ok")
}
func main() {
router := mux.NewRouter()
router.Methods(http.MethodPost).Path("/").HandlerFunc(fronthandler)
var server *http.Server
server = &http.Server{
Addr: ":8080",
Handler: eventsMiddleware(router),
}
http2.ConfigureServer(server, &http2.Server{})
fmt.Println("Starting Server..")
err := server.ListenAndServe()
fmt.Printf("Unable to start Server %v", err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment