|
#!/usr/bin/env babel-node |
|
|
|
import Rx from 'rx' |
|
import log from 'better-log' |
|
import fetch from 'node-fetch' |
|
|
|
log.setConfig({depth: null}) |
|
|
|
// fake server |
|
|
|
let responsesIndex = 0 |
|
const responses = [ |
|
{ |
|
users: [ |
|
{id: '1', email: 'user1@gmail.com'}, |
|
{id: '2', email: 'user2@gmail.com'} |
|
] |
|
}, |
|
{ |
|
setEmail: {id: '2', email: 'user2@yahoo.com'} |
|
}, |
|
{ |
|
createUser: {id: '3', email: 'user3@gmail.com'} |
|
}, |
|
{ |
|
users: [ |
|
{id: '1', email: 'user1@gmail.com'}, |
|
{id: '2', email: 'user2@yahoo.com'}, |
|
{id: '3', email: 'user3@gmail.com'} |
|
] |
|
}, |
|
{ |
|
deleteUser: true |
|
} |
|
] |
|
|
|
function nextResponse(query) { |
|
return responses[responsesIndex++] |
|
} |
|
|
|
// helpers |
|
|
|
function traversal(tree, cb) { |
|
if (!tree) { |
|
return |
|
} |
|
|
|
if (Array.isArray(tree)) { |
|
tree = tree.map(t => traversal(t, cb)) |
|
|
|
} else if (typeof tree === 'object') { |
|
tree = Object.keys(tree) |
|
.reduce((acc, key) => { |
|
const t = tree[key] |
|
|
|
if (typeof t === 'object') { |
|
acc[key] = traversal(t, cb) |
|
} else { |
|
acc[key] = t |
|
} |
|
|
|
return acc |
|
}, {}) |
|
} |
|
|
|
tree = cb(tree) || tree |
|
|
|
return tree |
|
} |
|
|
|
function createArrayObservable(array) { |
|
if (array.length === 0) { |
|
return new Rx.Observable.just(array) |
|
} |
|
|
|
return Rx.Observable.combineLatest( |
|
...array.map(value => { |
|
if (value instanceof Rx.Observable) { |
|
return value |
|
} else { |
|
return Rx.Observable.just(value) |
|
} |
|
}), |
|
(...array) => array.filter(i => i !== undefined) |
|
) |
|
} |
|
|
|
function mergeObservableWithObject(object$, object) { |
|
const keys = Object.keys(object) |
|
|
|
const observableKeys = keys |
|
.filter(key => object[key] instanceof Rx.Observable) |
|
|
|
const staticKeys = keys |
|
.filter(key => observableKeys.indexOf(key) < 0) |
|
|
|
if (object$) { |
|
const prev = object$.getValue() |
|
const next = staticKeys |
|
.reduce((acc, key) => ( |
|
acc[key] = object[key], |
|
acc |
|
), {}) |
|
|
|
let equals = staticKeys |
|
.every(key => next[key] === prev[key]) |
|
|
|
if (!equals) { |
|
object$.onNext({...prev, ...next}) |
|
} |
|
} else { |
|
object$ = new Rx.BehaviorSubject( |
|
staticKeys |
|
.reduce((acc, key) => ( |
|
acc[key] = object[key], |
|
acc |
|
), {}) |
|
) |
|
} |
|
|
|
const combinedObject$ = Rx.Observable.combineLatest( |
|
object$, |
|
...observableKeys.map(key => object[key]), |
|
(object, ...array) => { |
|
if (!object) { |
|
return |
|
} |
|
|
|
return observableKeys |
|
.reduce((acc, key, index) => ( |
|
acc[key] = array[index], |
|
acc |
|
), object) |
|
} |
|
) |
|
|
|
return {object$, combinedObject$} |
|
} |
|
|
|
// relay |
|
|
|
const Relay = { |
|
_knowledge: {}, |
|
_querySubscribtions: [], |
|
|
|
async fetch(query) { |
|
const response = await this._runQuery(query) |
|
|
|
const observable = this._handleQueryResponse(response) |
|
|
|
const subscribtion = new QuerySubscribtion(query, observable, () => { |
|
this._querySubscribtions = this._querySubscribtions |
|
.filter(s => s !== subscribtion) |
|
}) |
|
|
|
this._querySubscribtions.push(subscribtion) |
|
|
|
return subscribtion |
|
}, |
|
|
|
async update(mutation) { |
|
if (mutation instanceof UpdateMutation) { |
|
await this._handleUpdateMutation(mutation) |
|
} else if (mutation instanceof DeleteMutation) { |
|
await this._handleDeleteMutation(mutation) |
|
} else if (mutation instanceof CreateMutation) { |
|
await this._handleCreateMutation(mutation) |
|
} else { |
|
throw new Error() |
|
} |
|
|
|
if (!mutation.getError()) { |
|
await* this._querySubscribtions |
|
.filter(s => mutation.isAffected(s.getQuery())) |
|
.map(s => this._reloadSubscribtion(s)) |
|
} |
|
|
|
return mutation |
|
}, |
|
|
|
async _runQuery(body) { |
|
await new Promise(resolve => setTimeout(resolve, 100)) |
|
|
|
return nextResponse(body) |
|
}, |
|
|
|
_handleQueryResponse(data) { |
|
const observerTree = traversal(data, obj => { |
|
if (Array.isArray(obj)) { |
|
return createArrayObservable(obj) |
|
|
|
} else if (obj.id) { |
|
const {object$, combinedObject$} = mergeObservableWithObject(this._knowledge[obj.id], obj) |
|
|
|
this._knowledge[obj.id] = object$ |
|
|
|
return combinedObject$ |
|
|
|
} else { |
|
return mergeObservableWithObject(undefined, obj).combinedObject$ |
|
} |
|
}) |
|
|
|
return observerTree |
|
}, |
|
|
|
async _handleUpdateMutation(mutation) { |
|
const optimisticReferences = mutation.getOptimisticReferences() |
|
const previous = [] |
|
|
|
for (const obj of optimisticReferences) { |
|
if (this._knowledge[obj.id]) { |
|
previous.push(this._knowledge[obj.id].getValue()) |
|
mergeObservableWithObject(this._knowledge[obj.id], obj) |
|
} |
|
} |
|
|
|
try { |
|
const response = await this._runQuery(mutation.getQuery()) |
|
|
|
mutation.setResponse(response) |
|
} catch (error) { |
|
for (const obj of previous) { |
|
this._knowledge[obj.id].onNext(obj) |
|
} |
|
|
|
mutation.setError(error) |
|
|
|
return |
|
} |
|
|
|
const references = mutation.getReferences() |
|
|
|
for (const obj of references) { |
|
const {object$, combinedObject$} = mergeObservableWithObject(this._knowledge[obj.id], obj) |
|
|
|
this._knowledge[obj.id] = object$ |
|
} |
|
}, |
|
|
|
async _handleDeleteMutation(mutation) { |
|
const optimisticReferences = mutation.getOptimisticReferences() |
|
const previous = [] |
|
|
|
for (const id of optimisticReferences) { |
|
if (this._knowledge[id]) { |
|
previous.push(this._knowledge[id].getValue()) |
|
this._knowledge[id].onNext() |
|
} |
|
} |
|
|
|
try { |
|
const response = await this._runQuery(mutation.getQuery()) |
|
|
|
mutation.setResponse(response) |
|
} catch (error) { |
|
for (const obj of previous) { |
|
this._knowledge[obj.id].onNext(obj) |
|
} |
|
|
|
mutation.setError(error) |
|
|
|
return |
|
} |
|
|
|
const references = mutation.getReferences() |
|
|
|
for (const id of references) { |
|
if (this._knowledge[id]) { |
|
this._knowledge[id].onCompleted() |
|
} |
|
} |
|
}, |
|
|
|
async _handleCreateMutation(mutation) { |
|
try { |
|
const response = await this._runQuery(mutation.getQuery()) |
|
|
|
mutation.setResponse(response) |
|
} catch (error) { |
|
mutation.setError(error) |
|
return |
|
} |
|
|
|
const references = mutation.getReferences() |
|
|
|
for (const obj of references) { |
|
const {object$, combinedObject$} = mergeObservableWithObject(this._knowledge[obj.id], obj) |
|
|
|
this._knowledge[obj.id] = object$ |
|
} |
|
}, |
|
|
|
async _reloadSubscribtion(s) { |
|
const response = await this._runQuery(s.getQuery()) |
|
const observable = this._handleQueryResponse(response) |
|
|
|
s.setObservable(observable) |
|
} |
|
} |
|
|
|
class QuerySubscribtion { |
|
constructor(query, observable, unpublish) { |
|
this._query = query |
|
this._unpublish = unpublish |
|
|
|
this._work = true |
|
this._observable$ = new Rx.BehaviorSubject(observable) |
|
this._observable = this._observable$ |
|
.takeWhile(() => this._work) |
|
.flatMapLatest(observable$ => observable$) |
|
} |
|
|
|
getQuery() { |
|
return this._query |
|
} |
|
|
|
setObservable(observable) { |
|
this._observable$.onNext(observable) |
|
} |
|
|
|
getObservable() { |
|
return this._observable |
|
} |
|
|
|
unsubscribe() { |
|
this._work = false |
|
this._unpublish() |
|
} |
|
} |
|
|
|
class Mutation { |
|
constructor() { |
|
this._response = undefined |
|
this._error = undefined |
|
} |
|
|
|
setResponse(response) { |
|
this._response = response |
|
} |
|
|
|
getResponse() { |
|
return this._response |
|
} |
|
|
|
setError(error) { |
|
this._error = error |
|
} |
|
|
|
getError() { |
|
return this._error |
|
} |
|
|
|
getReferences() { |
|
throw new Error('Not implemented') |
|
} |
|
|
|
isAffected() { |
|
return false |
|
} |
|
} |
|
|
|
class UpdateMutation extends Mutation {} |
|
|
|
class DeleteMutation extends Mutation {} |
|
|
|
class CreateMutation extends Mutation { |
|
isAffected() { |
|
return true |
|
} |
|
} |
|
|
|
// test |
|
|
|
class SetEmailMutation extends UpdateMutation { |
|
constructor({id, email}) { |
|
super() |
|
this.data = {id, email} |
|
} |
|
|
|
getQuery() { |
|
return ` |
|
mutation SetEmail { |
|
setEmail(userId: "${this.data.id}", email: "${this.data.email}") { |
|
id |
|
email |
|
} |
|
} |
|
` |
|
} |
|
|
|
getOptimisticReferences() { |
|
return [ |
|
this.data |
|
] |
|
} |
|
|
|
getReferences() { |
|
return [ |
|
this.getResponse().setEmail |
|
] |
|
} |
|
} |
|
|
|
class DeleteUserMutation extends DeleteMutation { |
|
constructor({id}) { |
|
super() |
|
this.data = {id} |
|
} |
|
|
|
getQuery() { |
|
return ` |
|
mutation DeleteUser { |
|
deleteUser(id: "${this.data.id}") |
|
} |
|
` |
|
} |
|
|
|
getOptimisticReferences() { |
|
return [ |
|
this.data.id |
|
] |
|
} |
|
|
|
getReferences() { |
|
return [ |
|
this.data.id |
|
] |
|
} |
|
} |
|
|
|
class CreateUserMutation extends CreateMutation { |
|
constructor({email}) { |
|
super() |
|
this.data = {email} |
|
} |
|
|
|
getQuery() { |
|
return ` |
|
mutation CreateUser { |
|
createUser(email: "${this.data.email}") { |
|
id |
|
email |
|
} |
|
} |
|
` |
|
} |
|
|
|
getReferences() { |
|
return [ |
|
this.getResponse().createUser |
|
] |
|
} |
|
} |
|
|
|
async function main() { |
|
try { |
|
const q1 = await Relay.fetch(` |
|
{ |
|
users { |
|
id |
|
email |
|
} |
|
} |
|
`) |
|
|
|
q1.getObservable().subscribe(r => log('q1', r)) |
|
|
|
log('\n===\n') |
|
|
|
const m1 = await Relay.update(new SetEmailMutation({id: '2', email: 'user2@yahoo.com'})) |
|
log('m1', m1.getResponse(), m1.getError()) |
|
|
|
log('\n===\n') |
|
|
|
const m2 = await Relay.update(new CreateUserMutation({email: 'user3@gmail.com'})) |
|
log('m2', m2.getResponse(), m2.getError()) |
|
|
|
log('\n===\n') |
|
|
|
const m3 = await Relay.update(new DeleteUserMutation({id: '1'})) |
|
log('m3', m3.getResponse(), m3.getError()) |
|
|
|
} catch (e) { |
|
console.log(e.stack) |
|
} |
|
} |
|
|
|
main() |