Created
February 23, 2022 21:21
-
-
Save chumaumenze/b8c73200deea6aaddedc8013dd1af11a to your computer and use it in GitHub Desktop.
Mongo Change Stream in Golang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module mongo-changestream | |
go 1.17 | |
require ( | |
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 | |
go.mongodb.org/mongo-driver v1.8.3 | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"time" | |
"github.com/icrowley/fake" | |
"go.mongodb.org/mongo-driver/bson" | |
"go.mongodb.org/mongo-driver/bson/primitive" | |
"go.mongodb.org/mongo-driver/mongo" | |
"go.mongodb.org/mongo-driver/mongo/options" | |
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore" | |
) | |
// Todo is the document this example writes to the watched collection and
// reads back from the fullDocument field of a change event.
// Both fields are tagged omitempty, so an empty Item or a false Done is
// left out of the encoded BSON document.
type Todo struct {
	Item string `bson:"item,omitempty"`
	Done bool   `bson:"done,omitempty"`
}
type DocumentKey struct { | |
ID primitive.ObjectID `bson:"_id"` | |
} | |
// ChangeID mirrors the _id sub-document of a change event. Its _data value
// is what watch passes back to the server as the resume token.
type ChangeID struct {
	Token string `bson:"_data,omitempty"`
}
// Namespace mirrors the ns / to sub-documents of a change event: the
// database name (db) and the collection name (coll).
type Namespace struct {
	Database   string `bson:"db,omitempty"`
	Collection string `bson:"coll,omitempty"`
}
// TruncArray mirrors one entry of updateDescription.truncatedArrays: the
// name of the array field and the size recorded for it in the event.
type TruncArray struct {
	Field   string `bson:"field,omitempty"`
	NewSize uint   `bson:"newSize,omitempty"`
}
type UpdateDescription struct { | |
UpdatedFields bsoncore.Document `bson:"updatedFields,omitempty"` | |
RemovedFields []string `bson:"removedFields,omitempty"` | |
TruncatedArrays []TruncArray `bson:"truncatedArrays,omitempty"` | |
} | |
// SessionIdentifier mirrors the lsid sub-document of a change event, with
// its id and uid members.
//
// NOTE(review): the change-event reference describes lsid.id as a UUID;
// confirm that the driver decodes that value into a string field.
type SessionIdentifier struct {
	ID  string `bson:"id,omitempty"`
	UID []byte `bson:"uid,omitempty"`
}
// ChangeEvent is an example change event struct. | |
// It does not include all possible change event fields. | |
// You should consult the change event documentation for more info: | |
// https://docs.mongodb.com/manual/reference/change-events/ | |
type ChangeEvent struct { | |
ChangeID ChangeID `bson:"_id"` | |
OperationType string `bson:"operationType"` | |
ClusterTime primitive.Timestamp `bson:"clusterTime,omitempty"` | |
FullDocument *Todo `bson:"fullDocument,omitempty"` | |
UpdateDescription *UpdateDescription `bson:"updateDescription,omitempty"` | |
DocumentKey *DocumentKey `bson:"documentKey,omitempty"` | |
Namespace *Namespace `bson:"ns,omitempty"` | |
To *Namespace `bson:"to,omitempty"` | |
TransactionNumber *int64 `bson:"txnNumber,omitempty"` | |
SessionIdentifier *SessionIdentifier `bson:"lsid,omitempty"` | |
ServiceName string `bson:"service,omitempty"` | |
} | |
// dbUrl is the connection string handed to the driver in main.
// Change streams are only available on replica-set deployments; a
// standalone server (for example a plain localhost mongod) does not
// support them.
const dbUrl = "mongodb+srv://username:password@mycluster.mongodb.net/test"
func main() { | |
// Set client options | |
clientOptions := options.Client().ApplyURI(dbUrl) | |
// Connect to MongoDB | |
client, err := mongo.Connect(context.TODO(), clientOptions) | |
if err != nil { | |
log.Fatal(err) | |
} | |
// Check the connection | |
err = client.Ping(context.TODO(), nil) | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Println("Connected to MongoDB!") | |
// Get a handle for your collection | |
databaseName := "test" | |
collectionName := "todo" | |
// Watches the todo collection and prints out any changed documents | |
go watch(client, databaseName, collectionName) | |
// Inserts random todo items at two second intervals | |
insert(client.Database("test").Collection("todo2")) | |
// Uncoment the following lines, then make additional change to MongoDB | |
for { | |
fmt.Println("Waiting...!") | |
time.Sleep(2 * time.Second) | |
} | |
} | |
func insert(collection *mongo.Collection) { | |
for { | |
t := Todo{ | |
Item: fake.EmailSubject(), | |
Done: fake.Day()%2 == 0, | |
} | |
fmt.Printf("New Todo: %+v", t) | |
_, err := collection.InsertOne(context.TODO(), t) | |
if err != nil { | |
log.Fatal(err) | |
} | |
time.Sleep(2 * time.Second) | |
} | |
} | |
func watch(client *mongo.Client, dbName string, colName string) { | |
auditDatabase := "audit" | |
pipeline := mongo.Pipeline{ | |
bson.D{ | |
// Do some aggreation | |
{"$addFields", bson.M{"test": "myNewField"}}, | |
}, | |
} | |
ctx := context.Background() | |
opts := options.FindOne() | |
opts.SetSort(bson.D{{"$natural", -1}}) | |
// Get resume token | |
lastAuditDoc := &ChangeEvent{} | |
err := client.Database(auditDatabase).Collection(colName).FindOne( | |
ctx, bson.D{{"ns.coll", colName}}, opts).Decode(&lastAuditDoc) | |
cso := options.ChangeStream() | |
if err == nil { | |
resumeToken := lastAuditDoc.ChangeID.Token | |
cso.SetResumeAfter(bson.D{{"_data", resumeToken}}) | |
fmt.Println("Previous Resume Token: ", resumeToken) | |
} | |
// Start/Resume collection watch | |
var cur *mongo.ChangeStream | |
cur, err = client.Database(dbName).Watch(ctx, pipeline, cso) | |
if err != nil { | |
log.Println(err) | |
} | |
defer cur.Close(ctx) | |
// Get change stream events | |
for cur.Next(ctx) { | |
elem := &ChangeEvent{} | |
if err := cur.Decode(elem); err != nil { | |
log.Println(err) | |
} | |
fmt.Printf("New Change Event: %+v\n", elem) | |
collectionName := elem.Namespace.Collection | |
session, _ := client.StartSession() | |
_ = session.StartTransaction() | |
err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error { | |
_, _ = client.Database(auditDatabase).Collection(collectionName).InsertOne(ctx, elem) | |
if err != nil { | |
_ = session.AbortTransaction(sc) | |
return err | |
} | |
_ = session.CommitTransaction(sc) | |
return nil | |
}) | |
session.EndSession(ctx) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment