Skip to content

Instantly share code, notes, and snippets.

@mfellner
Created December 4, 2016 23:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mfellner/22256ca4077b56f932f4c3ca9c1a6005 to your computer and use it in GitHub Desktop.
Save mfellner/22256ca4077b56f932f4c3ca9c1a6005 to your computer and use it in GitHub Desktop.
koa-router-rx.js
// @flow
import Koa from 'koa'
import Router from 'koa-router'
import { Observable, Subject } from 'rxjs/Rx'
// An epic is a stream transformer: it receives an Observable and returns a
// new Observable derived from it.
type Epic = (observable: Observable) => Observable
const epic1 = observable =>
observable.map(({ctx}) => {
ctx.body = 'test'
return ctx.origin + ':epic1'
})
/**
 * Epic that tags every value emitted by the source with the ':epic2' suffix.
 */
const epic2 = observable => {
  const appendTag = value => value + ':epic2'
  return observable.map(appendTag)
}
/**
 * Epic that always fails: every value emitted by the source is replaced by an
 * observable that errors with `new Error('fail!')`.
 */
const epicFail = observable => {
  const failure = Observable.throw(new Error('fail!'))
  return observable.mergeMapTo(failure)
}
/**
 * Builds an epic that feeds the same source observable to every given epic
 * and joins their outputs with `Observable.combineLatest`.
 */
function combineEpics(...epics) {
  return observable => {
    const streams = epics.map(epic => epic(observable))
    return Observable.combineLatest(...streams)
  }
}
function foldEpics(...epics) {
return observable => epics.reduce((obs, epic) => epic(obs), observable)
}
class RxRouter {
router: Router;
registry: {
[path: string]: {
[method: string]: {
request: Subject;
response: Subject;
};
};
};
constructor() {
this.router = new Router()
this.registry = {}
}
register(name: string, path: string, method: string) {
const request = new Subject()
const response = new Subject()
this.router.register(path, [method], (ctx, next) => {
const p = response.concatAll().first().toPromise()
request.next(Observable.of({ctx, next}))
return p
}, {
name
})
this.registry[path] = Object.assign({}, this.registry[path] || {}, {[method]: {request, response}})
}
subscribe(path: string, method: string, callback: (o: Observable, r: Subject) => any) {
const {request, response} = this.registry[path][method]
request.subscribe(observable => callback(observable, response))
}
subscribeResponse(path: string, method: string, callback: (o: Observable, r: Subject) => any) {
const {request, response} = this.registry[path][method]
request.subscribe(observable => response.next(callback(observable)))
}
subscribeEpicResponse(path: string, method: string, epic: Epic) {
const {request, response} = this.registry[path][method]
request.subscribe(observable => response.next(epic(observable)))
}
routes() {
return this.router.routes()
}
}
// const proxyRouter = new Proxy(new Router(), {
// get: function(target, property, receiver) {
// }
// })
// Demo wiring: one GET /test route answered by epic1 -> epic2.
const PORT = 3333
const app = new Koa()
const router = new RxRouter()
// Append epicFail to the fold to see the route fail instead of answering.
const epic = foldEpics(epic1, epic2)//, epicFail)
router.register('test', '/test', 'GET')
// Equivalent ways of attaching the same epic:
// router.subscribe('/test', 'GET', (observable, resp) => resp.next(epic(observable)))
// router.subscribeResponse('/test', 'GET', observable => epic(observable))
router.subscribeEpicResponse('/test', 'GET', epic)
// Extra request subscribers that only log; they never push a response.
router.subscribe('/test', 'GET', () => console.log('on request?'))
router.subscribe('/test', 'GET', () => console.log('on request!'))
// router.subscribeResponse(() => Observable.of('resp!'))
app.use(router.routes())
// Log from the listen callback so the message is printed only once the
// server is actually bound, not before (or in spite of) a failed bind.
app.listen(PORT, () => {
  console.log('listening on http://localhost:' + PORT)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment