| // New returns a new Watcher that uses logc as its change log collection. | |
| // NOTE(review): presumably logc must be the mgo/txn change log, created | |
| // as a capped collection -- confirm against the implementation. | |
| func New(logc *mgo.Collection) *Watcher | |
| // A Watcher can watch any number of documents and collections for | |
| // changes by monitoring a mgo/txn change log collection. The change | |
| // log collection must have been created as a capped collection for | |
| // this to work correctly. | |
| // | |
| // Use New to obtain a Watcher. | |
| type Watcher struct { | |
| // All fields are private; there is no exported state. | |
| } | |
| // A Note is delivered to a channel registered with Add whenever the | |
| // collection and document id it was registered for are seen as part of | |
| // a transaction that has been successfully applied. | |
| // The document and revision number may not necessarily have changed. | |
| // See the mgo/txn package documentation for details. | |
| type Note struct { | |
| C string // Name of the collection that holds the document | |
| Id interface{} // The document's _id value | |
| Revno int64 // Revision number of the document (may be unchanged) | |
| } | |
| // Add includes coll and id into the documents monitored by w. Any time | |
| // the given document is seen as part of an applied transaction, a | |
| // notification will be delivered to ch. Values sent to the channel must | |
| // necessarily be consumed, or the whole watcher will block. | |
| // | |
| // ch is a send-only channel because the watcher is the sending side; | |
| // callers keep the receiving end and may pass a plain chan Note. | |
| func (w *Watcher) Add(coll string, id interface{}, ch chan<- Note) {} | |
| // Remove prevents w from sending further notifications about coll and id | |
| // to the given channel. Other channels may still receive notifications | |
| // about this document. | |
| // | |
| // ch has the same send-only type as in Add, so the channel value that | |
| // was registered there can be passed here unchanged. | |
| func (w *Watcher) Remove(coll string, id interface{}, ch chan<- Note) {} | |
| // loop periodically queries the change log collection for new entries | |
| // and delivers the monitored documents to the respective channels. | |
| func (w *Watcher) loop() { | |
| // The loop is based on this query, which returns the newest entries | |
| // first: | |
| // | |
| // iter := logc.Find(nil).Batch(100).Sort("-$natural").Iter() | |
| // | |
| // It works with the following high-level steps: | |
| // | |
| // 1) On the very first iteration of the loop, pick the first item from | |
| // iter as the newest seen id and skip ahead to (6) | |
| // 2) Go over iter while the entry id doesn't match the newest seen id, | |
| // appending entries to a buffer | |
| // 3) Iterate over buffered entries in *reverse* order (oldest first) | |
| // 4) For any entry wanted by one or more channels, deliver a Note for it | |
| // 5) The same select that attempts to deliver must receive requests | |
| // from Add and Remove, and process them (this will potentially | |
| // cancel the delivery) | |
| // 6) Once all changes are done, save the newest seen id (last item in | |
| // the reverse iteration) and get into a second select that includes | |
| // a time.After(rescanDelay) that must also allow for Add and Remove | |
| // requests | |
| // 7) Go to (2) once the delay is done | |
| // | |
| // NOTE(review): if the newest seen id has already been evicted from the | |
| // capped collection, step (2) would never find a match and would buffer | |
| // the whole log -- confirm how this case should be handled. | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment