Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
juju-core/mstate/watcher
// New returns a new Watcher.
func New(logc *mgo.Collection) *Watcher
// A Watcher can watch any number of documents and collections for
// changes by monitoring a mgo/txn change log collection. The change
// log collection must have been created as a capped collection for
// this to work correctly.
type Watcher struct {
// private only
}
// A Note is sent whenever the given collection and document id were seen
// as part of a transaction that has been successfully applied.
// The document and revision number may not necessarily have changed.
// See the mgo/txn package documentation for details.
type Note struct {
C string // Collection name
Id interface{} // Document _id
Revno int64 // Revision number
}
// Add includes coll and id into the documents monitored by w. Any time
// the given document is seen as part of an applied transaction, a
// notification will be delivered to ch. Values sent to the channel must
// necessarily be consumed, or the whole watcher will block.
func (w *Watcher) Add(coll string, id interface{}, ch <-chan Note) {}
// Remove prevents w from sending further notifications about coll and id
// to the given channel. Other channels may still receive notifications
// about this document.
func (w *Watcher) Remove(coll string, id interface{}, ch <-chan Note) {}
// loop periodically queries the change log collection for new entries
// and delivers the monitored documents to the respective channels.
func (w *Watcher) loop() {
// The loop is based on this query:
//
// iter := logc.Find(nil).Batch(100).Sort("-$natural").Iter()
//
// It works with the following high-level steps:
//
// 1) On the very first iteration of the loop, pick the first item from
// iter as newest seen id and skip until (6)
// 2) Go over iter while the entry id doesn't match the newest seen id,
// append entries to buffer
// 3) Iterate over buffered entries in *reverse* order (oldest first)
// 4) Any entry wanted by one or more channels, deliver a Note for it
// 5) The same select that attempts to deliver must receive requests
// from Add and Remove, and process them (this will potentially
// cancel the delivery)
// 6) Once all changes are done, save the newest seen id (last item in
// the reverse iteration) and get into a second select that includes
// a time.After(rescanDelay) that must also allow for Add and Remove
// requests
// 7) Go to (2) once delay is done
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment