Skip to content

Instantly share code, notes, and snippets.

@sugamon
Created January 11, 2020 07:16
Show Gist options
  • Save sugamon/2ddc5083de6c6841648fd5c9fa648574 to your computer and use it in GitHub Desktop.
Save sugamon/2ddc5083de6c6841648fd5c9fa648574 to your computer and use it in GitHub Desktop.
pubsub apiv1 subscriber
package pubsub
import (
"context"
pubsub "cloud.google.com/go/pubsub/apiv1"
"google.golang.org/api/option"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)
type Subscriber struct {
Client *pubsub.SubscriberClient
}
type PullRequestConfig struct {
// フォーマットは`projects/{project}/subscriptions/{sub}`
Subscription string
// リクエストに対して返されるメッセージの最大数。上限は1000
MaxMessages int32
}
type ProcessMessage = func(context.Context, *pubsubpb.ReceivedMessage) error
func NewSubscriber(ctx context.Context, opts ...option.ClientOption) (*Subscriber, error) {
client, err := pubsub.NewSubscriberClient(ctx, opts...)
if err != nil {
return nil, err
}
return &Subscriber{
Client: client,
}, nil
}
// Pull 1回だけメッセージをPullして指定の処理(ProcessMessage関数)を実行する
func (s *Subscriber) Pull(ctx context.Context, config *PullRequestConfig, procMsg ProcessMessage) error {
defer s.Client.Close()
req := &pubsubpb.PullRequest{
Subscription: config.Subscription,
MaxMessages: config.MaxMessages,
}
res, err := s.Client.Pull(ctx, req)
if err != nil {
return err
}
for _, rMsg := range res.ReceivedMessages {
if err := procMsg(ctx, rMsg); err != nil {
return err
}
// 確認応答
err := s.Client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{
Subscription: config.Subscription,
AckIds: []string{rMsg.AckId},
})
if err != nil {
return err
}
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment