Skip to content

Instantly share code, notes, and snippets.



Last active Oct 29, 2016
What would you like to do?
Simple Order-Saga example in RxJS (without Saga-Log)
const Rx = require('rx');
// Simulated booking transactions. Each one wraps a confirmation record
// (tagged with the saga id) in an observable and passes it through
// `.delay(1000)`.
const bookCar = (id) => {
  const confirmation = { sagaId: id, 'Car A': true };
  return Rx.Observable.return(confirmation).delay(1000);
};
const bookHotel = (id) => {
  const confirmation = { sagaId: id, 'Hotel B': true };
  return Rx.Observable.return(confirmation).delay(1000);
};
const bookFlight = (id) => {
  const confirmation = { sagaId: id, 'Flight C': true };
  return Rx.Observable.return(confirmation).delay(1000);
};
// simulate error
//const bookFlight = id => Rx.Observable.throw({sagaId: id, 'Flight C': true}).delay(1000);
// Compensating actions for the simulated bookings. Each one emits a record
// for the given saga id with the booking flag set to false, passed through
// `.delay(1000)`. The original author notes these are meant to be idempotent.
const compensateBookCar = (id) => {
  const reversal = { sagaId: id, 'Car A': false };
  return Rx.Observable.return(reversal).delay(1000);
};
const compensateBookHotel = (id) => {
  const reversal = { sagaId: id, 'Hotel B': false };
  return Rx.Observable.return(reversal).delay(1000);
};
const compensateBookFlight = (id) => {
  const reversal = { sagaId: id, 'Flight C': false };
  return Rx.Observable.return(reversal).delay(1000);
};
// Start the saga for 'Booking 1' and log its progress.
//
// Rx observables are lazy: side-effect operators such as `do`, `doOnError`
// and `doOnCompleted` only run once something subscribes. The handlers are
// therefore passed straight to `subscribe()`, which both starts the stream
// and keeps a failure from being rethrown as an unhandled error.
const sagaStream = Rx.Observable.return({ metadata: 'foo bar' });
startSaga('Booking 1', sagaStream).subscribe(
  (state) => console.log(`Saga state: ${JSON.stringify(state)}`),
  (error) => {
    // The simulated failure throws a plain object with no `message`, so fall
    // back to serialising the error instead of printing "undefined".
    const detail = error && error.message ? error.message : JSON.stringify(error);
    console.error(`Saga error: ${detail}`);
  },
  () => console.log('Saga completed')
);
function startSaga(sagaId, initialStream) {
const stepStream = Rx.Observable.concat([bookCar(sagaId), bookHotel(sagaId), bookFlight(sagaId)]);
const compensationStream = Rx.Observable.merge([compensateBookCar(sagaId), compensateBookHotel(sagaId), compensateBookFlight(sagaId)]);
return initialStream
.map(x => Object.assign(x, {sagaId: sagaId}))
.scan((state, step) => Object.assign(state, step));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment