Skip to content

Instantly share code, notes, and snippets.

@skymeyer
Created February 8, 2022 09:36
Show Gist options
  • Save skymeyer/14d6e45321574241d50647b2f03fb3b4 to your computer and use it in GitHub Desktop.
Save skymeyer/14d6e45321574241d50647b2f03fb3b4 to your computer and use it in GitHub Desktop.
CloudEvents Go SDK - Pub/Sub with and without an ordering key
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"cloud.google.com/go/pubsub"
cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
ce "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
)
// envConfig holds the Pub/Sub coordinates used by this example. It is filled
// in by envconfig.Process with an empty prefix, so export the following
// variables before running:
//
//	export PROJECTID=skymeyer-playground
//	export TOPICID=test-ordering
//	export SUBSCRIPTIONID=test-ordering-sub
type envConfig struct {
	ProjectID, TopicID, SubscriptionID string
}
// Example is the payload attached to every event as JSON data. The producer
// loop in main sets Foo to "bar_<n>", where n is the message counter.
type Example struct{ Foo string }
// receive is the callback passed to StartReceiver. For each incoming event it
// prints the context attributes, tries to decode the data into an Example and
// prints the decoded value followed by a separator line. A decode failure is
// only reported on stdout; the function always returns nil.
func receive(ctx context.Context, event ce.Event) error {
	fmt.Printf("Event Context: %+v\n", event.Context)

	var payload Example
	err := event.DataAs(&payload)
	if err != nil {
		fmt.Printf("Got Data Error: %s\n", err.Error())
	}

	fmt.Printf("Data processed: %+v\n", &payload)
	fmt.Print("----------------------------\n")
	return nil
}
// main runs the "no ordering key" demo: it reads the Pub/Sub coordinates from
// the environment, publishes five events without an ordering key, pauses for
// five seconds and then starts a receiver on the same subscription that
// prints whatever arrives.
func main() {
	ctx := context.Background()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	// Use the same ctx for protocol construction as for sending/receiving.
	t, err := cepubsub.New(ctx,
		cepubsub.WithProjectID(env.ProjectID),
		cepubsub.WithTopicID(env.TopicID),
		cepubsub.WithSubscriptionID(env.SubscriptionID),
		cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{
			MaxOutstandingMessages: 1, // This is key to not get parallel messages
		}),
		// Both the topic and the subscription are created in advance, so the
		// protocol must not try to create them. Note: message ordering has to
		// be enabled on the subscription when it is created in GCP.
		cepubsub.AllowCreateTopic(false),
		cepubsub.AllowCreateSubscription(false),
	)
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}

	c, err := ce.NewClient(t)
	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	log.Printf("Producing messages without ordering key ...\n")
	for i := 1; i <= 5; i++ {
		// Create a new event with incremental counter
		event := ce.NewEvent()
		event.SetID(uuid.New().String())
		event.SetType("example.stuff")
		event.SetSource("example/stuff")
		// SetData returns an error when the payload cannot be encoded; stop
		// here rather than publishing an event without data.
		if err := event.SetData(ce.ApplicationJSON, &Example{Foo: fmt.Sprintf("bar_%d", i)}); err != nil {
			log.Fatalf("failed to set data for event %d, %s", i, err.Error())
		}

		log.Printf("Producing message %d [%s]...\n", i, event.ID())
		if res := c.Send(ctx, event); !ce.IsACK(res) {
			log.Fatalf("failed sending %d: %+v", i, res)
		}
	}

	// Publish everything first, then pause, so the receiver starts against
	// messages that are already waiting on the subscription.
	log.Println("Wait a bit before starting receiver ...")
	time.Sleep(5 * time.Second)

	// ctx is the background context and is never cancelled here.
	// NOTE(review): StartReceiver is presumably blocking until the context
	// ends or an error occurs - confirm against the SDK docs.
	log.Println("Created client, listening...")
	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}
$ go run ./pubsub-no-order
2022/02/08 01:31:35 Producing messages without ordering key ...
2022/02/08 01:31:35 Producing message 1 [a3a23fe9-9f93-4363-8506-53a329a27026]...
2022/02/08 01:31:36 Producing message 2 [eb6d771c-5435-4d1c-875d-0d19bc764463]...
2022/02/08 01:31:36 Producing message 3 [2da1a55c-2bc6-48a2-a27d-22fc370a9d4e]...
2022/02/08 01:31:36 Producing message 4 [49301d40-c1f6-4165-ab1d-506deda160c5]...
2022/02/08 01:31:36 Producing message 5 [b71fe450-afe0-4106-93a4-765f75cf8682]...
2022/02/08 01:31:36 Wait a bit before starting receiver ...
2022/02/08 01:31:41 Created client, listening...
{"level":"info","ts":1644312701.750985,"logger":"fallback","caller":"v2@v2.8.0/protocol.go:195","msg":"starting subscriber for Topic \"\", Subscription \"test-ordering-sub\""}
{"level":"info","ts":1644312701.751117,"logger":"fallback","caller":"v2@v2.8.0/protocol.go:198","msg":"conn is&{false false skymeyer-playground 0xc000476200 <nil> test-ordering-sub <nil> {0 0} 0xc00012b0c0 <nil> <nil> false }"}
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: 2da1a55c-2bc6-48a2-a27d-22fc370a9d4e
datacontenttype: application/json
Data processed: &{Foo:bar_3}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: b71fe450-afe0-4106-93a4-765f75cf8682
datacontenttype: application/json
Data processed: &{Foo:bar_5}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: 49301d40-c1f6-4165-ab1d-506deda160c5
datacontenttype: application/json
Data processed: &{Foo:bar_4}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: a3a23fe9-9f93-4363-8506-53a329a27026
datacontenttype: application/json
Data processed: &{Foo:bar_1}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: eb6d771c-5435-4d1c-875d-0d19bc764463
datacontenttype: application/json
Data processed: &{Foo:bar_2}
----------------------------
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"cloud.google.com/go/pubsub"
cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
ce "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"google.golang.org/api/option"
)
// envConfig holds the Pub/Sub coordinates used by this example. It is filled
// in by envconfig.Process with an empty prefix, so export the following
// variables before running:
//
//	export PROJECTID=skymeyer-playground
//	export TOPICID=test-ordering
//	export SUBSCRIPTIONID=test-ordering-sub
type envConfig struct {
	ProjectID, TopicID, SubscriptionID string
}
// Example is the payload attached to every event as JSON data. The producer
// loop in main sets Foo to "bar_<n>", where n is the message counter.
type Example struct{ Foo string }
// receive is the callback passed to StartReceiver. For each incoming event it
// prints the context attributes, tries to decode the data into an Example and
// prints the decoded value followed by a separator line. A decode failure is
// only reported on stdout; the function always returns nil.
func receive(ctx context.Context, event ce.Event) error {
	fmt.Printf("Event Context: %+v\n", event.Context)

	var payload Example
	err := event.DataAs(&payload)
	if err != nil {
		fmt.Printf("Got Data Error: %s\n", err.Error())
	}

	fmt.Printf("Data processed: %+v\n", &payload)
	fmt.Print("----------------------------\n")
	return nil
}
// main runs the "with ordering key" demo: it reads the Pub/Sub coordinates
// from the environment, builds a Pub/Sub client pinned to a regional
// endpoint, publishes five events that all share one freshly generated
// ordering key, pauses for five seconds and then starts a receiver on the
// same subscription that prints whatever arrives.
func main() {
	ctx := context.Background()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	// When producing messages with ordering, you probably want to send them to a specific region.
	// This is only really required when there are multiple producers spread across different
	// regions, to ensure all messages with the same ordering key end up in the same region, see:
	// https://cloud.google.com/pubsub/docs/ordering#receiving_messages_in_order
	// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints
	psc, err := pubsub.NewClient(ctx, env.ProjectID,
		option.WithEndpoint("us-central1-pubsub.googleapis.com:443"),
	)
	if err != nil {
		log.Fatalf("failed setting up pubsub client: %+v", err)
	}

	// Use the same ctx for protocol construction as for sending/receiving.
	t, err := cepubsub.New(ctx,
		cepubsub.WithProjectID(env.ProjectID),
		cepubsub.WithTopicID(env.TopicID),
		cepubsub.WithSubscriptionID(env.SubscriptionID),
		cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{
			MaxOutstandingMessages: 1, // This is key to not get parallel messages
		}),
		// Both the topic and the subscription are created in advance, so the
		// protocol must not try to create them. Note: message ordering has to
		// be enabled on the subscription when it is created in GCP.
		cepubsub.AllowCreateTopic(false),
		cepubsub.AllowCreateSubscription(false),
		// The following settings are for the event producer specifically, to
		// allow producing messages with an ordering key passed along in the
		// producer context (pctx below).
		cepubsub.WithMessageOrdering(),
		cepubsub.WithClient(psc), // regional endpoint client created above
	)
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}

	c, err := ce.NewClient(t)
	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	// Create an ordering key and pass it along to our producer context
	orderKey := uuid.New().String()
	pctx := cepubsub.WithOrderingKey(ctx, orderKey)

	log.Printf("Producing messages with ordering key %q ...\n", orderKey)
	for i := 1; i <= 5; i++ {
		// Create a new event with incremental counter
		event := ce.NewEvent()
		event.SetID(uuid.New().String())
		event.SetType("example.stuff")
		event.SetSource("example/stuff")
		// SetData returns an error when the payload cannot be encoded; stop
		// here rather than publishing an event without data.
		if err := event.SetData(ce.ApplicationJSON, &Example{Foo: fmt.Sprintf("bar_%d", i)}); err != nil {
			log.Fatalf("failed to set data for event %d, %s", i, err.Error())
		}

		log.Printf("Producing message %d [%s]...\n", i, event.ID())
		// Send with pctx (not ctx) so the ordering key travels with the event.
		if res := c.Send(pctx, event); !ce.IsACK(res) {
			log.Fatalf("failed sending %d: %+v", i, res)
		}
	}

	// Publish everything first, then pause, so the receiver starts against
	// messages that are already waiting on the subscription.
	log.Println("Wait a bit before starting receiver ...")
	time.Sleep(5 * time.Second)

	// ctx is the background context and is never cancelled here.
	// NOTE(review): StartReceiver is presumably blocking until the context
	// ends or an error occurs - confirm against the SDK docs.
	log.Println("Created client, listening...")
	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}
$ go run ./pubsub-with-order
2022/02/08 01:32:00 Producing messages with ordering key "e55e405b-4f3b-40e6-9b6e-d6bee672b100" ...
2022/02/08 01:32:00 Producing message 1 [a59f3356-f0f6-4eeb-9ddc-54acc44e47a4]...
2022/02/08 01:32:02 Producing message 2 [06cd231c-6ccf-4a76-94c2-743784d7848a]...
2022/02/08 01:32:02 Producing message 3 [38361a02-f508-40e6-b7af-cc3004c787f9]...
2022/02/08 01:32:03 Producing message 4 [ca585f32-d382-4789-bcff-836d66a191ac]...
2022/02/08 01:32:03 Producing message 5 [cfa566f5-9bdb-4e38-9c58-c4f1ed7029bf]...
2022/02/08 01:32:03 Wait a bit before starting receiver ...
2022/02/08 01:32:08 Created client, listening...
{"level":"info","ts":1644312728.664626,"logger":"fallback","caller":"v2@v2.8.0/protocol.go:195","msg":"starting subscriber for Topic \"\", Subscription \"test-ordering-sub\""}
{"level":"info","ts":1644312728.664775,"logger":"fallback","caller":"v2@v2.8.0/protocol.go:198","msg":"conn is&{false false skymeyer-playground 0xc000616200 <nil> test-ordering-sub <nil> {0 0} 0xc000618240 <nil> <nil> true }"}
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: a59f3356-f0f6-4eeb-9ddc-54acc44e47a4
datacontenttype: application/json
Data processed: &{Foo:bar_1}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: 06cd231c-6ccf-4a76-94c2-743784d7848a
datacontenttype: application/json
Data processed: &{Foo:bar_2}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: 38361a02-f508-40e6-b7af-cc3004c787f9
datacontenttype: application/json
Data processed: &{Foo:bar_3}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: ca585f32-d382-4789-bcff-836d66a191ac
datacontenttype: application/json
Data processed: &{Foo:bar_4}
----------------------------
Event Context: Context Attributes,
specversion: 1.0
type: example.stuff
source: example/stuff
id: cfa566f5-9bdb-4e38-9c58-c4f1ed7029bf
datacontenttype: application/json
Data processed: &{Foo:bar_5}
----------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment