Skip to content

Instantly share code, notes, and snippets.

@dmitryshelomanov
Created March 28, 2020 15:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dmitryshelomanov/e0425765c9c265f27b1f3f209a4deada to your computer and use it in GitHub Desktop.
Save dmitryshelomanov/e0425765c9c265f27b1f3f209a4deada to your computer and use it in GitHub Desktop.
// @flow
import { of, Subject, defer, Observable } from "rxjs"
import { map, catchError, flatMap, switchMap } from "rxjs/operators"
import {
type Event,
type Store,
createEvent,
createStore,
merge,
forward,
} from "effector"
import { mapTo } from "./operators"
/**
 * Builds an effect-like group of effector units around a promise-returning
 * `api`, executing every call through an RxJS pipeline.
 *
 * Triggering `run(params)` pushes `params` into an internal Subject. Each
 * value is turned into an inner observable (`defer(() => api(params))`, passed
 * through `mapper`) whose emissions are reported through `doneData` and whose
 * errors are reported through `failData`.
 *
 * @param api       Called with the params given to `run`; must return a promise.
 * @param mapper    Optional transformation of the per-call observable (for
 *                  example to add retries or delays). Defaults to identity.
 * @param takeEvery When true (default) calls are flattened with `flatMap`
 *                  (RxJS's alias of `mergeMap`), so overlapping calls all run
 *                  to completion. When false `switchMap` is used, so a new
 *                  `run` unsubscribes from the previous call's observable and
 *                  that call reports neither `doneData` nor `failData`.
 * @returns An object with the units:
 *          `run` (trigger), `doneData` (results), `failData` (errors),
 *          `finally` (`{ status: "done", result }` or `{ status: "fail", error }`),
 *          `$pending` (true while at least one call is outstanding) and
 *          `$error` (last error; cleared by `run` and by `doneData`).
 */
export function createRxEffect<Params, Done, E = Error>(
  api: (Params) => Promise<Done>,
  mapper?: (Observable<Done>) => Observable<Done> = (v) => v,
  takeEvery?: boolean = true,
) {
  const run: Event<Params> = createEvent()
  const doneData: Event<Done> = createEvent()
  const failData: Event<E> = createEvent()
  const finallyEvent: Event<
    { status: "done", result: Done } | { status: "fail", error: E },
  > = createEvent()

  // Fires whenever a call settles either way.
  // NOTE(review): `mapTo` comes from ./operators and is not visible here;
  // presumably it derives a payload-less event — confirm against that module.
  const settled = merge([mapTo(doneData), mapTo(failData)])

  // Count outstanding calls instead of keeping a bare boolean: with
  // `takeEvery` several calls can overlap, and the pending flag must stay true
  // until the last of them settles, not just the first.
  // With switchMap a new `run` cancels the previous call (which will never
  // settle), so at most one call is outstanding and the counter is pinned to 1.
  // The decrement is clamped at 0 because a `mapper` may emit more than one
  // value for a single call.
  const $inFlight: Store<number> = createStore(0)
    .on(run, (count) => (takeEvery ? count + 1 : 1))
    .on(settled, (count) => Math.max(count - 1, 0))
  const $pending: Store<boolean> = $inFlight.map((count) => count > 0)

  const $error: Store<?E> = createStore(null)
  $error.reset(run, doneData).on(failData, (_, error) => error)

  const observer = new Subject<Params>()
  const wrapper = takeEvery ? flatMap : switchMap
  const source$ = observer.pipe(
    wrapper((params) =>
      // The outer `defer` makes a synchronous throw inside `mapper` surface as
      // an error of the inner observable, where `catchError` below converts it
      // into a failure result. Without it the throw would error the outer
      // stream and permanently stop the pipeline.
      defer(() => mapper(defer(() => api(params)))).pipe(
        map((data) => ({ isSuccess: true, data })),
        // Errors are converted into values so the outer stream stays alive
        // for subsequent `run` calls.
        catchError((error) => of({ isSuccess: false, error })),
      ),
    ),
  )

  // Bridge effector -> RxJS: every `run` becomes a Subject emission.
  run.watch((params) => observer.next(params))

  // Bridge RxJS -> effector: route each outcome to the matching event.
  source$.subscribe((outcome) => {
    if (outcome.isSuccess) {
      doneData(outcome.data)
    } else {
      failData(outcome.error)
    }
  })

  forward({
    from: doneData,
    to: finallyEvent.prepend((result) => ({ status: "done", result })),
  })
  forward({
    from: failData,
    to: finallyEvent.prepend((error) => ({ status: "fail", error })),
  })

  return {
    doneData,
    failData,
    finally: finallyEvent,
    run,
    $pending,
    $error,
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment