Skip to content

Instantly share code, notes, and snippets.

@bradleyfalzon
Created March 20, 2017 07:03
Show Gist options
  • Save bradleyfalzon/a3e88d33e2597c834d83ba33865c6aa0 to your computer and use it in GitHub Desktop.
Save bradleyfalzon/a3e88d33e2597c834d83ba33865c6aa0 to your computer and use it in GitHub Desktop.
main.go for pubsub: new Subscription.Receive methods and problems processing one message at a time
package main
import (
"fmt"
"log"
"runtime"
"time"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func main() {
q, err := newGCPPubSubQueue(context.TODO(), "gopherci-dev", "")
if err != nil {
log.Fatal(err)
}
go q.receive(context.TODO()) // receive messages in a background thread
// with just this call "GCPPubSubQueue: I am doing the work for" is printed once
q.queue()
// after a few moments add another message to the queue
time.Sleep(5 * time.Second)
// with this call and the previous "GCPPubSubQueue: I am doing the work for"
// is printed three times every time, i expect twice (but will accept that in some
// cases it could be redelivered multiple times.
q.queue()
runtime.Goexit() // block forever
}
// GCPPubSubQueue is a queue backed by Google Cloud Platform's Pub/Sub product.
// It bundles the topic that messages are published to with the subscription
// they are received from.
type GCPPubSubQueue struct {
	// ctx is the context handed to the constructor; queue uses it for
	// publishing. The original author's intent: stop listening when this
	// context is cancelled.
	ctx context.Context

	// topic is the Pub/Sub topic that queue publishes to.
	topic *pubsub.Topic

	// subscription is the Pub/Sub subscription that receive pulls from.
	subscription *pubsub.Subscription
}
// newGCPPubSubQueue creates a Pub/Sub client for projectID, makes sure the
// "test-topic" topic and the "test-subscription" subscription exist, and
// returns a queue bound to both. An AlreadyExists response from either create
// call is treated as success. The subscription is limited to one outstanding
// message so that messages are processed one at a time.
//
// ctx is used for the setup calls and is stored on the returned queue.
// topicName is currently unused: the topic and subscription IDs are fixed.
//
// On failure the client is closed and a non-nil error is returned.
func newGCPPubSubQueue(ctx context.Context, projectID, topicName string) (*GCPPubSubQueue, error) {
	const (
		topicID        = "test-topic"
		subscriptionID = "test-subscription"
	)

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("NewGCPPubSubQueue: could not create client: %v", err)
	}

	log.Print("NewGCPPubSubQueue: creating topic")
	topic, err := client.CreateTopic(ctx, topicID)
	if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists {
		client.Close() // best-effort cleanup; the create error is the one worth reporting
		return nil, fmt.Errorf("NewGCPPubSubQueue: could not create topic: %v", err)
	}
	if topic == nil {
		// Depending on the client library version, a failed create (including
		// AlreadyExists) may yield a nil handle, so reference the existing
		// topic explicitly rather than publishing through nil later.
		topic = client.Topic(topicID)
	}

	log.Print("NewGCPPubSubQueue: creating subscription")
	sub, err := client.CreateSubscription(ctx, subscriptionID, topic, 0, nil)
	if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists {
		client.Close() // best-effort cleanup; the create error is the one worth reporting
		return nil, fmt.Errorf("NewGCPPubSubQueue: could not create subscription: %v", err)
	}
	if sub == nil {
		// Same reasoning as for the topic: fall back to a handle for the
		// subscription that already exists.
		sub = client.Subscription(subscriptionID)
	}

	// This successfully limits concurrency but causes the redelivery issue
	// this program demonstrates.
	sub.ReceiveSettings.MaxOutstandingMessages = 1

	return &GCPPubSubQueue{ctx: ctx, topic: topic, subscription: sub}, nil
}
// queue publishes a single one-byte message to the queue's topic and blocks
// until the publish result is available.
//
// It returns nil once the message has been published (its ID is logged), or
// the publish error otherwise. The failure is also logged here so that it
// stays visible even when a caller discards the return value.
func (q *GCPPubSubQueue) queue() error {
	res := q.topic.Publish(q.ctx, &pubsub.Message{Data: []byte{0x01}})

	msgID, err := res.Get(q.ctx)
	if err != nil {
		log.Printf("GCPPubSubQueue: failed publishing message error: %v", err)
		// Stop here: reporting success with an empty message ID after a
		// failure would be misleading.
		return err
	}

	log.Println("GCPPubSubQueue: published a message with a message ID:", msgID)
	return nil
}
// receive pulls messages from the queue's subscription and handles each one
// in the callback until Receive returns. It blocks for that whole time, so it
// is meant to be run in its own goroutine.
//
// Each message is acknowledged immediately and then "processed" by sleeping
// for 15 seconds. A non-nil error from Receive is logged; a nil return is
// logged as a normal stop rather than as a failure.
func (q *GCPPubSubQueue) receive(ctx context.Context) {
	// The callback's own context is unused, so it is left unnamed to avoid
	// shadowing the outer ctx.
	err := q.subscription.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		log.Printf("GCPPubSubQueue: processing ID %v, published at %v", msg.ID, msg.PublishTime)

		// Whether the message is acked before or after the sleep, the
		// redelivery issue still occurs.
		msg.Ack()
		log.Printf("GCPPubSubQueue: ack'd ID %v", msg.ID)

		log.Println("GCPPubSubQueue: I am doing the work for", msg.ID)
		time.Sleep(15 * time.Second) // simulated long-running work
		log.Printf("GCPPubSubQueue: successfully processed ID %v", msg.ID)
	})
	if err != nil {
		log.Printf("GCPPubSubQueue: could not receive on subscription: %v", err)
		return
	}
	log.Print("GCPPubSubQueue: stopped receiving on subscription")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment