Skip to content

Instantly share code, notes, and snippets.

@chumaumenze
Created February 23, 2022 21:21
Show Gist options
  • Save chumaumenze/b8c73200deea6aaddedc8013dd1af11a to your computer and use it in GitHub Desktop.
Save chumaumenze/b8c73200deea6aaddedc8013dd1af11a to your computer and use it in GitHub Desktop.
Mongo Change Stream in Golang
module mongo-changestream
go 1.17
require (
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428
go.mongodb.org/mongo-driver v1.8.3
)
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/icrowley/fake"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)
// Todo is the document type that insert writes to MongoDB and that
// ChangeEvent.FullDocument decodes into.
//
// NOTE(review): both fields carry omitempty, so a Todo with Done == false is
// presumably encoded without a "done" key at all — confirm this is intended.
type Todo struct {
	Item string `bson:"item,omitempty"`
	Done bool   `bson:"done,omitempty"`
}
// DocumentKey maps the "documentKey" sub-document of a change event; only the
// "_id" of the affected document is decoded.
type DocumentKey struct {
	ID primitive.ObjectID `bson:"_id"`
}
// ChangeID maps the "_id" sub-document of a change event. Its "_data" value
// is what watch feeds back to the server as the resume token.
type ChangeID struct {
	Token string `bson:"_data,omitempty"`
}
// Namespace identifies a database ("db") and collection ("coll") pair as it
// appears in the "ns" and "to" fields of a change event.
type Namespace struct {
	Database   string `bson:"db,omitempty"`
	Collection string `bson:"coll,omitempty"`
}
// TruncArray is one entry of UpdateDescription.TruncatedArrays: the name of an
// array field together with its new size.
type TruncArray struct {
	Field   string `bson:"field,omitempty"`
	NewSize uint   `bson:"newSize,omitempty"`
}
// UpdateDescription maps the "updateDescription" sub-document of a change
// event: the fields that were set (kept as a raw BSON document), the names of
// the fields that were removed, and any truncated arrays.
type UpdateDescription struct {
	UpdatedFields   bsoncore.Document `bson:"updatedFields,omitempty"`
	RemovedFields   []string          `bson:"removedFields,omitempty"`
	TruncatedArrays []TruncArray      `bson:"truncatedArrays,omitempty"`
}
// SessionIdentifier maps the "lsid" sub-document of a change event.
//
// NOTE(review): the server appears to send "id" as a binary UUID rather than
// a string — confirm that decoding it into a Go string works for
// transactional events.
type SessionIdentifier struct {
	ID  string `bson:"id,omitempty"`
	UID []byte `bson:"uid,omitempty"`
}
// ChangeEvent is an example struct for decoding change stream events and for
// storing them as audit documents. It does not cover every field a change
// event can carry; see the change event reference for the full list:
// https://docs.mongodb.com/manual/reference/change-events/
//
// Optional sub-documents are pointers so that they stay nil (and are omitted
// when re-encoded) if the event does not contain them.
type ChangeEvent struct {
	ChangeID          ChangeID            `bson:"_id"` // holds the resume token
	OperationType     string              `bson:"operationType"`
	ClusterTime       primitive.Timestamp `bson:"clusterTime,omitempty"`
	FullDocument      *Todo               `bson:"fullDocument,omitempty"`
	UpdateDescription *UpdateDescription  `bson:"updateDescription,omitempty"`
	DocumentKey       *DocumentKey        `bson:"documentKey,omitempty"`
	Namespace         *Namespace          `bson:"ns,omitempty"` // source database/collection
	To                *Namespace          `bson:"to,omitempty"`
	TransactionNumber *int64              `bson:"txnNumber,omitempty"`
	SessionIdentifier *SessionIdentifier  `bson:"lsid,omitempty"`
	ServiceName       string              `bson:"service,omitempty"`
}
// dbUrl is the connection string main uses to reach MongoDB; replace the
// placeholder username, password and host with real values.
// Change streams work with replica sets only: a standalone server (e.g. a
// plain localhost mongod) will not work.
const dbUrl = "mongodb+srv://username:password@mycluster.mongodb.net/test"
func main() {
// Set client options
clientOptions := options.Client().ApplyURI(dbUrl)
// Connect to MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
log.Fatal(err)
}
// Check the connection
err = client.Ping(context.TODO(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected to MongoDB!")
// Get a handle for your collection
databaseName := "test"
collectionName := "todo"
// Watches the todo collection and prints out any changed documents
go watch(client, databaseName, collectionName)
// Inserts random todo items at two second intervals
insert(client.Database("test").Collection("todo2"))
// Uncoment the following lines, then make additional change to MongoDB
for {
fmt.Println("Waiting...!")
time.Sleep(2 * time.Second)
}
}
// insert writes a randomly generated Todo into collection every two seconds.
// It never returns; the first failed insert terminates the program via
// log.Fatal.
func insert(collection *mongo.Collection) {
	for {
		t := Todo{
			Item: fake.EmailSubject(),
			Done: fake.Day()%2 == 0,
		}
		// Terminate the line so successive log entries do not run together.
		fmt.Printf("New Todo: %+v\n", t)

		if _, err := collection.InsertOne(context.TODO(), t); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
	}
}
// watch opens a change stream on database dbName and copies every event it
// receives into the "audit" database, in a collection named after the
// collection the event came from.
//
// Before opening the stream it looks up the most recent audit document for
// colName; when one exists, the stream resumes after that event's token.
// watch blocks until the stream ends or fails, so callers normally run it in
// its own goroutine. Failures are logged rather than returned.
func watch(client *mongo.Client, dbName string, colName string) {
	const auditDatabase = "audit"

	pipeline := mongo.Pipeline{
		bson.D{
			// Do some aggregation
			{"$addFields", bson.M{"test": "myNewField"}},
		},
	}
	ctx := context.Background()

	// Get resume token: the newest audit document holds the last event that
	// was persisted.
	opts := options.FindOne().SetSort(bson.D{{"$natural", -1}})
	lastAuditDoc := &ChangeEvent{}
	err := client.Database(auditDatabase).Collection(colName).FindOne(
		ctx, bson.D{{"ns.coll", colName}}, opts).Decode(lastAuditDoc)

	cso := options.ChangeStream()
	switch {
	case err == mongo.ErrNoDocuments:
		// Nothing has been audited yet; start from the current point in time.
	case err != nil:
		// Keep going without a resume token, but do not hide the failure.
		log.Printf("reading last audit document for %q: %v", colName, err)
	case lastAuditDoc.ChangeID.Token != "":
		resumeToken := lastAuditDoc.ChangeID.Token
		cso.SetResumeAfter(bson.D{{"_data", resumeToken}})
		fmt.Println("Previous Resume Token: ", resumeToken)
	}

	// Start/Resume collection watch
	cur, err := client.Database(dbName).Watch(ctx, pipeline, cso)
	if err != nil {
		// cur is nil here; continuing would panic on the first use.
		log.Printf("opening change stream on %q: %v", dbName, err)
		return
	}
	defer cur.Close(ctx)

	// Get change stream events
	for cur.Next(ctx) {
		elem := &ChangeEvent{}
		if err := cur.Decode(elem); err != nil {
			log.Printf("decoding change event: %v", err)
			continue
		}
		fmt.Printf("New Change Event: %+v\n", elem)

		// Namespace is optional in ChangeEvent, so guard before dereferencing;
		// without a collection name there is nowhere to store the event.
		if elem.Namespace == nil || elem.Namespace.Collection == "" {
			log.Printf("skipping %q event without a collection namespace", elem.OperationType)
			continue
		}
		if err := recordChange(ctx, client, auditDatabase, elem.Namespace.Collection, elem); err != nil {
			log.Printf("recording change event: %v", err)
		}
	}
	if err := cur.Err(); err != nil {
		log.Printf("change stream on %q ended: %v", dbName, err)
	}
}

// recordChange inserts event into dbName.colName inside its own session and
// transaction, aborting the transaction if the insert fails.
//
// NOTE(review): creating a collection implicitly inside a transaction
// presumably needs a reasonably recent server version — confirm against the
// target deployment.
func recordChange(ctx context.Context, client *mongo.Client, dbName, colName string, event *ChangeEvent) error {
	session, err := client.StartSession()
	if err != nil {
		return fmt.Errorf("starting session: %w", err)
	}
	defer session.EndSession(ctx)

	return mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
		if err := sc.StartTransaction(); err != nil {
			return fmt.Errorf("starting transaction: %w", err)
		}
		// The insert must use the session context, otherwise it runs outside
		// the transaction.
		if _, err := client.Database(dbName).Collection(colName).InsertOne(sc, event); err != nil {
			if abortErr := sc.AbortTransaction(sc); abortErr != nil {
				log.Printf("aborting audit transaction: %v", abortErr)
			}
			return fmt.Errorf("inserting audit document into %s.%s: %w", dbName, colName, err)
		}
		if err := sc.CommitTransaction(sc); err != nil {
			return fmt.Errorf("committing audit transaction: %w", err)
		}
		return nil
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment