Skip to content

Instantly share code, notes, and snippets.

@sdebaun
Last active March 3, 2016 13:23
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save sdebaun/1be10522b37712d6c626 to your computer and use it in GitHub Desktop.
/*
I have a queue that clients submit tasks to, this backend worker consumes them
firebaseDriver: provides a function that generates a one-element observable of a firebase location
queueConsumerDriver: provides a source of all incoming requests, consumes a sink that takes responses
knows via the uid in the response what channel to send it back on
*/
import {makeFirebaseDriver, makeQueueConsumerDriver} from 'cyclic-fire'
const main = sources => {
const peripheralInfoNeededforRequest$ = sources.queue$
.flatMap(({fooKey}) => sources.firebase('Foo',fooKey))
const infoForOtherRequest$ = sources.queue$
.flatMap(({barKey}) => sources.firebase('Bar',barKey))
// use switchPath on req.domain to route to handler?
// similar to routing with cyclic-router
// handlers can switchPath on other req keys to break it down further
const aResponses$ = sources.queue$
.filter(({domain}) => domain === 'ThingA')
.zip(peripheralInfoNeededforRequest$)
.map((req:{uid},peripheral) => {uid, my:'response',something:peripheral.something})
const bResponses$ = sources.queue$
.filter(({domain}) => domain === 'ThingB')
.zip(infoForOtherRequest$)
.map((req:{uid},info) => {uid, my:'response',somethingElse:info.somethingElse})
return {
queue$: Observable.merge(aResponses$,bResponses$)
}
}
fb = new Firebase('...')
run(main, {
queue$: makeQueueConsumerDriver(fb.child('!queue')), // where reqs and res go
firebase: makeFirebaseDriver(fb), // rest of the data
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment