-
-
Save bradleyfalzon/a3e88d33e2597c834d83ba33865c6aa0 to your computer and use it in GitHub Desktop.
main.go for pubsub: new Subscription.Receive methods and problems processing one message at a time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"runtime" | |
"time" | |
"cloud.google.com/go/pubsub" | |
"golang.org/x/net/context" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/codes" | |
) | |
func main() { | |
q, err := newGCPPubSubQueue(context.TODO(), "gopherci-dev", "") | |
if err != nil { | |
log.Fatal(err) | |
} | |
go q.receive(context.TODO()) // receive messages in a background thread | |
// with just this call "GCPPubSubQueue: I am doing the work for" is printed once | |
q.queue() | |
// after a few moments add another message to the queue | |
time.Sleep(5 * time.Second) | |
// with this call and the previous "GCPPubSubQueue: I am doing the work for" | |
// is printed three times every time, i expect twice (but will accept that in some | |
// cases it could be redelivered multiple times. | |
q.queue() | |
runtime.Goexit() // block forever | |
} | |
// GCPPubSubQueue is a queue using Google Compute Platform's PubSub product. | |
type GCPPubSubQueue struct { | |
ctx context.Context // stop listening when this context is cancelled | |
topic *pubsub.Topic | |
subscription *pubsub.Subscription | |
} | |
func newGCPPubSubQueue(ctx context.Context, projectID, topicName string) (*GCPPubSubQueue, error) { | |
q := &GCPPubSubQueue{ctx: ctx} | |
client, err := pubsub.NewClient(ctx, projectID) | |
if err != nil { | |
return nil, fmt.Errorf("NewGCPPubSubQueue: could not create client: %v", err) | |
} | |
log.Print("NewGCPPubSubQueue: creating topic") | |
q.topic, err = client.CreateTopic(ctx, "test-topic") | |
if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists { | |
return nil, fmt.Errorf("NewGCPPubSubQueue: could not create topic: %v", err) | |
} | |
log.Print("NewGCPPubSubQueue: creating subscription") | |
q.subscription, err = client.CreateSubscription(ctx, "test-subscription", q.topic, 0, nil) | |
if code := grpc.Code(err); code != codes.OK && code != codes.AlreadyExists { | |
return nil, fmt.Errorf("NewGCPPubSubQueue: could not create subscrption: %v", err) | |
} | |
// this successfilly limits concurrency but causes the issue | |
q.subscription.ReceiveSettings.MaxOutstandingMessages = 1 | |
return q, nil | |
} | |
// queue pushes a message onto the PubSub queue | |
func (q *GCPPubSubQueue) queue() error { | |
res := q.topic.Publish(q.ctx, &pubsub.Message{Data: []byte{0x01}}) | |
msgID, err := res.Get(q.ctx) | |
if err != nil { | |
log.Printf("GCPPubSubQueue: failed publishing message error: %v", err) | |
} | |
log.Println("GCPPubSubQueue: published a message with a message ID:", msgID) | |
return nil | |
} | |
// receive receives messages from the queue | |
func (q *GCPPubSubQueue) receive(ctx context.Context) { | |
err := q.subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { | |
log.Printf("GCPPubSubQueue: processing ID %v, published at %v", msg.ID, msg.PublishTime) | |
// whether i ack this before or after the sleep, the issue still occurs | |
msg.Ack() | |
log.Printf("GCPPubSubQueue: ack'd ID %v", msg.ID) | |
log.Println("GCPPubSubQueue: I am doing the work for", msg.ID) | |
time.Sleep(15 * time.Second) | |
log.Printf("GCPPubSubQueue: successfully processed ID %v", msg.ID) | |
}) | |
log.Printf("GCPPubSubQueue: could not receive on subscription: %v", err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment