Skip to content

Instantly share code, notes, and snippets.

@nmccready
Last active February 28, 2019 16:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nmccready/8027e3a8e168cad6836e0f53c40340bd to your computer and use it in GitHub Desktop.
Save nmccready/8027e3a8e168cad6836e0f53c40340bd to your computer and use it in GitHub Desktop.
RxJS Operator Simplification / Creation 6.x
export const basic = (subscriber) => (origSource) =>
origSource.lift({
call(dest, source) {
return source.subscribe(subscriber(dest));
}
});
export const through = (cb) =>
basic((dest) => {
return {
next(msg) {
cb(msg, dest);
},
error: dest.error,
complete: dest.complete
};
});
export const bufferThrough = (cb) => {
const buffer = [];
const throughCb = cb(buffer);
// allow closure for handling state w/ buffer
/*
bufferThrough((buffer) => {
let counter = 0 // etc...
return (msg, dest) => {
}
})
*/
return through(throughCb);
};
@nmccready
Copy link
Author

nmccready commented Feb 28, 2019

Reduces something like this ugliness

  const bufferTimelineCounter = () => (origSource) =>
    origSource.lift({
      call(dest, source) {
        const buffer = [];
        return source.subscribe({
          next(msg) {
            buffer.push(msg);
            dest.next(buffer);

            timelineCount += msg.body.data.timeline.length;
            if (timelineCount >= MAX_TAGS_THRESHOLD) {
              dest.complete();
            }
          },
          error: dest.error,
          complete: dest.complete
        });
      }
    });

to

      bufferThrough((buffer) => {
        let timelineCount = 0;
        return (msg, dest) => {
          buffer.push(msg);
          dest.next(buffer);

          timelineCount += msg.body.data.timeline.length;
          if (timelineCount >= MAX_TAGS_THRESHOLD) {
            dest.complete();
          }
        };
      })
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment