Skip to content

Instantly share code, notes, and snippets.

@yish91
Created February 6, 2020 12:34
Show Gist options
  • Save yish91/5f3dbe33f229a3f36f13ebd6491c706e to your computer and use it in GitHub Desktop.
Save yish91/5f3dbe33f229a3f36f13ebd6491c706e to your computer and use it in GitHub Desktop.
Change Stream Watcher in Go
package main
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"log"
"strings"
)
// ChangeStreamWatcher watches db for change events and copies every event
// into the "audit" database, into a collection named after the collection the
// event originated from. After each event the stream's resume token is
// appended to the "audit.token" collection; on start-up the most recently
// stored token (if any) is used to resume the stream.
//
// The function blocks until the change stream ends. Errors are logged rather
// than returned because the signature has no error result; a failure to open
// the change stream makes the function return immediately.
func ChangeStreamWatcher(client *mongo.Client, db *mongo.Database) {
	auditDatabase := "audit"
	token := "token"
	var pipeline mongo.Pipeline
	ctx := context.Background()
	audit := client.Database(auditDatabase)

	// Fetch the last stored resume token. An empty bson.D is the "match
	// everything" filter; bson.D{{}} would instead be a filter on a field
	// with an empty name.
	findOpts := options.FindOne()
	findOpts.SetSort(bson.D{{Key: "$natural", Value: -1}})
	tokenDoc := &bsoncore.Document{}
	err := audit.Collection(token).FindOne(ctx, bson.D{}, findOpts).Decode(tokenDoc)

	cso := options.ChangeStream()
	switch {
	case err == nil:
		data, lookupErr := tokenDoc.LookupErr("_data")
		if lookupErr != nil {
			// A stored token without _data cannot be resumed from; fall
			// back to watching from the current point in time.
			log.Printf("change stream watcher: stored resume token is unusable: %v", lookupErr)
			break
		}
		// Value.String() renders a string value with surrounding quotes,
		// so strip them to recover the raw token.
		resumeToken := strings.Trim(data.String(), "\"")
		cso.SetResumeAfter(bson.D{{Key: "_data", Value: resumeToken}})
	case err != mongo.ErrNoDocuments:
		// Keep the original best-effort behaviour (start a fresh stream),
		// but do not hide the reason the token could not be read.
		log.Printf("change stream watcher: reading resume token: %v", err)
	}

	cur, err := db.Watch(ctx, pipeline, cso)
	if err != nil {
		// cur is nil here; using it would panic.
		log.Printf("change stream watcher: opening change stream: %v", err)
		return
	}
	defer cur.Close(ctx)

	for cur.Next(ctx) {
		elem := &bsoncore.Document{}
		if err := cur.Decode(elem); err != nil {
			log.Printf("change stream watcher: decoding event: %v", err)
			continue
		}

		// Not every event carries ns.coll; LookupErr reports that instead
		// of panicking the way Lookup(...).Document() does.
		collValue, err := elem.LookupErr("ns", "coll")
		if err != nil {
			log.Printf("change stream watcher: event has no ns.coll: %v", err)
			continue
		}
		collectionName := strings.Trim(collValue.String(), "\"")

		if err := recordChange(ctx, client, audit, token, collectionName, elem, cur.ResumeToken()); err != nil {
			log.Printf("change stream watcher: recording event for %q: %v", collectionName, err)
		}
	}

	// Next returns false both on a clean end of stream and on error.
	if err := cur.Err(); err != nil {
		log.Printf("change stream watcher: change stream ended: %v", err)
	}
}

// recordChange stores one change event and its resume token in the audit
// database inside a single transaction, so that either both documents are
// written or neither is.
//
// NOTE(review): because the inserts now really run inside the transaction,
// deployments that cannot create collections inside a transaction need the
// target audit collections to exist beforehand — confirm against the server
// version in use.
func recordChange(
	ctx context.Context,
	client *mongo.Client,
	audit *mongo.Database,
	tokenCollection, eventCollection string,
	event *bsoncore.Document,
	resumeToken bson.Raw,
) error {
	session, err := client.StartSession()
	if err != nil {
		return err
	}
	defer session.EndSession(ctx)

	return mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
		if err := session.StartTransaction(); err != nil {
			return err
		}
		// Operations must be given sc (not ctx) to take part in the
		// transaction.
		if _, err := audit.Collection(eventCollection).InsertOne(sc, event); err != nil {
			// The insert error is the one worth reporting; an abort
			// failure on top of it adds nothing actionable.
			_ = session.AbortTransaction(sc)
			return err
		}
		if _, err := audit.Collection(tokenCollection).InsertOne(sc, resumeToken); err != nil {
			_ = session.AbortTransaction(sc)
			return err
		}
		return session.CommitTransaction(sc)
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment