Skip to content

Instantly share code, notes, and snippets.

@kwv
Last active March 15, 2018 02:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kwv/1320ca3ec590a8117262c675afa787a0 to your computer and use it in GitHub Desktop.
Save kwv/1320ca3ec590a8117262c675afa787a0 to your computer and use it in GitHub Desktop.
RxJS CouchDB sync
// https://forum.ionicframework.com/t/ionic-2-best-way-to-use-observables-and-local-storage-couchdb-with-custom-server-db/51133/3
/**
 * Sets up the two streams exposed by this repository and opens the local
 * database named by RECORD_DB_NAME.
 */
constructor() {
  // Stream of records pushed through _pendingSyncObserver. The subscribe
  // function only captures the subscriber so other members can call next()
  // on it later; share() multicasts the single underlying subscription.
  const source$ = new Observable((subscriber) => this._pendingSyncObserver = subscriber);
  this.pendingSync$ = source$.share();

  // Subject carrying the pending-record list; it starts out as undefined
  // until notifyObservers pushes the first value.
  this.recordStoreObserver = new BehaviorSubject(undefined);

  this.db = new pouch(RECORD_DB_NAME);
}
...
/** Pushes the current `pending` array to everyone subscribed to recordStoreObserver. */
private notifyObservers = () => {
  const snapshot = this.pending;
  this.recordStoreObserver.next(snapshot);
}
//Helper to update array when a record is changed/inserted
/**
 * Applies a changed or newly inserted document to the in-memory `pending`
 * list, then fans the change out:
 *  - forwards it to the pending-sync subscriber when it is ready to submit,
 *  - schedules its deletion when it is in a final state,
 *  - re-publishes the whole list via notifyObservers().
 *
 * @param newDoc the document as it now exists; matched against `pending` by `_id`
 */
private onUpdatedOrInserted = (newDoc) => {
  // `search` is defined elsewhere; presumably it returns the index at which
  // the id is (or would be) stored — TODO confirm. Note that inserts below
  // always go to the front of the list, not to that index.
  const position = this.search(this.pending, newDoc._id);
  const existing = this.pending[position];
  const isUpdate = existing && existing._id === newDoc._id;

  if (!isUpdate) {
    this.pending.unshift(newDoc);
  } else {
    this.pending[position] = newDoc;
  }

  // The subscriber only exists once something has subscribed to pendingSync$.
  const submittable = RecordHelper.readyToSubmit(newDoc);
  if (submittable && this._pendingSyncObserver) {
    this._pendingSyncObserver.next(newDoc);
  }

  if (RecordHelper.isFinalState(newDoc)) {
    // Deletion is deferred by the period RecordHelper reports for this doc.
    const delay = RecordHelper.getPeriodForDoc(newDoc);
    setTimeout(() => {
      this.delete(newDoc);
    }, delay);
  }

  this.notifyObservers();
}
/**
 * Wires network and login state together and calls startWork() every time
 * the combined "connected AND logged in" flag changes.
 *
 * Note: despite the `$` suffix, readyForWork$ ends up holding the
 * Subscription returned by subscribe(), not an Observable.
 */
constructor(private networkMonitor: NetworkMonitor, private userData: UserData,
  private repo: RecordLocalRepository) {
  // true while the network monitor reports CONNECTED
  const online$ = this.networkMonitor.subject
    .distinctUntilChanged()
    .map((state) => state == NetworkStates.CONNECTED);

  // true while the user is LOGGED_IN
  const loggedIn$ = this.userData.subject
    .distinctUntilChanged()
    .map((state) => state == LoginStates.LOGGED_IN);

  const onReadyChanged = (isReady) => {
    this.startWork(isReady);
  };
  const onError = (error) => {
    console.log("observable error " + error);
  };

  // Records may only be sent while both flags are true; distinctUntilChanged
  // keeps startWork from being called again with the same value.
  this.readyForWork$ = Observable.combineLatest(
    loggedIn$,
    online$,
    (isLoggedIn, isOnline) => isLoggedIn && isOnline
  )
    .distinctUntilChanged()
    .do((res) => console.log("readyforwork:::" + res))
    .subscribe(onReadyChanged, onError);
}
// if ready, send changes when they happen
// else unsubscribe from the observer
/**
 * Starts or stops forwarding pending records to doSync().
 *
 * @param ready true to subscribe to the repository's pendingSync$ stream and
 *   run an immediate sync; false to drop the current subscription, if any.
 */
private startWork = (ready: boolean) => {
  if (!ready) {
    if (this.workWatcher) {
      this.workWatcher.unsubscribe();
    }
    return;
  }

  //!TODO test this -- should buffer any records into 10 second chunks
  const batches$ = this.repo.pendingSync$
    .bufferTime(10000)
    // skip windows that produced nothing
    .filter((batch) => batch !== undefined && batch.length > 0);

  this.workWatcher = batches$.subscribe(
    (batch) => this.doSync(batch),
    (error) => {
      console.log("observable error " + error);
    }
  );

  // Also run one sync right away, with no explicit items.
  this.doSync();
}
//Collects the docs that are ready to submit (findWork) and sends them to the repo API (doSync)
/**
 * Resolves the list of records that should be submitted.
 *
 * @param items optional record, or array of records, handed in by the caller
 *   (doSync passes through the buffered batches it receives). When omitted
 *   or falsy, the repository is queried for every doc that
 *   RecordHelper.readyToSubmit accepts.
 * @returns a promise of an array of records when `items` is given; otherwise
 *   whatever this.repo.fetchDocs returns (presumably a promise of the
 *   emitted docs — TODO confirm against the repository).
 */
private findWork = (items?) => {
  if (items) {
    // Use Array.isArray rather than probing `items.length`: the old
    // `new Array(items)` fallback wrapped an empty batch as `[[]]` (a
    // one-element list that would then be posted) and turned a numeric
    // argument into a sparse array of that length.
    return Promise.resolve(Array.isArray(items) ? items : [items]);
  }

  return this.repo.fetchDocs((doc, emit) => {
    if (RecordHelper.readyToSubmit(doc)) {
      emit(doc);
    }
  });
}
/**
 * Looks up the records to submit via findWork() and, when there is at least
 * one, builds a POST request carrying them as a JSON body.
 *
 * @param items optional record(s) to send; forwarded unchanged to findWork().
 *   When omitted, findWork() decides what is pending.
 */
private doSync = (items?) => {
  this.findWork(items)
    .then((records) => {
      if (records.length > 0) {
        /*
        removed but something like this
        */
        const requestoptions = new RequestOptions({
          method: RequestMethod.Post,
          url: url,
          headers: headers,
          body: JSON.stringify(records)
        });
        // NOTE(review): nothing visible here subscribes to the returned
        // observable; if this.http is Angular's Http the request is only
        // sent once it is subscribed — confirm in the elided code.
        return this.http.request(new Request(requestoptions))
          .timeout(10000, new Error("Unable to Connect"))
          .map((res) => res.json());
      }
    })
    // Without a rejection handler, a failing lookup or an exception while
    // building the request surfaces as an unhandled promise rejection.
    .catch((error) => {
      console.log("sync error " + error);
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment