Skip to content

Instantly share code, notes, and snippets.

View WatchChangeAggregator.java
@Override
public void handleRemoteEvent(RemoteEvent event) {
...
ImmutableSortedMap<DocumentKey, Document> changes = localStore.applyRemoteEvent(event);
emitNewSnapsAndNotifyLocalStore(changes, event);
}
View WatchChange.java
watchStream = datastore.createWatchStream(
new WatchStream.Callback() {
...
@Override
public void onWatchChange(SnapshotVersion snapshotVersion, WatchChange watchChange) {
handleWatchChange(snapshotVersion, watchChange);
}
...
})
View SyncEngine.java
public void writeMutations(List<Mutation> mutations, TaskCompletionSource<Void> userTask) {
// Write mutations locally using LocalStore
LocalWriteResult result = localStore.writeLocally(mutations);
...
// Dispatch changes to EventManager who updates active query listeners
emitNewSnapsAndNotifyLocalStore(result.getChanges(), /*remoteEvent=*/ null);
// Tell RemoteStore there're mutations to be sent to the backend
remoteStore.fillWritePipeline();
}
View EventManager.java
public int addQueryListener(QueryListener queryListener) {
Query query = queryListener.getQuery();
QueryListenersInfo queryInfo = queries.get(query);
boolean firstListen = queryInfo == null;
if (firstListen) {
// QueryListenersInfo is only created if no identical query is registered
queryInfo = new QueryListenersInfo();
queries.put(query, queryInfo);
}
View AndroidConnectivityMonitor.java
public void fillWritePipeline() {
...
while (canAddToWritePipeline()) {
MutationBatch batch = localStore.getNextMutationBatch(lastBatchIdRetrieved);
...
addToWritePipeline(batch);
}
...
}
View DocumentCache.java
// MemoryRemoteDocumentCache.java
while (iterator.hasNext()) {
...
if (!query.matches(doc)) {
continue;
}
result = result.insert(doc.getKey(), doc.clone());
}
View LocalStore.java
// LocalDocumentsView.java
Document getDocument(DocumentKey key) {
// 1. Get all mutations for the given document
List<MutationBatch> batches = mutationQueue.getAllMutationBatchesAffectingDocumentKey(key);
return getDocument(key, batches);
}
private Document getDocument(DocumentKey key, List<MutationBatch> inBatches) {
// 2. Fetch the remote version of the given document from the local cache
View Persistence.java
abstract MutationQueue getMutationQueue(User user);
abstract TargetCache getTargetCache();
abstract RemoteDocumentCache getRemoteDocumentCache();
abstract IndexManager getIndexManager();
abstract BundleCache getBundleCache();
abstract DocumentOverlay getDocumentOverlay(User user);
abstract void runTransaction(String action, Runnable operation);
// other methods..
View MutationQueue.java
class MemoryMutationQueue {
private final List<MutationBatch> queue;
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
class SQLiteMutationQueue {
View ViewSnapshot#getDocuments.java
@NonNull
public List<DocumentSnapshot> getDocuments() {
List<DocumentSnapshot> res = new ArrayList<>(snapshot.getDocuments().size());
for (Document doc : snapshot.getDocuments()) {
res.add(convertDocument(doc));
}
return res;
}