Skip to content

Instantly share code, notes, and snippets.

@ns-xliu
Last active August 6, 2021 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ns-xliu/c0ac06ea107efbdbb86f389f0a8b85ea to your computer and use it in GitHub Desktop.
Save ns-xliu/c0ac06ea107efbdbb86f389f0a8b85ea to your computer and use it in GitHub Desktop.
Transaction Events Delivered through Pub/Sub Lite Sample Code
// Copyright 2021 Netskope Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"compress/gzip"
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
// main is a sample Pub/Sub Lite subscriber for Netskope Transaction Events.
//
// It builds the subscription path from the -project_id, -zone and
// -subscription_id flags, then receives messages until -timeout elapses or
// the subscriber fails. For every message it logs the Pub/Sub Lite metadata
// and selected attributes, gunzips the payload, logs the decompressed
// transaction events and acks the message.
func main() {
	projectID := flag.String("project_id", "data-qe", "Cloud Project ID")
	zone := flag.String("zone", "us-west2-b", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "test-subscription", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 10000*time.Second, "The duration to receive messages")
	flag.Parse()

	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Flow-control limits: at most 10 MiB / 1000 messages outstanding (unacked).
	settings := pscompat.ReceiveSettings{
		MaxOutstandingBytes:    10 * 1024 * 1024,
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	ctx := context.Background()
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	listenTimeout, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()

	// receiveCount is updated from the receive callback, which may run on
	// several goroutines at once, so it is only touched through sync/atomic.
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	log.Printf("Listening on subscription: %s for time %v...\n", subscriptionPath, *timeout)
	if err := subscriber.Receive(listenTimeout, func(_ context.Context, msg *pubsub.Message) {
		// Use the value returned by AddInt32 rather than re-reading the shared
		// counter: a plain read would race with concurrent callbacks and could
		// report another message's sequence number.
		seq := atomic.AddInt32(&receiveCount, 1)

		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		dt := time.Now().UTC()
		log.Printf("Received #%d msg at %s (partition=%d, offset=%d, size=%d, msgID=%s)\n",
			seq, dt.Format("2006-01-02 15:04:05"), metadata.Partition, metadata.Offset, len(msg.Data), msg.ID)
		log.Printf("Attribute Content-Encoding: %s\n", msg.Attributes["Content-Encoding"])
		log.Printf("Attribute Log-Count: %s\n", msg.Attributes["Log-Count"])
		log.Printf("Attribute Fields: %s\n", msg.Attributes["Fields"])

		txlog, err := gunzip(msg.Data)
		if err != nil {
			// A payload that cannot be decompressed will not decompress on a
			// retry either, so report it and ack to keep the partition moving
			// instead of crashing the callback on a nil gzip reader.
			log.Printf("Failed to unzip msgID=%s: %v", msg.ID, err)
			msg.Ack()
			return
		}
		log.Printf("Transaction Events Unzipped:\n%v\n", string(txlog))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	log.Printf("Received %d messages\n", atomic.LoadInt32(&receiveCount))
}

// gunzip decompresses a gzip-encoded payload and returns the raw bytes.
// It returns an error if data is not a valid gzip stream or is truncated.
func gunzip(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("opening gzip stream: %w", err)
	}
	defer zr.Close()

	out, err := ioutil.ReadAll(zr)
	if err != nil {
		return nil, fmt.Errorf("reading gzip stream: %w", err)
	}
	return out, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment