Skip to content

Instantly share code, notes, and snippets.

@lawrencejones
Last active July 17, 2024 23:14
Show Gist options
  • Save lawrencejones/7e6f07a6838c49aff7dc97171cf9e45c to your computer and use it in GitHub Desktop.
Save lawrencejones/7e6f07a6838c49aff7dc97171cf9e45c to your computer and use it in GitHub Desktop.
Example event processing code taken from incident-io

eventadapter

Shared in advance of a talk to Netflix's engineering team as part of their Architecture Discussion Forum, this is an excerpt of code from the incident-io Go monolith that is used to power our async workers.

It depends on internal libraries (such as our o11y and error packages) but is a good starting point for anyone who wants to build something similar.

Usage

You define an event in a common event package such as:

// pkg/event/alert_handle_event.go
package event

import (
    "github.com/incident-io/core/server/pkg/domain"
)

// init registers AlertHandleEvent with the event package (via registerEvent) when the
// package is loaded.
func init() {
    registerEvent(AlertHandleEvent{})
}

// AlertHandleEvent is an example event definition. Each field carries a JSON tag naming
// the key it is serialised under.
//
// codegen:matcher,event-metadata
type AlertHandleEvent struct {
    OrganisationID      string                 `json:"organisation_id"`
    Actor               *domain.ActorReference `json:"actor"`
    AlertSourceConfigID string                 `json:"alert_source_config_id"`
    IdempotencyKey      string                 `json:"idempotency_key"`
    Payload             string                 `json:"payload"`
}

// Name returns the identifier for this type of event, "alert.handle-event".
func (e AlertHandleEvent) Name() string {
    return "alert.handle-event"
}

// Description returns a human-readable explanation of what the event means.
func (e AlertHandleEvent) Description() string {
    return "Asynchronously handle an event."
}

You can then publish events like so:

// Publishing in response to an incoming alert event.
messageID, err := eventadapter.Publish(ctx, db, &event.AlertHandleEvent{
    OrganisationID:      identity.Organisation.ID,
    Actor:               &identity.Actor.ActorReference,
    AlertSourceConfigID: sourceConfig.ID,
    IdempotencyKey:      idempotencyKey,
    Payload:             string(encodedPayload),
})
if err != nil {
    return "", errors.Wrap(ctx, err, "publishing message")
}

And register a subscriber that processes these events in whichever package is intended to handle them:

// Take the events enqueued asynchronously and feed them into HandleEvent.
//
// init registers a subscriber for *event.AlertHandleEvent under the subscriber ID
// "handle-events". Each received event is passed to handleEventFromAsync together with
// the database handle and the event metadata.
func init() {
    subscribe(
        func(ctx context.Context, db *gorm.DB, ev *event.AlertHandleEvent, meta eventadapter.EventMetadata) error {
            return handleEventFromAsync(ctx, db, ev, meta)
        },
        eventadapter.SubscribeParams{
            SubscriberID:   "handle-events",
            ErrorUrgency:   errors.UrgencyPage,
            StaleThreshold: eventadapter.ReconcileThreshold,
        },
    )
}
package eventadapter
import (
"fmt"
"runtime"
"strings"
)
type (
// ctxKey is the type used for this package's context keys. Using a dedicated type
// means the keys cannot collide with plain string keys set by other packages.
ctxKey string
)
// SubscriptionID builds the unique identifier for a (topic, subscriber) pair by joining
// the two with "---". This is the name the subscription is given in Google.
func SubscriptionID(topicName, subscriberID string) string {
	const separator = "---"

	// All operands are strings, so Sprint concatenates them without inserting spaces.
	return fmt.Sprint(topicName, separator, subscriberID)
}
// getCallerPackageName walks up to ten frames of the call stack, starting with this
// function itself, and returns the package path (relative to the server module) of the
// first frame that belongs to our own code and is not part of lib/database or the
// eventadapter package.
//
// It returns "unknown" when no such frame is found within those ten frames.
func getCallerPackageName() string {
	const (
		serverPrefix = "github.com/incident-io/core/server/"
		maxDepth     = 10
	)

	for depth := 0; depth < maxDepth; depth++ {
		// The ok result is deliberately ignored: when Caller fails the pc is zero,
		// FuncForPC returns nil, and (*Func).Name on a nil receiver yields "", which
		// simply fails the prefix check below.
		pc, _, _, _ := runtime.Caller(depth)

		// e.g. "github.com/incident-io/core/server/app/escalator/executor.(*Executor).Run"
		funcName := runtime.FuncForPC(pc).Name()

		// Skip anything that isn't our own code.
		if !strings.Contains(funcName, serverPrefix) {
			continue
		}

		// Strip the module prefix and then everything from the first "." onwards:
		// "app/escalator/executor.(*Executor).Run" becomes "app/escalator/executor".
		pkg, _, _ := strings.Cut(strings.TrimPrefix(funcName, serverPrefix), ".")

		// Frames from the database layer or from this package are plumbing, not the
		// real caller, so keep walking.
		isPlumbing := strings.HasPrefix(pkg, "lib/database") ||
			strings.HasPrefix(pkg, "pkg/event/eventadapter")
		if isPlumbing {
			continue
		}

		return pkg
	}

	return "unknown"
}
package eventadapter
import (
"context"
"github.com/incident-io/core/server/lib/md"
"github.com/incident-io/core/server/lib/safe"
"gorm.io/gorm"
)
var (
// batchKey is the context key under which the active *Batch is stored.
batchKey ctxKey = "event.batchKey"
// batchNestedKey is the context key set to true when BatchPublish is called on a
// context that already carries a batch.
batchNestedKey ctxKey = "event.batchNestedKey"
)
// Batch is a buffer of events that are held back until Flush publishes them.
type Batch struct {
// events holds the buffered events in the order they were added.
events []Eventer
}
// Add appends an event to the batch buffer. Nothing is published here; the event is
// held until Flush is called.
//
// NOTE(review): the buffer is not guarded by a lock in this block — confirm callers do
// not Add concurrently.
func (b *Batch) Add(ev Eventer) {
b.events = append(b.events, ev)
}
// Flush drains the batch buffer, publishing every buffered event through a pool and
// returning whatever the pool's Wait reports.
//
// When called on a nested batch context (see BatchPublish) it does nothing and returns
// nil, leaving the events for the outermost batch owner to flush.
func (b *Batch) Flush(ctx context.Context, db *gorm.DB) error {
	ctx, span := md.StartSpan(ctx, "Batch.Flush")
	defer span.End()

	// A nested context means an outer caller owns this batch: leave the events where
	// they are so the outer Flush publishes them.
	if getBatchNested(ctx) {
		return nil
	}

	// Resolve the publishing package here, on the caller's stack. Inside the pool's
	// goroutines the original call stack is no longer available.
	publishPackage := getCallerPackageName()

	// Clear the batch from the context, otherwise publishing would just put the events
	// straight back into the batch.
	ctx = context.WithValue(ctx, batchKey, nil)

	pool := safe.NewPool(ctx)
	for len(b.events) > 0 {
		// Pop the head of the buffer. A fresh variable per iteration keeps each closure
		// bound to its own event.
		next := b.events[0]
		b.events = b.events[1:]

		pool.Go(ctx, func(ctx context.Context) error {
			_, err := publish(ctx, db, next, publishPackage)
			return err
		})
	}

	return pool.Wait()
}
// BatchPublish returns a context in which published events are buffered into a Batch
// rather than sent immediately, along with that Batch. The caller must Flush the batch
// when appropriate.
//
// If ctx already carries a batch, that same batch is returned and the returned context
// is marked as nested, so a Flush made with it is a no-op.
func BatchPublish(ctx context.Context) (context.Context, *Batch) {
	if existing := GetBatch(ctx); existing != nil {
		return context.WithValue(ctx, batchNestedKey, true), existing
	}

	// No batch yet: start a new, empty one and attach it to the context.
	fresh := &Batch{events: []Eventer{}}

	return context.WithValue(ctx, batchKey, fresh), fresh
}
// GetBatch returns the Batch stored on ctx, or nil when the context carries no batch.
func GetBatch(ctx context.Context) *Batch {
	// A failed assertion leaves batch as the zero value, which is the nil we want to
	// return when no batch is present.
	batch, _ := ctx.Value(batchKey).(*Batch)

	return batch
}
// Unbatch returns a context without any batching, for when you wish to publish an event
// outside of a batch.
//
// It overrides the batch key with nil, so GetBatch on the returned context yields nil.
// The nested marker, if set, is left untouched.
func Unbatch(ctx context.Context) context.Context {
return context.WithValue(ctx, batchKey, nil)
}
// getBatchNested reports whether ctx was produced by calling BatchPublish on a context
// that already carried a batch.
func getBatchNested(ctx context.Context) bool {
	// When the key is absent (or not a bool) the assertion fails and nested keeps its
	// zero value, false.
	nested, _ := ctx.Value(batchNestedKey).(bool)

	return nested
}
package eventadapter
import (
"cloud.google.com/go/pubsub"
"github.com/incident-io/core/server/lib/md"
)
// Eventer is the interface implemented by all events.
type Eventer interface {
// Name is how we identify this type of event. The Pub/Sub publisher also uses it as
// the topic name.
//
// Prefer hyphens over underscores, and use noun.verb. For example, prefer
// application.start to application_started, and incident.lead-updated to
// incident_lead_update.
Name() string
// Description explains what the event means. This is used internally to understand
// better what the event is and does.
Description() string
// Validate validates the fields of the event before publishing.
Validate() error
// GetOrganisationID returns the organisation ID associated with the event, which we use
// to add to event telemetry. An empty string means no organisation is attached.
GetOrganisationID() string
}
// EventerWithMetadata is for those events that want to decorate their telemetry with
// additional metadata. It is optional: code that handles events checks for it with a
// type assertion.
type EventerWithMetadata interface {
Eventer
// GetMetadata returns the metadata to be added to the event logs and traces.
GetMetadata() md.Metadata
}
// EventerWithPublishSettings is for those events that want to override the default
// publish settings.
type EventerWithPublishSettings interface {
Eventer
// ApplyPublishSettings is called with a default publish settings struct to modify those
// settings before being passed to the Pub/Sub client.
//
// NOTE(review): the publisher calls this instead of its own default settings, not on
// top of them — confirm that "default" here means the Pub/Sub library defaults.
ApplyPublishSettings(settings *pubsub.PublishSettings)
}
// EventerSchedulable is for events that schedule themselves in the future rather than
// running immediately.
type EventerSchedulable interface {
Eventer
// TaskName defines an idempotency key, which can be used to deduplicate enqueue
// operations.
TaskName() string
}
package eventadapter
import (
"context"
"encoding/json"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/incident-io/core/server/lib/errors"
"github.com/incident-io/core/server/lib/log"
"github.com/incident-io/core/server/lib/md"
"gorm.io/gorm"
)
// GooglePubsub provides the publishing implementation of our event package, powered by a
// combination of Pub/Sub for immediate delivery and Google Cloud Scheduler for delayed
// execution.
type GooglePubsub struct {
// client is the Pub/Sub client that we use to publish events and is shared globally
// across the app, unlike our per-subscription clients.
//
// The rationale is that a single client for publishing should suffice as we can batch
// and multiplex publish requests.
client *pubsub.Client
// topics is a cache of topic name to topics. This avoids us thrashing gRPC by creating
// new connection pools every time we want to publish.
topics map[string]*pubsub.Topic
// The embedded mutex guards topics; getTopic holds the write lock for every lookup.
sync.RWMutex
}
// NewGooglePubsub builds a GooglePubsub publisher backed by a freshly created Pub/Sub
// client and an empty topic cache. It is used to publish events using Google Cloud
// infrastructure (Pub/Sub + Cloud Scheduler).
//
// An error is returned if the Pub/Sub client cannot be constructed.
func NewGooglePubsub(ctx context.Context) (*GooglePubsub, error) {
	client, err := newPubsubClient(ctx)
	if err != nil {
		return nil, errors.Wrap(ctx, err, "building Pub/Sub client")
	}

	return &GooglePubsub{
		client: client,
		topics: make(map[string]*pubsub.Topic),
	}, nil
}
// publish serialises the event to JSON and sends it to the Pub/Sub topic named after the
// event, returning the message ID on success.
//
// It makes at most two attempts. If an attempt fails because the topic does not exist,
// the topic is created before the next attempt. A cancelled context stops the retries
// immediately and returns the context's error.
func (p *GooglePubsub) publish(ctx context.Context, _ *gorm.DB, ev Eventer, publishPackage string) (messageID string, err error) {
	// Pub/Sub just accepts bytes; JSON is the format we use to send events to and from it.
	payload, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}

	// Only retry once for now. We probably want to do this via the Pub/Sub retrier that
	// can be configured on the publish client, but this is the simplest version.
	const maxAttempts = 2

	for attempt := 0; attempt < maxAttempts; attempt++ {
		if messageID, err = p.doPublish(ctx, ev, payload, publishPackage, attempt); err == nil {
			return messageID, nil
		}

		// If our context was cancelled, we should stop here (possibly app shutdown).
		if ctxErr := ctx.Err(); ctxErr != nil {
			return "", ctxErr
		}

		// Any failure other than a missing topic simply moves on to the next attempt.
		if !isNotFoundErr(err) {
			continue
		}

		// The topic doesn't exist: create it, then try again.
		log.Info(ctx, "The topic doesn't exist, creating it now...")
		if creationErr := CreateTopicIfNotExists(ctx, p.client, ev.Name()); creationErr != nil {
			return "", errors.Wrap(ctx, creationErr, "creating topic")
		}
	}

	return "", errors.Wrap(ctx, err, "publishing event")
}
// doPublish performs a single publish attempt inside its own span and blocks until the
// Pub/Sub client reports the result, returning the message ID.
//
// The trace ID, span ID and publishing package are attached as message attributes so
// that subscribers can read them back.
func (p *GooglePubsub) doPublish(ctx context.Context, ev Eventer, payload []byte, publishPackage string, attempt int) (messageID string, err error) {
	ctx, span := md.StartSpan(ctx, "GooglePublisher.Publish", md.Metadata{
		"topic": ev.Name(),
		"retry": attempt,
	})
	defer span.End()

	log.Debug(ctx, "Publishing event")

	topic := p.getTopic(ev)
	spanCtx := span.SpanContext()

	result := topic.Publish(ctx, &pubsub.Message{
		Data: payload,
		Attributes: map[string]string{
			"trace_id":       spanCtx.TraceID().String(),
			"span_id":        spanCtx.SpanID().String(),
			"source_package": publishPackage,
		},
	})

	// Get blocks until the message has been sent (or has failed).
	if messageID, err = result.Get(ctx); err != nil {
		return "", errors.Wrap(ctx, err, "publishing message")
	}

	return messageID, nil
}
// getTopic returns the cached topic client for this event's topic, building and caching
// one on first use.
//
// A newly built topic gets the event's own publish settings when the event implements
// EventerWithPublishSettings, and our default settings otherwise.
func (p *GooglePubsub) getTopic(ev Eventer) *pubsub.Topic {
	// We might write to the cache, so take the write lock up-front.
	p.Lock()
	defer p.Unlock()

	name := ev.Name()

	// Fast path: already cached.
	if cached, found := p.topics[name]; found {
		return cached
	}

	// Otherwise build a new topic client and configure its publish settings.
	topic := p.client.Topic(name)
	if custom, ok := ev.(EventerWithPublishSettings); ok {
		custom.ApplyPublishSettings(&topic.PublishSettings)
	} else {
		p.applyDefaultSettings(&topic.PublishSettings)
	}

	p.topics[name] = topic

	return topic
}
// applyDefaultSettings writes our default publish settings into settings: batch count
// and delay thresholds, blocking flow control, and a publish timeout.
func (p *GooglePubsub) applyDefaultSettings(settings *pubsub.PublishSettings) {
// We don't want to wait ages to build a batch of messages before we send them, but we
// can hit other limits that block publishing (such as running out of goroutines to
// issue publish requests) if we are too aggressive and don't make use of batching,
// causing us to publish each message individually.
//
// We keep this setting at the default of 100 messages and rely on the flow-control to
// adjust according to our message size (e.g. we'll block publishing new messages if we
// exceed a byte threshold, rather than buffering too much and getting oom-killed).
//
// Warning: setting this to 0 is interpreted as "use the default" by the Pub/Sub client.
settings.CountThreshold = 100
// Given we rarely wait synchronously for messages to publish anymore (most of our
// messages are published as a batch implicitly from database transactions) we can
// afford to wait the default 10ms before issuing a publish RPC without it impacting
// users.
//
// This should make our batch sizes much larger, helping with overall throughput of the
// application, and avoiding stalls when we publish too much for an aggressive batching
// strategy to handle.
settings.DelayThreshold = 10 * time.Millisecond // as default
// When we're publishing a _lot_ of messages, block publishing more if the internal
// buffer is getting too large. The default behaviour means that flow control is ignored
// and we try to publish too-large batches (>1000 messages). Those batches then get an
// error and are dropped. Yikes.
//
// By using a blocking flow control, we wait for the buffer to get cleared before we
// continue publishing, which might cause a bit of extra latency.
settings.FlowControlSettings.LimitExceededBehavior = pubsub.FlowControlBlock
// Have an aggressive timeout in case we get a bad connection.
settings.Timeout = 5 * time.Second
}
package eventadapter
import (
"context"
"fmt"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/incident-io/core/server/lib/errors"
"github.com/incident-io/core/server/lib/log"
"github.com/incident-io/core/server/lib/md"
"github.com/incident-io/core/server/lib/safe"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/api/option"
)
var (
// eventsPubSubClientCreatedTotal counts the Pub/Sub clients that have been created.
//
// NOTE(review): nothing in this part of the file increments it; presumably
// newPubsubClient does — confirm.
eventsPubSubClientCreatedTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "core_events_pubsub_client_created_total",
Help: "Total number of v2 Pub/Sub clients created",
},
)
)
// GoogleSubscriber implements event processing by listening to Google Pub/Sub.
type GoogleSubscriber struct {
// deps are runtime services like cache and identity that we provide into the
// subscribers.
deps Dependencies
// publisher is attached to the context passed to each handler, meaning that
// subscribers can publish events.
publisher Publisher
// subscriptionIDs ensures that we don't have duplicate subscribers.
subscriptionIDs map[string]bool
// The embedded mutex guards subscriptionIDs.
sync.RWMutex
}
// NewGoogleSubscriber builds a GoogleSubscriber that passes deps to its handlers and
// attaches publisher to each handler's context. It starts with no registered
// subscriptions. The ctx argument is currently unused.
func NewGoogleSubscriber(ctx context.Context, deps Dependencies, publisher Publisher) *GoogleSubscriber {
	subscriber := &GoogleSubscriber{
		subscriptionIDs: make(map[string]bool),
	}
	subscriber.deps = deps
	subscriber.publisher = publisher

	return subscriber
}
// Subscribe registers handler for the given topic and starts a background goroutine that
// pulls messages from the matching Pub/Sub subscription forever, re-connecting with
// backoff whenever receiving fails and creating the subscription if it does not exist.
//
// Each received message is passed to handler along with EventMetadata built from the
// message. The message is acked when handler returns nil and nacked otherwise, including
// when handler panics.
//
// It returns an error if the Pub/Sub client cannot be created, or if this subscriber has
// already registered the same topic/subscriber pair. The receive loop's own failures are
// logged, never returned.
func (s *GoogleSubscriber) Subscribe(ctx context.Context, topicName string, handler subscribeHandler, params SubscribeParams) error {
	ctx, span := md.StartSpan(ctx, md.SpanName(), md.Metadata{
		"topic":      topicName,           // e.g. "escalation.created"
		"subscriber": params.SubscriberID, // e.g. "send-message"
	})
	defer span.End()

	opts := append([]func(*pubsub.ReceiveSettings){
		// Apply defaults
		s.applyDefaultSettings,
	},
		// Then allow for overrides
		params.ReceiveSettings...,
	)

	// We use a client per-subscription in contrast to one dedicated client for publishing.
	// This decision was made because we've seen issues with sharing a single client across
	// the app where we hit the maximum number of streams on a single gRPC connection.
	//
	// The issues we were hitting are described in this GitHub issue:
	// https://github.com/googleapis/nodejs-pubsub/issues/550
	client, err := newPubsubClient(
		// For subscribers, we disable telemetry to avoid a whole load of useless spans being
		// emitted (e.g. `google.pubsub.v1.Subscriber/StreamingPull`), without a legitimate
		// trace context to attach them to.
		ctx, option.WithTelemetryDisabled(),
	)
	if err != nil {
		return errors.Wrap(ctx, err, "failed to get Pub/Sub client")
	}

	s.Lock()
	defer s.Unlock()

	subscriptionID := SubscriptionID(topicName, params.SubscriberID)
	if s.subscriptionIDs[subscriptionID] {
		// Nothing will ever use the client we just built, so release it rather than
		// leaking it. Best-effort: we are already returning an error, so a failure to
		// close is not worth masking it.
		_ = client.Close()

		return errors.New(ctx, "duplicate subscription IDs for the same topic")
	}
	s.subscriptionIDs[subscriptionID] = true

	// Start a fresh goroutine that will continuously try pulling events from the
	// subscription and processing them.
	safe.Go(func() {
		// Deliberately create a new context here. The subscription is cancelled when the
		// context is cancelled, and we don't want our subscriptions to be cancelled, ever.
		var (
			ctx               = md.New(context.Background())
			connectionAttempt = 0
		)

		// We loop infinitely, re-subscribing to events when a failure occurs.
		for {
			sub := client.Subscription(subscriptionID)
			for _, opt := range opts {
				opt(&sub.ReceiveSettings)
			}

			// Keep track of how many times we've tried starting the subscriber.
			connectionAttempt++
			ctx := md.New(ctx, md.Metadata{
				"connection_attempt": connectionAttempt,
			})

			log.Debug(ctx, "Starting subscriber...")
			err := sub.Receive(ctx,
				func(ctx context.Context, msg *pubsub.Message) {
					defer func() {
						var err error
						errors.RecoverPanic(recover(), &err)
						if err != nil {
							log.Error(ctx, errors.Wrap(ctx, err, "panic in Pub/Sub subscription"), map[string]any{
								"topic":      topicName,
								"subscriber": params.SubscriberID,
								"source":     fmt.Sprintf("%s.%s", topicName, params.SubscriberID),
							})

							// A panic means we never reached the Ack/Nack below. Nack
							// explicitly so the message is released for redelivery rather
							// than being left outstanding. Only the first Ack/Nack on a
							// message has any effect, so this is harmless if one was
							// already sent.
							msg.Nack()
						}
					}()

					sourcePackage, ok := msg.Attributes["source_package"]
					if !ok {
						sourcePackage = "unknown"
					}

					// Build the event metadata from our Pub/Sub message.
					eventMetadata := EventMetadata{
						Adapter:       "pubsub",
						Topic:         topicName,
						Subscriber:    params.SubscriberID,
						SourcePackage: sourcePackage,
						MessageID:     msg.ID,
						ErrorCount:    0,
						PublishTime:   msg.PublishTime,
						ParentTraceID: msg.Attributes["trace_id"],
						ParentSpanID:  msg.Attributes["span_id"],
					}

					// We should always set the publisher against the context for our subscriptions,
					// as it's never the case that we'd want to subscribe to Pub/Sub but not publish
					// through it.
					ctx = WithPublisher(ctx, s.publisher)

					// Handlers are pre-wrapped to call into process, so all we need to think about
					// is Pub/Sub-level concerns like n/acking the message.
					err := handler(ctx, s.deps, params, msg.Data, eventMetadata)
					if err != nil {
						msg.Nack()
					} else {
						msg.Ack()
					}
				},
			)

			// Unwrap the error to get the real error.
			err = errors.Cause(err)

			switch {
			// The subscription didn't exist so create it and re-subscribe.
			//
			// We deliberately try and blindly subscribe, even if we don't know if it exists.
			// This saves Pub/Sub administrator quota, which is limited to 100 requests per
			// second. Subscribing to a topic doesn't take any quota, so we only expend our
			// quota if we need to create a new subscription.
			case isNotFoundErr(err):
				log.Info(ctx, "Subscription didn't exist, creating it...")

				createErr := s.createSubscription(ctx, client, topicName, subscriptionID)
				if createErr == nil {
					continue // created: go straight back to receiving, no backoff
				}

				log.Error(ctx, errors.Wrap(ctx, createErr, "creating Pub/Sub subscription, retrying after backoff"))
				backoff(connectionAttempt)

			// We hit any other type of error. Sleep for a bit and try again. The log level
			// escalates the more times we have had to re-connect.
			default:
				if connectionAttempt > 10 {
					log.Error(ctx, errors.Wrap(ctx, err, "subscriber failed, re-connecting with backoff"))
				} else if connectionAttempt > 5 {
					log.Warn(ctx, errors.Wrap(ctx, err, "subscriber failed, re-connecting with backoff"))
				} else {
					log.Info(ctx, "subscriber failed, re-connecting with backoff", map[string]any{
						"cause": err.Error(),
					})
				}

				backoff(connectionAttempt)
			}
		}
	})

	return nil
}
// createSubscription makes sure the topic exists and then creates the subscription on
// Pub/Sub, all within a 10 second timeout. A subscription that already exists is not
// treated as an error.
func (s *GoogleSubscriber) createSubscription(ctx context.Context, client *pubsub.Client, topicName, subscriptionID string) error {
	ctx, span := md.StartSpan(ctx, "GoogleSubscriber.createSubscription", md.Metadata{
		"topic":        topicName,
		"subscription": subscriptionID,
	})
	defer span.End()

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// The subscription hangs off the topic, so the topic has to exist first.
	if err := CreateTopicIfNotExists(ctx, client, topicName); err != nil {
		return errors.Wrap(ctx, err, "error creating topic")
	}

	config := pubsub.SubscriptionConfig{
		Topic:       client.Topic(topicName),
		AckDeadline: 30 * time.Second,
		// When a message is nacked or the deadline is extended, we want to backoff
		// exponentially to avoid overloading downstream components if there is a
		// persistent error. Without this, messages would be retried over and over with
		// no delay, which causes issues like Sentry hitting capacity.
		RetryPolicy: &pubsub.RetryPolicy{
			MinimumBackoff: 10 * time.Second,
			MaximumBackoff: 10 * time.Minute,
		},
	}

	if _, err := client.CreateSubscription(ctx, subscriptionID, config); err != nil {
		// We raced another process to create the subscription. No point in borking out
		// as it exists now.
		if isAlreadyExistsErr(err) {
			return nil
		}

		return errors.Wrap(ctx, err, "error creating subscription")
	}

	log.Info(ctx, "Created subscription")

	return nil
}
// applyDefaultSettings writes our default receive settings into settings. Subscribe
// applies it before any per-subscription overrides from SubscribeParams.
func (s *GoogleSubscriber) applyDefaultSettings(settings *pubsub.ReceiveSettings) {
// By default, limit the number of concurrent goroutines processing an event to 3.
//
// This protects our system from huge numbers of concurrent goroutines stealing CPU or
// hammering the database.
//
// It can still be overridden via the opts parameter, but should only be done so with
// care.
settings.MaxOutstandingMessages = 3
// We have a lot of subscriptions in our codebase, potentially thousands. The default
// configuration in Google Pub/Sub is to use 10 goroutines per-subscription to start
// gRPC streaming connections to pull messages from a subscription.
//
// This optimises for a lot more message traffic than we're likely to need, especially
// considering each subscription only has 3 (from the above default) goroutines that
// are processing messages at any one time.
//
// Lowering it avoids our workers consuming a very large amount of CPU endlessly polling
// against gRPC endpoints to no result.
settings.NumGoroutines = 3
}
package eventadapter
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/incident-io/core/server/lib/crypto"
"github.com/incident-io/core/server/lib/errors"
"github.com/incident-io/core/server/lib/log"
"github.com/incident-io/core/server/lib/md"
"github.com/incident-io/core/server/lib/safe"
"github.com/oklog/ulid/v2"
"gorm.io/gorm"
)
// InMemory handles subscribing and publishing of events in memory. We only use it when
// developing.
type InMemory struct {
// deps are passed to every subscription handler.
deps Dependencies
// subs maps topic name to subscriber ID to the registered subscription.
subs map[string]map[string]*InMemorySubscription
// The embedded mutex guards subs: publish takes the read lock, Subscribe the write
// lock.
sync.RWMutex
}
// InMemorySubscription is a single registered subscriber held by InMemory.
type InMemorySubscription struct {
// handler is the callback invoked for each published event.
handler subscribeHandler
// params are the subscription parameters handed back to handler on every call.
params SubscribeParams
}
// Compile-time checks that *InMemory satisfies both the Publisher and Subscriber
// interfaces.
var (
_ Publisher = &InMemory{}
_ Subscriber = &InMemory{}
)
// NewInMemory builds an in-memory publisher/subscriber with no subscriptions registered.
// The given deps are passed to every handler it invokes.
func NewInMemory(deps Dependencies) *InMemory {
	adapter := &InMemory{
		subs: make(map[string]map[string]*InMemorySubscription),
	}
	adapter.deps = deps

	return adapter
}
// publish delivers the event to every in-memory subscriber of its topic and returns a
// freshly generated message ID.
//
// Delivery is asynchronous: each subscriber is invoked on its own goroutine after a
// random delay of under 500ms, and a failing handler is retried up to 10 times in total
// with a linearly increasing wait. Handler errors are logged, never returned. The only
// error returned here is a failure to serialise the event.
func (i *InMemory) publish(ctx context.Context, _ *gorm.DB, ev Eventer, publishPackage string) (string, error) {
	// Marshal the event to JSON so we emulate how Pub/Sub behaves. The real publisher
	// always serialises the event, so doing the same here surfaces in development any
	// errors that would otherwise only appear in production.
	payload, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}

	i.RLock() // about to read from the subscription table
	defer i.RUnlock()

	// Create an ID now, as even if we have no subscription we want to pretend like we
	// 'accepted' an event.
	messageID := i.GetMessageID()

	// We can have many subscriptions for each event. With none, there is nothing to do.
	subs, ok := i.subs[ev.Name()]
	if !ok {
		return messageID, nil
	}

	_, span := md.StartSpan(ctx, "InMemory.Publish", md.Metadata{
		"topic": ev.Name(),
	})
	defer span.End()

	msg := pubsub.Message{
		ID:          messageID,
		Data:        payload,
		PublishTime: time.Now(),
		Attributes: map[string]string{
			"trace_id":       span.SpanContext().TraceID().String(),
			"span_id":        span.SpanContext().SpanID().String(),
			"source_package": publishPackage,
		},
	}

	// Everything below happens in goroutines separate from this thread.
	for id, sub := range subs {
		// Take per-iteration copies so each goroutine is bound to its own subscriber.
		subscriberID, subscription := id, sub
		params := subscription.params
		ctx := md.New(context.Background(), md.Metadata{
			"topic":      ev.Name(),
			"subscriber": subscriberID,
		})

		safe.Go(func() {
			// Wait for a random interval before starting processing, simulating real
			// Pub/Sub delay between publishing a message and it being received by a
			// subscriber.
			time.Sleep(time.Duration(crypto.Intn(500)) * time.Millisecond)

			eventMetadata := EventMetadata{
				Adapter:       "in_memory",
				Topic:         ev.Name(),
				Subscriber:    subscriberID,
				SourcePackage: publishPackage,
				MessageID:     msg.ID,
				ErrorCount:    0,
				PublishTime:   msg.PublishTime,
				ParentTraceID: msg.Attributes["trace_id"],
				ParentSpanID:  msg.Attributes["span_id"],
			}

			// When we're processing events in-memory, we should publish them in-memory
			// also.
			ctx = WithPublisher(ctx, i)

			// We'll try to process the event 10 times, with linear back-off. This isn't
			// exactly what we do in production, but it's close to how we'd ideally handle
			// things when developing.
			for attempt := 0; attempt < 10; attempt++ {
				err := subscription.handler(ctx, i.deps, params, msg.Data, eventMetadata)
				if err == nil {
					return // success, no retry necessary
				}

				delay := time.Duration(attempt) * time.Second
				log.Info(ctx, "Handler failed, retrying in a moment...", map[string]any{
					"error": err.Error(),
					"retry": attempt,
					"delay": delay,
				})
				time.Sleep(delay)

				// Increment the error counter so the process function knows it's a retry.
				eventMetadata.ErrorCount++
			}
		})
	}

	return msg.ID, nil
}
// Subscribe registers handler as the subscriber identified by params.SubscriberID on the
// given topic. It always returns nil.
//
// Registering the same subscriber ID twice on one topic panics, expecting to terminate
// the app. This is allowable because in-memory is used exclusively in development, and it
// stops us from accidentally colliding our subscribers.
func (i *InMemory) Subscribe(ctx context.Context, topicName string, handler subscribeHandler, params SubscribeParams) error {
	i.Lock() // about to write
	defer i.Unlock()

	// Make sure there is an entry for this topic's subscriptions.
	if _, found := i.subs[topicName]; !found {
		i.subs[topicName] = map[string]*InMemorySubscription{}
	}
	topicSubs := i.subs[topicName]

	if _, duplicate := topicSubs[params.SubscriberID]; duplicate {
		panic(fmt.Sprintf("duplicate subscription ID: topic=%s, subscriber=%s", topicName, params.SubscriberID))
	}

	// All good, let's register the new subscription ID and callback.
	topicSubs[params.SubscriberID] = &InMemorySubscription{
		handler: handler,
		params:  params,
	}

	return nil
}
// Schedule publishes the event in memory once deliveryTime has been reached. It returns
// immediately with a generated message ID; the wait and the publish happen on a
// background goroutine, and a publish failure is logged rather than returned.
//
// NOTE(review): the ID returned here is generated separately from the one publish
// assigns to the delivered message, so the two will differ — confirm that is intended.
func (i *InMemory) Schedule(ctx context.Context, _ *gorm.DB, ev EventerSchedulable, deliveryTime time.Time) (string, error) {
	messageID := i.GetMessageID()

	safe.Go(func() {
		// Hold off until the requested delivery time.
		time.Sleep(time.Until(deliveryTime))

		if _, err := i.publish(ctx, nil, ev, "unknown"); err != nil {
			log.Error(ctx, errors.Wrap(ctx, err, "publishing scheduled event"))
		}
	})

	return messageID, nil
}
// GetMessageID generates a new unique message ID from a ULID, used to track a message
// through the system. The "in_memory_" prefix makes it obvious, wherever the ID shows
// up, that the message was published with this adapter.
func (i *InMemory) GetMessageID() string {
	id := ulid.Make()

	return fmt.Sprintf("in_memory_%s", id.String())
}
package eventadapter
import (
"context"
"fmt"
"time"
"github.com/incident-io/core/server/lib/cache"
"github.com/incident-io/core/server/lib/envcheck"
"github.com/incident-io/core/server/lib/errors"
"github.com/incident-io/core/server/lib/log"
"github.com/incident-io/core/server/lib/md"
"github.com/incident-io/core/server/lib/metrics"
"github.com/incident-io/core/server/lib/safe"
"github.com/incident-io/core/server/lib/traffic"
"github.com/incident-io/core/server/pkg/identity"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/samber/lo"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// Dependencies are runtime constructed dependencies provided for use when processing
// events. The adapters pass them to every subscription handler they invoke.
type Dependencies struct {
Cache cache.Service // for storing error counts
Control *traffic.Control // for applying gate controls
Identity *identity.Service // for resolving organisation details
}
// EventMetadata contains details about the specific event that we're processing. Each
// adapter builds one per delivered message and passes it to the handler.
type EventMetadata struct {
Adapter string // e.g. pubsub, in_memory, cloud_scheduler, etc.
Topic string // e.g. escalation.changed
Subscriber string // e.g. sync-message
SourcePackage string // e.g. app/escalator/executor
MessageID string // e.g. 1234567890
ErrorCount int // e.g. 0
PublishTime time.Time // e.g. 2018-01-01T00:00:00Z
ParentTraceID string // e.g. 90ce95ed11b25942403dc821bb5d73a1
ParentSpanID string // e.g. 1234567890
}
// ProcessOutcome is used to signal the result of processing an event. Its string value
// is what appears in the "outcome" position of our event metrics.
type ProcessOutcome string
// The possible outcomes of processing an event.
var (
// ProcessOutcomeSuccess is used when an event is processed successfully.
ProcessOutcomeSuccess ProcessOutcome = "success"
// ProcessOutcomeError is used when an event is processed unsuccessfully, and may be
// retried.
ProcessOutcomeError ProcessOutcome = "error"
// ProcessOutcomeExpired is used when we refused to process an event because the time
// between publishing the event and us trying to process it has exceeded the
// subscription stale threshold.
//
// These events are dropped entirely and are not retried.
ProcessOutcomeExpired ProcessOutcome = "expired"
// ProcessOutcomeRateLimited is used when we refused to process an event because it
// exceeded the rate limit. We don't normally apply limits so this usually only happens
// in an incident when we've applied emergency limit overrides.
ProcessOutcomeRateLimited ProcessOutcome = "rate_limited"
)
// Prometheus metrics describing event processing.
var (
// eventRunningSecondsTotal is a counter of the seconds events have spent running,
// labelled by topic, subscriber, subscriber package and publishing package.
eventRunningSecondsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "core_event_running_seconds_total",
Help: "A live updated counter of the number of seconds events have been running for.",
},
[]string{
"topic", // e.g. "escalation.changed"
"subscriber", // e.g. "sync-message"
"package", // e.g. "app/escalator/executor"
"source_package", // e.g. "app/escalator/executor"
},
)
// eventProcessedTotal is a counter of processed events, carrying the same labels plus
// the processing outcome.
eventProcessedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "core_event_processed_total",
Help: "Updated whenever we finish processing an event.",
},
[]string{
"topic", // e.g. "escalation.changed"
"subscriber", // e.g. "sync-message"
"package", // e.g. "app/escalator/executor"
"source_package", // e.g. "app/escalator/executor"
"outcome", // e.g. "success", "error", "expired"
},
)
)
// Traffic gates applied to event processing.
var (
// gateEventSubscription is labelled by topic and subscriber.
//
// Apply an observation-only gate to event processing, as opposed to a limit gate which
// would apply rate-limits.
//
// As the gate is 'unlimited' we won't apply limits, but we could apply a limit override
// while in production via the traffic CLI if we find an event is problematic, making
// this more akin to a kill-switch.
gateEventSubscription = &traffic.Gate{
Name: "event_subscription",
SampleLimit: 5,
SampleFloor: -1,
GetLimit: func(labels []string) (limit int64, refillRate float64) {
return -1, 0.0 // this is a gate that observes only and does not limit
},
Labels: []string{
"topic",
"subscriber",
},
}
// gateEventSubscriptionOrganisation is labelled by topic, subscriber and organisation
// ID.
//
// Unlimited gate per organisation, but as above we could apply a limit
// override while in production via the traffic CLI if we find an event is
// problematic, making this more akin to a kill-switch.
gateEventSubscriptionOrganisation = &traffic.Gate{
Name: "event_subscription_organisation",
SampleLimit: 5,
SampleFloor: -1,
GetLimit: func(labels []string) (limit int64, refillRate float64) {
return -1, 0.0
},
Labels: []string{
"topic",
"subscriber",
"organisation_id",
},
}
)
// process is the single codepath for processing events.
//
// It wraps handler with the behaviour every subscription gets: observability
// metadata and a consumer span linked back to the publisher's trace, the
// running-seconds and processed counters, database usage log counters, the
// subscription's default error urgency, stale-message expiry, traffic gates,
// panic recovery, the event_processed log line and retry-aware error reporting.
//
// Parameters:
//   - ctx: the context handed to the handler once decorated.
//   - deps: injected services; Identity and Control are optional (nil-checked).
//   - subscribeParams: subscription behaviour (urgency, stale threshold, package).
//   - ev: the decoded event.
//   - eventMetadata: adapter-provided metadata about this delivery.
//   - handler: the subscriber's callback.
//
// It returns the outcome and the handler's error (or whatever
// errors.RecoverPanic stored for a recovered panic). err is nil when the event
// was dropped as expired or rate-limited, because the handler never ran.
func process(
	ctx context.Context,
	deps Dependencies, // service injection
	subscribeParams SubscribeParams, // subscription behaviour
	ev Eventer, // the event we're processing
	eventMetadata EventMetadata, // specific event metadata
	handler func(ctx context.Context) error, // the actual event handler
) (
	outcome ProcessOutcome, err error,
) {
	// Apply dependencies to the context.
	ctx = cache.WithCache(ctx, deps.Cache)

	// Build metadata as we proceed through processing the message.
	//
	// These will be logged against the event_processed event and is a key part of our
	// observability stack. We _must_ log it correctly so we can calculate metrics on event
	// activity.
	//
	// Be aware that changing any of these fields may break dashboards and alerts.
	o11y := md.Metadata{
		"topic":          eventMetadata.Topic,                                               // e.g. "escalation.changed"
		"subscriber":     eventMetadata.Subscriber,                                          // e.g. "sync-message"
		"package":        subscribeParams.sourcePackage,                                     // e.g. "app/escalator/executor"
		"source_package": eventMetadata.SourcePackage,                                       // e.g. "app/escalator/executor"
		"adapter":        eventMetadata.Adapter,                                             // e.g. "pubsub"
		"message_id":     eventMetadata.MessageID,                                           // e.g. "1234567890"
		"publish_time":   eventMetadata.PublishTime,                                         // e.g. 2018-01-01T00:00:00Z
		"source":         fmt.Sprintf("%s.%s", eventMetadata.Topic, eventMetadata.Subscriber), // e.g. "escalation.changed.sync-message"
	}

	// Load any contextual metadata from the event.
	{
		// Organisation if it's there.
		if organisationID := ev.GetOrganisationID(); organisationID != "" {
			o11y["organisation_id"] = organisationID
		}

		// Any other metadata.
		if ev, ok := ev.(EventerWithMetadata); ok {
			// Reverse merge the custom metadata to ensure you can't possibly overwrite key
			// fields.
			data := ev.GetMetadata()
			data.Merge(o11y)

			// Switch it!
			o11y = data
		}
	}

	// If we have a parent trace we should create a link back to whatever published this
	// message.
	var parent trace.SpanContext
	if eventMetadata.ParentTraceID != "" && eventMetadata.ParentSpanID != "" {
		// Parse errors are deliberately ignored: a malformed ID yields the zero ID and
		// therefore an invalid parent, and the link is best-effort only.
		var (
			parentTraceID, _ = trace.TraceIDFromHex(eventMetadata.ParentTraceID)
			parentSpanID, _  = trace.SpanIDFromHex(eventMetadata.ParentSpanID)
		)

		parent = trace.NewSpanContext(trace.SpanContextConfig{
			TraceID: parentTraceID,
			SpanID:  parentSpanID,
			Remote:  true,
		})
	}

	// From here, we begin tracing.
	ctx, span := md.StartNewRootSpanWithParentLink(
		ctx, SubscriptionID(eventMetadata.Topic, eventMetadata.Subscriber), parent, trace.SpanKindConsumer, o11y,
	)
	defer span.End()

	// If we have an organisation ID, we should try resolving the name from the identity
	// service. This is totally optional, but it's nice to have.
	if organisationID, ok := o11y["organisation_id"].(string); ok {
		if deps.Identity != nil {
			org, err := deps.Identity.GetOrganisation(ctx, organisationID)
			if err != nil {
				log.Warn(ctx, errors.Wrap(ctx, err, "failed to get organisation from identity service"))
			} else {
				// Merge the org metadata into the context, adding the organisation_name.
				md.Merge(ctx, org.Metadata())
			}
		}
	}

	var (
		startTime   = time.Now()
		publishTime = lo.
				If(eventMetadata.PublishTime.IsZero(), time.Now()).
				Else(eventMetadata.PublishTime)
		queueLatency = startTime.Sub(publishTime)
	)

	// It's really important the app reports accurate metrics about what it is working on
	// right now. Most telemetry increments counters only after the work is complete, but
	// this leads to time-of-measurement bias where you only find out after the work is done
	// how long you've been working on it.
	//
	// https://blog.lawrencejones.dev/incremental-measurement/
	//
	// We avoid this by starting a goroutine that periodically increments a counter as we're
	// working the event.
	done := make(chan struct{})
	defer close(done)

	safe.Go(func() {
		c := eventRunningSecondsTotal.With(prometheus.Labels{
			"topic":          eventMetadata.Topic,
			"subscriber":     eventMetadata.Subscriber,
			"package":        subscribeParams.sourcePackage,
			"source_package": eventMetadata.SourcePackage,
		})

		// A single ticker avoids allocating a new timer on every iteration.
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		lastIncremented := time.Now()
		for {
			select {
			case <-done:
				c.Add(time.Since(lastIncremented).Seconds()) // take the final measurement
				return

			// Periodically increment the counter.
			case <-ticker.C:
				// Sample the clock once so no time is lost between measuring the elapsed
				// interval and starting the next one.
				now := time.Now()
				c.Add(now.Sub(lastIncremented).Seconds())
				lastIncremented = now
			}
		}
	})

	// We care less about when we update the processed counter as we wouldn't expect to see
	// this change until an event was done (in contrast to a measure of how long we're
	// working events, which we do expect to update periodically).
	defer func() {
		// Increment the event_processed_total counter.
		m, err := eventProcessedTotal.
			GetMetricWith(prometheus.Labels{
				"topic":          eventMetadata.Topic,
				"subscriber":     eventMetadata.Subscriber,
				"package":        subscribeParams.sourcePackage,
				"source_package": eventMetadata.SourcePackage,
				"outcome":        string(outcome),
			})
		if err != nil {
			panic(err)
		}

		m.(prometheus.ExemplarAdder).AddWithExemplar(1, prometheus.Labels{
			"traceID": span.SpanContext().TraceID().String(),
		})
	}()

	// Add counters for database usage.
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseDurationCounterKey)
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseTransactionDurationCounterKey)
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseConnectionDurationCounterKey)
	ctx = metrics.NewLogCounter(ctx, metrics.DatabaseQueriesCounterKey)

	// Initialise a new context metadata for the duration of this message processing.
	ctx = md.New(ctx)

	// Each subscription has a default urgency set against it. We must now set this default
	// against the context so that when we later report errors, the urgency is applied.
	ctx = errors.NewDefaultUrgency(ctx, subscribeParams.ErrorUrgency)

	// If we're handling an event for a demo org, don't page
	//
	// NOTE(review): isDemo is not defined anywhere in this excerpt; confirm where it is
	// computed.
	if isDemo {
		errors.SetDefaultUrgency(ctx, errors.UrgencySentry)
	}

	// If we've exceeded the stale message threshold, we should drop the message without
	// working it.
	//
	// Note: no variables may be declared at function scope between here and the
	// reportOutcome label, as goto is not allowed to jump over declarations.
	if time.Since(publishTime) > subscribeParams.StaleThreshold {
		log.Info(ctx, "Dropping event because it's above stale message threshold")

		outcome = ProcessOutcomeExpired
		goto reportOutcome
	}

	// If we have a traffic controller we should apply the gate.
	if deps.Control != nil {
		_, ok := deps.Control.Take(ctx, gateEventSubscriptionOrganisation, traffic.Labelset{
			eventMetadata.Topic,
			eventMetadata.Subscriber,
			ev.GetOrganisationID(),
		}, 1)
		if !ok {
			outcome = ProcessOutcomeRateLimited
			goto reportOutcome
		}

		_, ok = deps.Control.Take(ctx, gateEventSubscription, traffic.Labelset{
			eventMetadata.Topic,
			eventMetadata.Subscriber,
		}, 1)
		if !ok {
			outcome = ProcessOutcomeRateLimited
			goto reportOutcome
		}
	}

	// Actually handle the message.
	err = func() (err error) {
		// Ensure whatever happens in the event handler, we recover from panics.
		defer func() {
			errors.RecoverPanic(recover(), &err)
		}()

		return handler(ctx)
	}()

	// Update the outcome so we log correctly.
	if err != nil {
		outcome = ProcessOutcomeError
	} else {
		outcome = ProcessOutcomeSuccess
	}

	// Provide a named label so we can skip past running the subscriber if we've triggered a
	// rate-limit or tried processing an expired message.
reportOutcome:

	// Again, these fields are used to build metrics and dashboards on our events. Please
	// don't modify without understanding how that may impact our observability.
	o11yOutcome := md.Metadata{
		"outcome":       outcome,
		"duration":      time.Since(startTime).Seconds(),
		"total_latency": time.Since(publishTime).Seconds(),
		"queue_latency": queueLatency.Seconds(),
	}
	if err != nil {
		o11yOutcome["error"] = err.Error()
	}

	if databaseDuration, ok := metrics.GetDurationLogCounter(ctx, metrics.DatabaseDurationCounterKey); ok {
		o11yOutcome["database_duration"] = databaseDuration.Seconds()
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database duration counter"))
	}
	if transactionDuration, ok := metrics.GetDurationLogCounter(ctx, metrics.DatabaseTransactionDurationCounterKey); ok {
		o11yOutcome["database_transaction_duration"] = transactionDuration.Seconds()
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database transaction duration counter"))
	}
	if connectionDuration, ok := metrics.GetDurationLogCounter(ctx, metrics.DatabaseConnectionDurationCounterKey); ok {
		o11yOutcome["database_connection_duration"] = connectionDuration.Seconds()
	} else {
		log.Warn(ctx, errors.New(nil, "could not find database connection duration counter"))
	}
	if databaseQueries, ok := metrics.GetLogCounter(ctx, metrics.DatabaseQueriesCounterKey); ok {
		o11yOutcome["database_queries"] = databaseQueries
	} else {
		// Name the counter that is actually missing, so this warning can be told apart
		// from the connection duration one above.
		log.Warn(ctx, errors.New(nil, "could not find database queries counter"))
	}

	// Depending on how running this went, we'll want to either:
	switch outcome {

	// If we expired, we want to return now and not to continue into error handling.
	case ProcessOutcomeExpired:
		log.Info(ctx, "Dropping event because it's above stale message threshold", o11y, o11yOutcome, map[string]any{
			"event": "event_processed", // outcome=expired
		})

		return

	// If rate-limited we also want to skip error handling.
	case ProcessOutcomeRateLimited:
		log.Info(ctx, "Dropping event because it's been rate limited", o11y, o11yOutcome, map[string]any{
			"event": "event_processed", // outcome=rate_limited
		})

		return

	// Otherwise emit the event_processed log and proceed to error handling.
	case ProcessOutcomeSuccess, ProcessOutcomeError:
		log.Info(ctx, "Handled event", o11y, o11yOutcome, map[string]any{
			"event": "event_processed", // outcome will state how this went
		})
	}

	// Error handling:
	if err != nil {
		if ShouldSilentRetry(err, publishTime) {
			// Sometimes, our consumer is unable to process an event for legit reasons and
			// should be retried.
			//
			// Examples are:
			// - Upserting an incident participant when a timeline item is created. If multiple
			//   timeline items are created at once, processes may be racing against each other.
			// - Consuming a GitHub pull request webhook. Sometimes GitHub sends these webhooks
			//   before the PR is available to fetch from the API :shrug:
			//
			// To avoid spamming Sentry, we completely ignore these errors for the first minute
			// after the event is published. After that, this is treated like a normal error
			// i.e. log two failures then send the error to Sentry.
			log.Info(ctx, "Consumer failed for expected reason, retrying...")
		} else {
			// Mark our (top level) span as failed, ensuring that we don't drop it in our
			// sampler.
			// Do this regardless of whether we've hit our max error count or not.
			span.SetStatus(codes.Error, err.Error())

			errorCount, getErrorCountErr := getErrorCount(ctx, eventMetadata.MessageID)
			if getErrorCountErr != nil {
				// Report the cache failure itself, not the handler's error.
				log.Warn(ctx, errors.Wrap(ctx, getErrorCountErr, "getting error count from cache, assume we've seen an error before"))

				// Adjust to be at the Sentry limit so we don't respond to an unavailable cache by
				// not sending to Sentry.
				errorCount = 2 // we're about to increment it
			}

			// Increment the error count
			errorCount++
			log.Info(ctx, "Incremented error count", map[string]any{
				"error_count": errorCount,
			})
			if err := setErrorCount(ctx, eventMetadata.MessageID, errorCount); err != nil {
				log.Warn(ctx, errors.Wrap(ctx, err, "failed to set error count"))
			}

			// We only want to error on the third failed attempt, as this implies that retrying didn't
			// help to fix the issue.
			//
			// Note that we used to send non-pageable errors (i.e. urgency=Sentry) to Sentry for
			// the first two failures, but this actually stopped us from getting paged when
			// consumers fail on the third attempt because the pageable error got grouped with
			// the first two (non-pageable) errors, so the issue was no longer "new".
			//
			// Now, we just log.Info.
			if errorCount < 3 {
				if envcheck.IsDevelopment() {
					log.Error(ctx, err, o11yOutcome)
				} else {
					log.Info(ctx, "Encountered error but <3 attempt, retrying...", o11y, map[string]any{
						"error_count": errorCount,
						"cause":       err.Error(),
					})
				}
			} else {
				if _, ok := errors.As[SilentlyRetryableError](err); ok {
					log.Error(ctx, errors.Wrap(ctx, err, "failed after retrying silently"), o11yOutcome)
				} else {
					log.Error(ctx, err, o11yOutcome)
				}
			}
		}
	}

	// Return the error we received from the callback.
	return outcome, err
}
package eventadapter
import (
"context"
"fmt"
"time"
"github.com/incident-io/core/server/lib/errors"
"github.com/incident-io/core/server/lib/log"
"github.com/incident-io/core/server/lib/md"
"github.com/incident-io/core/server/lib/safe"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"
)
// Placeholder values that publish returns in place of a real message ID when the event
// has not actually been sent by the time it returns.
const (
// PublishOutcomeBatched is returned when the context carries a batch: the event was
// added to the batch rather than published.
PublishOutcomeBatched = "batched-not-yet-sent"
// PublishOutcomeAsync is returned when the context requests async publishing: the
// publish was handed off and its real message ID is not reported to the caller.
PublishOutcomeAsync = "async-pending"
)
var (
// eventPublishedTotal counts publish attempts. It is incremented once per attempt in
// publish with outcome "success" or "error"; label values are passed positionally, so
// the order below must match the WithLabelValues calls.
eventPublishedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "core_event_published_total",
Help: "Total number of times we've published an event.",
},
[]string{
"topic", // e.g. "escalation.changed"
"outcome", // e.g. "success", "error"
"package", // the publishing package, e.g. "app/escalator/executor"
},
)
)
// ErrPublishTimedOut is returned when we've timed out publishing a message to a topic.
//
// The underlying error is held in an unexported field and no Unwrap method is
// defined here, so the cause only surfaces through the error message.
type ErrPublishTimedOut struct {
	cause error // deliberately don't allow recursing
}

// Error implements the error interface. It appends the cause's message when
// there is one, and is safe to call on a zero-value ErrPublishTimedOut.
func (e ErrPublishTimedOut) Error() string {
	if e.cause == nil {
		// A zero value has no cause; calling Error on a nil interface would panic.
		return "timed out publishing to topic"
	}

	return fmt.Sprintf("timed out publishing to topic: %s", e.cause.Error())
}
// Publish is the entry point for publishing. It loads the appropriate publisher from the
// context (in production this will be the Google publisher) and then calls the
// publisher's publish method.
//
// The returned string is the message ID, or one of the PublishOutcomeBatched /
// PublishOutcomeAsync placeholders when the event was batched or handed off
// asynchronously.
//
// NOTE(review): the publishing package is taken from getCallerPackageName(), which
// presumably inspects the call stack — keep that call directly in this function so it
// continues to resolve Publish's caller.
func Publish(ctx context.Context, db *gorm.DB, ev Eventer) (string, error) {
return publish(ctx, db, ev, getCallerPackageName())
}
// publish implements Publish for an explicit publishPackage (the package that
// is attributed as the publisher in metrics, logs and spans).
//
// Depending on what is set on ctx it will:
//   - add the event to the current batch and return PublishOutcomeBatched;
//   - publish in the background and return PublishOutcomeAsync; or
//   - publish synchronously and return the publisher's message ID.
//
// It returns an error if no publisher is set on ctx, or if a synchronous
// publish fails. A publish that times out while the caller's context is still
// live is reported as ErrPublishTimedOut.
func publish(ctx context.Context, db *gorm.DB, ev Eventer, publishPackage string) (string, error) {
	// If we're in batch mode, we should add the event to the batch and return immediately.
	batch := GetBatch(ctx)
	if batch != nil {
		batch.Add(ev)
		return PublishOutcomeBatched, nil
	}

	publisher := PublisherFromContext(ctx)
	if publisher == nil {
		return "", errors.New(nil, "no publisher found on context")
	}

	// Currently, we only validate events that go via Schedule(), so this validation does
	// not trigger a hard failure. Once our logs show we have no soft failures, this should
	// become a hard failure.
	if err := ev.Validate(); err != nil {
		log.Warn(ctx, errors.Wrap(ctx, err, "event failed validation during publish"))
	}

	// If we're in async mode, we want to publish the event asynchronously.
	async := IsAsyncPublish(ctx)

	// This is where we actually publish the message.
	send := func(ctx context.Context, parent trace.SpanContext) (messageID string, err error) {
		// Tag the span with our parent, allowing us to link the publish to the original
		// caller. If this is synchronous then we have a nil parent, which is normal.
		ctx, span := md.StartNewRootSpanWithParentLink(
			ctx, "eventadapter.Publish", parent, trace.SpanKindProducer,
			md.Metadata{
				"topic":     ev.Name(),
				"publisher": fmt.Sprintf("%T", publisher),
				"package":   publishPackage,
				"kind":      "publisher",
			},
		)
		defer span.End()

		defer func(startAt time.Time) {
			if err != nil {
				eventPublishedTotal.WithLabelValues(ev.Name(), "error", publishPackage).Inc()
			} else {
				eventPublishedTotal.WithLabelValues(ev.Name(), "success", publishPackage).Inc()
			}

			log.Info(ctx, "Published event", md.Metadata{
				"event":        "event_published",
				"topic":        ev.Name(),
				"publish_mode": lo.If(async, "async").Else("sync"),
				"publisher":    fmt.Sprintf("%T", publisher),
				"package":      publishPackage,
				"message_id":   messageID,
				"duration":     time.Since(startAt).Seconds(),
				"outcome":      lo.If(err != nil, "error").Else("success"),
			})
		}(time.Now())

		messageID, err = publisher.publish(ctx, db, ev, publishPackage)
		if err != nil {
			// If we've received a context canceled it implies either:
			//
			// 1. We timed out trying to publish messages into the underlying event system.
			// 2. The context we've been provided to use for publishing has itself been
			//    cancelled.
			//
			// In the case of (2) we don't want to return a timeout error, as people will read a
			// publish timeout as our publishing system being slow rather than us having
			// cancelled our attempt to publish. So in this case we return the error straight
			// back up and rely on our system to suppress context cancelation errors.
			//
			// In (1) we definitely want to page, though, so we create a fresh custom error type
			// and wrap it (generating a new stack trace at this moment, so that Sentry can show
			// us what failed to publish) and tag it with the topic that's gone wrong.
			//
			// So: if the parent context is still active...
			if ctx.Err() == nil {
				// And the error we got from the child frame implies a timeout...
				if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
					return "", errors.WithMetadata(errors.Wrap(ctx, ErrPublishTimedOut{err}, "publishing"), md.Metadata{
						"topic": ev.Name(),
					})
				}
			}

			return "", errors.Wrap(ctx, err, "publishing event")
		}

		return messageID, nil
	}

	// If we're async we want to publish the event separately from our current thread and
	// against the background context. This function should return immediately so the caller
	// can resume whatever they were doing.
	if async {
		// Store a handle to the parent span. This must be read from the caller's context
		// before we switch to the background one, which carries no span.
		parent := trace.SpanContext{}
		if span, ok := md.SpanFromContext(ctx); ok {
			parent = span.SpanContext()
		}

		// Create a fresh context from background so the publish is not cancelled along
		// with the caller.
		asyncCtx := md.New(context.Background())

		// If we're in tests we never want to do this async as we won't be able to expect on
		// what we've published.
		if _, ok := publisher.(*TestPublishSubscriber); ok {
			_, err := send(asyncCtx, parent)
			if err != nil {
				panic(err) // in test mode
			}
		} else {
			safe.Go(func() {
				_, err := send(asyncCtx, parent)
				if err != nil {
					log.Error(asyncCtx, errors.Wrap(asyncCtx, err, "async publishing event"))
				}
			})
		}

		// Provide a message ID that explains this has been handled asynchronously.
		return PublishOutcomeAsync, nil
	}

	// Otherwise we're synchronous and just need to publish.
	messageID, err := send(ctx, trace.SpanContext{})
	if err != nil {
		return "", err
	}

	return messageID, nil
}
const (
// PublisherContextKey is the context key under which WithPublisher stores the
// Publisher and from which PublisherFromContext reads it.
PublisherContextKey ctxKey = "eventadapter.Publisher"
)
// Publisher is the interface for publishing events.
//
// Because it includes the unexported publish method, it can only be implemented by
// types declared in this package.
type Publisher interface {
// publish takes the event and either:
// - publishes it straight away, returning the message ID
// - adds the event to a batch which will be published later (this happens by default when you're
// in a transaction and using pubsub; we'll only publish the batch once the transaction has committed)
//
// The *gorm.DB argument is unused, but is left here for use with any future outbox, to
// allow for transactional publishing, and to maintain a common interface with the old
// eventadapter.Publisher. We should consider removing this argument, and making it a
// field on a specific implementation of the publisher.
//
// publishPackage is the package attributed as the publisher of the event.
publish(ctx context.Context, db *gorm.DB, ev Eventer, publishPackage string) (string, error)
// Schedule schedules an event to be published at a later time (deliveryTime), returning
// the name of the task that was created.
Schedule(
ctx context.Context, db *gorm.DB, event EventerSchedulable, deliveryTime time.Time,
) (
taskName string, err error,
)
}
// PublisherFromContext looks up the Publisher stored on ctx under
// PublisherContextKey. It returns nil when nothing is stored there, or when
// the stored value is not a Publisher.
func PublisherFromContext(ctx context.Context) Publisher {
	publisher, found := ctx.Value(PublisherContextKey).(Publisher)
	if !found {
		return nil
	}

	return publisher
}
// WithPublisher returns a child of ctx that carries publisher under
// PublisherContextKey, where PublisherFromContext will find it.
func WithPublisher(ctx context.Context, publisher Publisher) context.Context {
	withPublisher := context.WithValue(ctx, PublisherContextKey, publisher)

	return withPublisher
}
package eventadapter
import "strings"
// This is the event registry, populated by Register, which makes events available to
// publish and subscribe to.
//
// NOTE(review): these maps are written by Register without any locking; presumably
// registration only happens during package initialisation — confirm before calling
// Register concurrently.
var (
// Registry holds every registered event keyed by its event name. Schedulable events
// are stored here too, in addition to RegistrySchedulable.
Registry = map[string]Eventer{}
// RegistrySchedulable contains only schedulable eventers, keyed by the Google Cloud
// Tasks queue name, rather than the event name.
RegistrySchedulable = make(map[string]EventerSchedulable)
)
// Register records ev in Registry under its event name and, when ev is also
// an EventerSchedulable, in RegistrySchedulable under its queue name.
//
// It panics if either key has already been taken.
func Register(ev Eventer) {
	name := ev.Name()
	if _, exists := Registry[name]; exists {
		panic("already registered event with the same name")
	}

	// Every event goes into the main registry.
	Registry[name] = ev

	schedulable, isSchedulable := ev.(EventerSchedulable)
	if !isSchedulable {
		return
	}

	// Schedulable events are additionally indexed by their queue name.
	queueName := schedulableQueueName(schedulable)
	if _, exists := RegistrySchedulable[queueName]; exists {
		panic("already registered schedulable event with same name")
	}

	RegistrySchedulable[queueName] = schedulable
}
// schedulableQueueName derives the Google Cloud Tasks queue name for ev from
// its event name. Queue names must avoid the `.` character, so every `.` is
// swapped for `--`.
func schedulableQueueName(ev EventerSchedulable) string {
	const (
		forbidden   = "."
		replacement = "--"
	)

	return strings.ReplaceAll(ev.Name(), forbidden, replacement)
}
package eventadapter
import (
"context"
"encoding/json"
"reflect"
"strings"
"time"
"cloud.google.com/go/pubsub"
"github.com/incident-io/core/server/lib/errors"
"github.com/incident-io/core/server/lib/md"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/samber/lo"
"gorm.io/gorm"
)
// SubscribeHandler is what should be called to handle receiving an event. This is the
// type of function users of the event package will provide, while this package implements
// the translation of adapter data into the event types.
//
// The handler receives a pointer to the decoded event along with the metadata of the
// delivery. Returning a non-nil error marks the event as failed.
type SubscribeHandler[EV Eventer] func(
ctx context.Context, db *gorm.DB, ev *EV, eventMetadata EventMetadata,
) error
// Subscription-level gauges.
//
// subscriptionGoroutines is created with promauto, whereas the two alert threshold
// gauges are created with prometheus.NewGaugeVec and are only registered when
// RegisterSubscriptionServiceMetrics is called with a registry.
var (
// subscriptionGoroutines is set by Subscribe to the subscription's
// MaxOutstandingMessages receive setting.
subscriptionGoroutines = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "core_event_subscription_worker_count",
Help: "Number of workers currently processing our subscriptions.",
},
[]string{
"topic", // e.g. "escalation.changed"
"subscriber", // e.g. "sync-message"
"package", // e.g. "app/escalator/executor"
"team", // e.g. "on-call"; "unknown" when no team could be inferred
},
)
// subscriptionAlertBacklogThreshold is set by applyAlertMetrics to the backlog size
// at which the subscription should alert.
subscriptionAlertBacklogThreshold = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "core_event_subscription_alert_backlog_threshold",
Help: "Threshold for backlog size which triggers an alert.",
},
[]string{
"topic", // e.g. "escalation.changed"
"subscriber", // e.g. "sync-message"
"alert_name", // e.g. "default"
"urgency", // e.g. "page"
},
)
// subscriptionAlertAgeThresholdSeconds is set by applyAlertMetrics to the age (in
// seconds) of the oldest unacked message at which the subscription should alert.
subscriptionAlertAgeThresholdSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "core_event_subscription_alert_age_threshold_seconds",
Help: "Threshold for oldest unacked message in queue which triggers an alert in seconds.",
},
[]string{
"topic", // e.g. "escalation.changed"
"subscriber", // e.g. "sync-message"
"alert_name", // e.g. "default"
"urgency", // e.g. "page"
},
)
)
// RegisterSubscriptionServiceMetrics registers the subscription alert
// threshold gauges (backlog, then age) with registry. It panics if either
// registration fails, as MustRegister does.
func RegisterSubscriptionServiceMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		subscriptionAlertBacklogThreshold,
		subscriptionAlertAgeThresholdSeconds,
	)
}
// Subscribe registers handler to process events of type EV for the
// subscription described by params.
//
// ErrorUrgency defaults to errors.UrgencyPage. An error is returned when
// StaleThreshold or SubscriberID is unset, when handler is nil, when
// SubscriberID contains '.' or '_', or when the team lookup, the underlying
// subscriber or the alert metrics fail.
//
// Incoming payloads are JSON-decoded into a fresh EV and run through process
// so that every subscription gets the same telemetry and error handling.
func Subscribe[EV Eventer](ctx context.Context, db *gorm.DB, subscriber Subscriber, handler SubscribeHandler[EV], params SubscribeParams) error {
	if params.ErrorUrgency == "" {
		params.ErrorUrgency = errors.UrgencyPage
	}

	// Validate the parameters, first failure wins.
	switch {
	case params.StaleThreshold == 0:
		return errors.New(nil, "StaleThreshold must be set")
	case params.SubscriberID == "":
		return errors.New(nil, "SubscriberID must be set")
	case handler == nil:
		return errors.New(nil, "Handler must be set")
	case strings.ContainsAny(params.SubscriberID, "._"):
		return errors.New(nil, "SubscriberID mustn't contain '.' or '_', use '-' instead. (received '%s')", params.SubscriberID)
	}

	// Both of these look at the call stack, so they stay directly in this function.
	params.sourcePackage = getCallerPackageName()

	team, _, err := errors.InferMetadata(errors.New(ctx, "to generate a stack trace"))
	if err != nil {
		return errors.Wrap(ctx, err, "getting team from stack trace")
	}
	if team == nil {
		team = lo.ToPtr("unknown")
	}

	// A zero value of the concrete event type gives us the topic name.
	var evType EV
	var (
		topic        = evType.Name()
		subscriberID = params.SubscriberID
	)

	// Fresh metadata for each error we decorate.
	failureMD := func() errors.MD {
		return errors.MD{
			"topic":      topic,
			"subscriber": subscriberID,
		}
	}

	// We know the concrete event type here, but the subscriber only deals in raw
	// bytes. Wrap the typed handler in one that decodes the payload first.
	wrappedHandler := func(ctx context.Context, deps Dependencies, subscribeParams SubscribeParams, data []byte, eventMetadata EventMetadata) error {
		var ev EV

		if decodeErr := json.Unmarshal(data, &ev); decodeErr != nil {
			return errors.WithMetadata(errors.Wrap(ctx, decodeErr, "error unmarshalling event"), failureMD())
		}

		// Run the typed handler through process for consistent error
		// handling/telemetry.
		run := func(ctx context.Context) error {
			return handler(ctx, db, &ev, eventMetadata)
		}
		if _, processErr := process(ctx, deps, subscribeParams, ev, eventMetadata, run); processErr != nil {
			return errors.WithMetadata(errors.Wrap(ctx, processErr, "processing event"), failureMD())
		}

		return nil
	}

	ctx = md.New(ctx, md.Metadata{
		"topic":      topic,
		"subscriber": subscriberID,
	})

	// Hand the wrapped handler to the adapter.
	if err = subscriber.Subscribe(ctx, topic, wrappedHandler, params); err != nil {
		return errors.Wrap(ctx, err, "subscribing to event")
	}

	// Record how many messages this subscription may work concurrently.
	receiveSettings := buildReceiveSettings(params)
	subscriptionGoroutines.With(prometheus.Labels{
		"topic":      topic,
		"subscriber": subscriberID,
		"package":    params.sourcePackage,
		"team":       *team,
	}).Set(float64(receiveSettings.MaxOutstandingMessages))

	if err = applyAlertMetrics(ctx, topic, subscriberID, params.AlertParams); err != nil {
		return errors.Wrap(ctx, err, "applying alert metrics for subscription")
	}

	return nil
}
// applyAlertMetrics publishes the alerting thresholds for one subscription as
// gauges: the backlog size and the age of the oldest unacked message at which
// the "default" alert should fire. Grafana uses these to define the alerts.
//
// Unset parameters fall back to Slack urgency, a backlog of 50 messages and an
// age of 5 minutes. It currently always returns nil.
func applyAlertMetrics(ctx context.Context, topic string, subscriber string, params AlertParams) error {
	// Resolve defaults for anything left unset.
	urgency := params.Urgency
	if urgency == "" {
		urgency = AlertUrgencySlack
	}

	backlog := params.Backlog
	if backlog == 0 {
		backlog = 50
	}

	age := params.Age
	if age == 0 {
		age = 5 * time.Minute
	}

	// Both gauges share the same label set; build a fresh map for each.
	alertLabels := func() prometheus.Labels {
		return prometheus.Labels{
			"topic":      topic,
			"subscriber": subscriber,
			"alert_name": "default",
			"urgency":    string(urgency),
		}
	}

	subscriptionAlertBacklogThreshold.With(alertLabels()).Set(float64(backlog))
	subscriptionAlertAgeThresholdSeconds.With(alertLabels()).Set(age.Seconds())

	return nil
}
// SubscribeUntyped registers a callback to process an event for this subscription, but
// does it without using generics by dynamically reflecting on the event type.
//
// This is only useful if you don't have the type of the event at hand, otherwise use
// Subscribe instead.
//
// eventName must be the name of an event present in Registry; an error is returned
// otherwise, or when the underlying subscriber fails to subscribe.
//
// NOTE(review): unlike Subscribe, this does not default ErrorUrgency, validate
// StaleThreshold/SubscriberID, set params.sourcePackage or apply alert metrics —
// confirm callers provide suitable params.
func SubscribeUntyped(ctx context.Context, db *gorm.DB, subscriber Subscriber, eventName string, handler SubscribeHandler[Eventer], params SubscribeParams) error {
	// Check this event is in our registry: if it's not there, it's likely we're dealing
	// with an invalid event type.
	registeredEvent, ok := Registry[eventName]
	if !ok {
		return errors.WithMetadata(errors.New(ctx, "registering unknown event"), errors.MD{
			"topic":      eventName,
			"subscriber": params.SubscriberID,
		})
	}

	// Some registered events are a pointer and some are not. Normalise to just always
	// not-a-pointer type.
	//
	// Note that evType is a reflect.Type: its Name() is the Go type name, not the event
	// name, so the topic must always come from eventName.
	evType := reflect.TypeOf(registeredEvent)
	if evType.Kind() == reflect.Pointer {
		evType = evType.Elem()
	}

	// Now we've resolved the event type by reflection, we can wrap our handler and decode
	// the incoming event payload ourselves.
	wrappedHandler := func(ctx context.Context, deps Dependencies, subscribeParams SubscribeParams, data []byte, eventMetadata EventMetadata) error {
		// Create a fresh struct, to avoid weird issues with lots of things using the same
		// struct. This has a compiler-type of `any`, but when we pass it to
		// `json.Unmarshal`, `reflect.ValueOf` will say that this is a `*Ev`, and so we will
		// unmarshal correctly.
		//
		// We do *not* cast to `*EV` here, because `EV` won't always be a concrete type: it
		// might be the `Eventer` interface. If that happens, `json.Unmarshal` will get
		// confused and not be able to unmarshal properly.
		unmarshalled := reflect.New(evType).Interface()

		// Try to parse the payload into the event struct.
		if err := json.Unmarshal(data, &unmarshalled); err != nil {
			return errors.WithMetadata(errors.Wrap(ctx, err, "error unmarshalling event"), errors.MD{
				"topic":      eventName,
				"subscriber": params.SubscriberID,
			})
		}

		ev := unmarshalled.(Eventer)

		// Wrap the handler in a call to process, ensuring we provide consistent error
		// handling/telemetry.
		_, err := process(
			ctx, deps, subscribeParams, ev, eventMetadata, func(ctx context.Context) error {
				return handler(ctx, db, &ev, eventMetadata)
			},
		)
		if err != nil {
			return errors.WithMetadata(errors.Wrap(ctx, err, "processing event"), errors.MD{
				"topic":      eventName,
				"subscriber": params.SubscriberID,
			})
		}

		return nil
	}

	// Now subscribe, against the event's topic name.
	err := subscriber.Subscribe(ctx, eventName, wrappedHandler, params)
	if err != nil {
		return errors.WithMetadata(err, errors.MD{
			"topic":      eventName,
			"subscriber": params.SubscriberID,
		})
	}

	return nil
}
// buildReceiveSettings computes the pubsub receive settings for a
// subscription: it starts from the GoogleSubscriber defaults and then applies
// each override from params.ReceiveSettings in order.
func buildReceiveSettings(params SubscribeParams) *pubsub.ReceiveSettings {
	var (
		settings = new(pubsub.ReceiveSettings)
		defaults GoogleSubscriber
	)

	defaults.applyDefaultSettings(settings)

	for i := range params.ReceiveSettings {
		params.ReceiveSettings[i](settings)
	}

	return settings
}
// Subscriber is implemented by all subscribers.
//
// Users of this package normally go through Subscribe or SubscribeUntyped, which build
// the subscribeHandler and then call this interface.
type Subscriber interface {
// Subscribe sets the given handler to run whenever we receive an event on the given
// topic, for the subscriber ID provided in the params.
Subscribe(ctx context.Context, topicName string, handler subscribeHandler, params SubscribeParams) error
}
// subscribeHandler is a function to be called by the internal adapter implementation
// whenever a new event has been received.
//
// It is almost never the case that anyone outside of this package will need this
// function, as it works with the raw payload bytes instead of concrete types for the
// event. Instead, we will build a subscribe handler for each event type so we can get
// generic type safety.
//
// data is the encoded event payload (decoded as JSON by the handlers built in this
// package) and eventMetadata describes the delivery.
type subscribeHandler func(
ctx context.Context, deps Dependencies, subscribeParams SubscribeParams, data []byte, eventMetadata EventMetadata,
) error
// SubscribeParams controls how a subscription behaves, including both provisioning in the
// adapter and runtime settings like a stale threshold.
type SubscribeParams struct {
// SubscriberID is the ID of the subscriber. It must be unique for the topic, and should
// avoid repeating the topic name. Subscribe requires it to be set and rejects IDs
// containing '.' or '_'.
SubscriberID string
// ErrorUrgency is what we set on errors from this subscription. Subscribe defaults it
// to errors.UrgencyPage when empty.
ErrorUrgency errors.Urgency
// StaleThreshold is the maximum amount of time we'll wait for a message to be processed
// before dropping it from the queue. This is configured in GCP against the
// subscription. Events older than this when they reach process are also dropped there
// without running the handler. Subscribe requires a non-zero value.
StaleThreshold time.Duration
// ReceiveSettings are the settings we'll use when receiving messages from the
// subscription. Each function is applied in order on top of the default settings.
ReceiveSettings []func(*pubsub.ReceiveSettings)
// AlertParams defines when we should alert for this subscriber, and whether we'll be paged.
AlertParams AlertParams
// sourcePackage is where the subscription handler is defined. It is filled in by
// Subscribe and used as the "package" label on metrics and logs.
sourcePackage string
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment