Skip to content

Instantly share code, notes, and snippets.

@prestonvasquez
Created September 15, 2023 21:51
Show Gist options
  • Save prestonvasquez/30f15e9f84c76a63e6bf818663a18309 to your computer and use it in GitHub Desktop.
Save prestonvasquez/30f15e9f84c76a63e6bf818663a18309 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// main is a minimal reproducer for closing a MongoDB change stream from one
// goroutine while another goroutine is iterating it with Next.
//
// It connects to a local server, opens a change stream on test.coll, inserts
// one document, and then starts two goroutines: one loops on Next and the
// other calls Close. The author's inline notes mark the concurrent Close as
// the unsupported variant and a deferred Close as the supported one.
func main() {
ctx := context.Background()
opts := options.Client().ApplyURI("mongodb://localhost:27017")
client, err := mongo.Connect(ctx, opts)
if err != nil {
panic(err)
}
// NOTE(review): client is never disconnected in this function.
collection := client.Database("test").Collection("coll")
// Single $match stage: only events whose fullDocument has env == "x" and
// requestUUID == "y" pass the filter.
pipeline := mongo.Pipeline{bson.D{{
Key: "$match",
Value: bson.D{
{Key: "fullDocument.env", Value: "x"},
{Key: "fullDocument.requestUUID", Value: "y"},
}},
}}
// Request the looked-up full document on updates and set the max await time
// to 5 seconds.
option := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(5 * time.Second)
changeStream, err := collection.Watch(ctx, pipeline, option)
if err != nil {
panic(err)
}
// Close the stream when main returns; a Close error is escalated to a panic.
defer func() {
if err := changeStream.Close(ctx); err != nil {
panic(err)
}
}()
// Insert one document after the stream is open. It carries only "num", so it
// presumably does not match the $match stage above — TODO confirm.
doc := bson.D{{Key: "num", Value: 1}}
_, err = collection.InsertOne(ctx, doc)
if err != nil {
panic(err)
}
// Wait for both goroutines below before returning.
wg := sync.WaitGroup{}
wg.Add(2)
// Goroutine 1: iterate the stream, discarding every event, until Next
// returns false.
go func() {
defer wg.Done()
for changeStream.Next(context.Background()) {
}
}()
// Can't do this: Close runs in its own goroutine, concurrently with the Next
// loop above. Its returned error is discarded.
// NOTE(review): the driver documents ChangeStream as not goroutine safe —
// confirm against the driver version in use.
go func() {
defer wg.Done()
changeStream.Close(ctx)
}()
// Must do this: close from the goroutine that owns the stream. Note that a
// deferred Close is already registered earlier in this function, so with the
// goroutine above the stream is closed twice.
//defer changeStream.Close(ctx)
wg.Wait()
}
@xkeyideal
Copy link

// Test_MongoWatch is a manual integration test against a MongoDB server at
// localhost:27017. It opens a change stream on test.test_coll, inserts
// documents in the background, reads six change events, stops the watcher,
// and then blocks until SIGINT or SIGTERM is delivered.
//
// The watch loop can be broken at any time through the stop function that
// watchFn returns: it cancels the context handed to Next, waits for the
// watcher goroutine to exit, and only then closes the stream. The change
// stream is therefore never touched by two goroutines at once.
func Test_MongoWatch(t *testing.T) {
	clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
	clientOptions.ReadPreference = readpref.PrimaryPreferred()
	clientOptions.SetMaxPoolSize(12)
	clientOptions.SetMaxConnIdleTime(10 * time.Second)
	clientOptions.SetConnectTimeout(10 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), constant.MongodbOpTimeout)
	defer cancel()
	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		t.Fatalf("connect: %v", err)
	}
	// Registered first so it runs last, after every goroutine using the
	// client has been stopped by the defers registered below.
	defer func() {
		dctx, dcancel := context.WithTimeout(context.Background(), constant.MongodbOpTimeout)
		defer dcancel()
		if err := client.Disconnect(dctx); err != nil {
			t.Errorf("disconnect: %v", err)
		}
	}()

	ctx2, cancel2 := context.WithTimeout(context.Background(), constant.MongodbOpTimeout)
	defer cancel2()
	if err := client.Ping(ctx2, readpref.PrimaryPreferred()); err != nil {
		t.Fatalf("ping: %v", err)
	}

	collection := client.Database("test").Collection("test_coll")

	type runnerChangeEvent struct {
		OperationType string              `bson:"operationType"`
		ClusterTime   primitive.Timestamp `bson:"clusterTime"`
		FullDocument  any                 `bson:"fullDocument"`
	}

	// watchFn opens a change stream and starts a goroutine that forwards
	// insert and update events to ch. The goroutine is the only sender and
	// closes ch when it exits. The returned stop function interrupts the
	// loop, waits for the goroutine to finish and closes the stream.
	watchFn := func(ctx context.Context, ch chan<- runnerChangeEvent) (func() error, error) {
		pipeline := mongo.Pipeline{bson.D{{
			Key: "$match",
			Value: bson.D{
				{Key: "fullDocument.env", Value: "dev"},
				//{Key: "fullDocument.requestUUID", Value: "y"},
			}},
		}}

		option := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(5 * time.Second)

		changeStream, err := collection.Watch(ctx, pipeline, option)
		if err != nil {
			return nil, err
		}

		// loopCtx is what makes the blocking Next call interruptible.
		loopCtx, stopLoop := context.WithCancel(ctx)
		done := make(chan struct{})

		go func() {
			defer close(done)
			defer close(ch)

			for changeStream.Next(loopCtx) {
				var ce runnerChangeEvent
				if err := changeStream.Decode(&ce); err != nil {
					log.Printf("decode change event: %v", err)
					continue
				}

				// https://www.mongodb.com/docs/manual/reference/change-events/
				// Messages reported by the SDK can only be insert or update.
				if ce.OperationType != "insert" && ce.OperationType != "update" {
					continue
				}

				// Do not block forever if the consumer has gone away.
				select {
				case ch <- ce:
				case <-loopCtx.Done():
					log.Println("change stream closed")
					return
				}
			}

			// Next returned false: report the error unless it is the
			// result of our own cancellation.
			if err := changeStream.Err(); err != nil && loopCtx.Err() == nil {
				log.Printf("change stream failed: %v", err)
				return
			}
			log.Println("change stream closed")
		}()

		stop := func() error {
			stopLoop()
			<-done // the goroutine no longer touches changeStream after this

			cctx, ccancel := context.WithTimeout(context.Background(), constant.MongodbOpTimeout)
			defer ccancel()
			return changeStream.Close(cctx)
		}

		return stop, nil
	}

	ch := make(chan runnerChangeEvent, 10)
	stop, err := watchFn(context.Background(), ch)
	if err != nil {
		t.Fatalf("watch: %v", err)
	}

	// Background inserter: one document every two seconds, ten in total.
	insertCtx, stopInserts := context.WithCancel(context.Background())
	insertErr := make(chan error, 1)
	var inserter sync.WaitGroup
	inserter.Add(1)
	defer func() {
		stopInserts()
		inserter.Wait()
	}()

	go func() {
		defer inserter.Done()
		for i := 0; i < 10; i++ {
			doc := bson.D{
				{Key: "env", Value: "dev"},
				{Key: "num", Value: i + 1},
			}
			// Use a goroutine-local err; assigning the outer one is a data race.
			if _, err := collection.InsertOne(insertCtx, doc); err != nil {
				insertErr <- err
				return
			}

			select {
			case <-time.After(2 * time.Second):
			case <-insertCtx.Done():
				return
			}
		}
	}()

	received := 0
recv:
	for received < 6 {
		select {
		case resp, ok := <-ch:
			if !ok {
				t.Errorf("change stream ended after %d events", received)
				break recv
			}
			log.Println(resp.FullDocument)
			received++
		case err := <-insertErr:
			t.Errorf("insert: %v", err)
			break recv
		}
	}

	if err := stop(); err != nil {
		t.Errorf("close change stream: %v", err)
	}
	if t.Failed() {
		return
	}

	// Keep the process alive for manual inspection until it is interrupted.
	// SIGKILL cannot be caught, so it is not registered.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signals)

	log.Println(<-signals)
}

I want to be able to break out of the watch loop at any time. Please review my code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment