Skip to content

Instantly share code, notes, and snippets.

@tfogo
Created March 26, 2019 11:24
Show Gist options
  • Save tfogo/90c9ef145364f5ee84d051cbdf82c4c6 to your computer and use it in GitHub Desktop.
Save tfogo/90c9ef145364f5ee84d051cbdf82c4c6 to your computer and use it in GitHub Desktop.
Example use of Change Streams in Go
package main
import (
"context"
"fmt"
"log"
"os"
"time"
fuzz "github.com/google/gofuzz" // Used for creating random todo items
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// todo is one to-do entry: the value that insert writes to the collection
// and that changeEvent decodes from a change event's fullDocument field.
// No bson tags are declared, so key naming is left to the driver's defaults.
type todo struct {
	Item string // text of the entry
	Done bool   // whether the entry is marked as completed
}
// documentKey is the decode target for a change event's "documentKey"
// sub-document; it captures that sub-document's "_id" as an ObjectID.
type documentKey struct {
	ID primitive.ObjectID `bson:"_id"`
}
// changeID is the decode target for a change event's "_id" sub-document.
// Its "_data" value is read as a string (presumably the resume token
// described in the MongoDB change-event documentation — confirm there).
type changeID struct {
	Data string `bson:"_data"`
}
// namespace is the decode target for a change event's "ns" sub-document,
// which names the database and collection the event belongs to.
type namespace struct {
	Db   string `bson:"db"`   // database name
	Coll string `bson:"coll"` // collection name
}
// changeEvent is the struct that change-stream events are decoded into.
// It is an example shaped for insert events and declares only a subset of
// the fields a change event can carry. The full list is in the change
// event documentation:
// https://docs.mongodb.com/manual/reference/change-events/
type changeEvent struct {
	ID            changeID            `bson:"_id"`
	OperationType string              `bson:"operationType"`
	ClusterTime   primitive.Timestamp `bson:"clusterTime"`
	FullDocument  todo                `bson:"fullDocument"`
	DocumentKey   documentKey         `bson:"documentKey"`
	Ns            namespace           `bson:"ns"`
}
func main() {
// Set client options
clientOptions := options.Client().ApplyURI(os.Getenv("MONGODB_URI"))
// Connect to MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
log.Fatal(err)
}
// Check the connection
err = client.Ping(context.TODO(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected to MongoDB!")
// Get a handle for your collection
collection := client.Database("test").Collection("todo")
// Watches the todo collection and prints out any changed documents
go watch(collection)
// Inserts random todo items at two second intervals
insert(collection)
}
// watch opens a change stream on collection and, for every event received,
// prints the operation type together with the todo item's Item and Done
// fields.
//
// It blocks for as long as the stream keeps producing events, so callers
// normally run it in its own goroutine. If the stream cannot be opened, the
// error is printed and watch returns without iterating. An event that cannot
// be decoded terminates the program via log.Fatal. When iteration stops, any
// error recorded by the stream is printed and the stream is closed.
func watch(collection *mongo.Collection) {
	ctx := context.TODO()

	cs, err := collection.Watch(ctx, mongo.Pipeline{})
	if err != nil {
		// There is no usable stream when Watch fails; returning here avoids
		// calling Next on a nil change stream, which would panic.
		fmt.Println(err.Error())
		return
	}
	// Close the stream once iteration ends so its cursor is released.
	defer func() {
		if err := cs.Close(ctx); err != nil {
			fmt.Println(err.Error())
		}
	}()

	// Whenever there is a new change event, decode it and print some
	// information about it.
	for cs.Next(ctx) {
		var event changeEvent
		if err := cs.Decode(&event); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Change Event: %v\nTodo Item:\n %v\nDone:\n %v\n\n", event.OperationType, event.FullDocument.Item, event.FullDocument.Done)
	}

	// Next reports false on failure as well as on normal termination; Err
	// distinguishes the two, so surface it instead of exiting silently.
	if err := cs.Err(); err != nil {
		fmt.Println(err.Error())
	}
}
// insert writes a randomly generated todo to collection every two seconds.
// It never returns; a failed insert ends the program through log.Fatal.
func insert(collection *mongo.Collection) {
	const pause = 2 * time.Second

	fuzzer := fuzz.New()
	for {
		// Fill each field with random data before storing the item.
		var item todo
		fuzzer.Fuzz(&item.Item)
		fuzzer.Fuzz(&item.Done)

		if _, err := collection.InsertOne(context.TODO(), item); err != nil {
			log.Fatal(err)
		}
		time.Sleep(pause)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment