Created February 23, 2022 21:21
Mongo Change Stream in Golang
module mongo-changestream
go 1.17
require (
	github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428
	go.mongodb.org/mongo-driver v1.8.3
)
package main
import (
// Todo is the document stored in the watched collection.
//
// Both fields are tagged omitempty, so an empty Item or a false Done is left
// out of the encoded BSON document.
type Todo struct {
	Item string `bson:"item,omitempty"`
	Done bool   `bson:"done,omitempty"`
}
type DocumentKey struct {
ID primitive.ObjectID `bson:"_id"`
// ChangeID maps the _id field of a change event. Token holds its _data value,
// which watch uses as the resume token.
type ChangeID struct {
	Token string `bson:"_data,omitempty"`
}
// Namespace identifies a collection by database name ("db") and collection
// name ("coll"), as used by the ns and to fields of a change event.
type Namespace struct {
	Database   string `bson:"db,omitempty"`
	Collection string `bson:"coll,omitempty"`
}
// TruncArray is one entry of UpdateDescription.TruncatedArrays: the name of an
// array field and the size it was truncated to.
type TruncArray struct {
	Field   string `bson:"field,omitempty"`
	NewSize uint   `bson:"newSize,omitempty"`
}
type UpdateDescription struct {
UpdatedFields bsoncore.Document `bson:"updatedFields,omitempty"`
RemovedFields []string `bson:"removedFields,omitempty"`
TruncatedArrays []TruncArray `bson:"truncatedArrays,omitempty"`
// SessionIdentifier maps the lsid field of a change event.
//
// NOTE(review): the server's lsid.id is presumably a binary UUID rather than
// a string; confirm that decoding it into ID works for events produced inside
// a transaction.
type SessionIdentifier struct {
	ID  string `bson:"id,omitempty"`
	UID []byte `bson:"uid,omitempty"`
}
// ChangeEvent is an example change event struct.
// It does not include all possible change event fields.
// You should consult the change event documentation for more info:
type ChangeEvent struct {
ChangeID ChangeID `bson:"_id"`
OperationType string `bson:"operationType"`
ClusterTime primitive.Timestamp `bson:"clusterTime,omitempty"`
FullDocument *Todo `bson:"fullDocument,omitempty"`
UpdateDescription *UpdateDescription `bson:"updateDescription,omitempty"`
DocumentKey *DocumentKey `bson:"documentKey,omitempty"`
Namespace *Namespace `bson:"ns,omitempty"`
To *Namespace `bson:"to,omitempty"`
TransactionNumber *int64 `bson:"txnNumber,omitempty"`
SessionIdentifier *SessionIdentifier `bson:"lsid,omitempty"`
ServiceName string `bson:"service,omitempty"`
// Change streams work with replica sets only; a standalone server (e.g. a
// plain localhost mongod) is not supported.
// NOTE: the value below is only the URI scheme prefix; fill in the cluster
// address and credentials before running.
const dbUrl = "mongodb+srv://"
func main() {
// Set client options
clientOptions := options.Client().ApplyURI(dbUrl)
// Connect to MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
// Check the connection
err = client.Ping(context.TODO(), nil)
if err != nil {
fmt.Println("Connected to MongoDB!")
// Get a handle for your collection
databaseName := "test"
collectionName := "todo"
// Watches the todo collection and prints out any changed documents
go watch(client, databaseName, collectionName)
// Inserts random todo items at two second intervals
// Uncoment the following lines, then make additional change to MongoDB
for {
time.Sleep(2 * time.Second)
func insert(collection *mongo.Collection) {
for {
t := Todo{
Item: fake.EmailSubject(),
Done: fake.Day()%2 == 0,
fmt.Printf("New Todo: %+v", t)
_, err := collection.InsertOne(context.TODO(), t)
if err != nil {
time.Sleep(2 * time.Second)
func watch(client *mongo.Client, dbName string, colName string) {
auditDatabase := "audit"
pipeline := mongo.Pipeline{
// Do some aggreation
{"$addFields", bson.M{"test": "myNewField"}},
ctx := context.Background()
opts := options.FindOne()
opts.SetSort(bson.D{{"$natural", -1}})
// Get resume token
lastAuditDoc := &ChangeEvent{}
err := client.Database(auditDatabase).Collection(colName).FindOne(
ctx, bson.D{{"ns.coll", colName}}, opts).Decode(&lastAuditDoc)
cso := options.ChangeStream()
if err == nil {
resumeToken := lastAuditDoc.ChangeID.Token
cso.SetResumeAfter(bson.D{{"_data", resumeToken}})
fmt.Println("Previous Resume Token: ", resumeToken)
// Start/Resume collection watch
var cur *mongo.ChangeStream
cur, err = client.Database(dbName).Watch(ctx, pipeline, cso)
if err != nil {
defer cur.Close(ctx)
// Get change stream events
for cur.Next(ctx) {
elem := &ChangeEvent{}
if err := cur.Decode(elem); err != nil {
fmt.Printf("New Change Event: %+v\n", elem)
collectionName := elem.Namespace.Collection
session, _ := client.StartSession()
_ = session.StartTransaction()
err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
_, _ = client.Database(auditDatabase).Collection(collectionName).InsertOne(ctx, elem)
if err != nil {
_ = session.AbortTransaction(sc)
return err
_ = session.CommitTransaction(sc)
return nil
