Skip to content

Instantly share code, notes, and snippets.

View cwharris's full-sized avatar
🖖

Christopher Harris cwharris

🖖
View GitHub Profile
var Rx = require('rx'),
log = console.log.bind(console);
Rx.window = Rx; // #hack to make rx-dom work
require('rx-dom');
var prop = 'which',
map = {
37: 'west',
38: 'north',
@cwharris
cwharris / Observable.iterate.js
Last active August 29, 2015 13:57
An idea regarding back-pressure for iterable sources.
Rx.Observable.iterate = function (enumerable, maxConcurrent, workSelector) {
return Rx.Observable.create(function (o) {
var i = 0,
items = [],
enumerator = enumerable.getEnumerator(),
done = new Rx.Subject(),
more = function () {
if (enumerator.moveNext()) {
return true;
@cwharris
cwharris / delayMinimumInterval.js
Created December 10, 2013 20:19
Rx.Observable.prototype.delayMinimumInterval: ensures a minimum interval between notifications.
Rx.Observable.prototype.delayMinimumInterval = function (relativeTime, scheduler) {
if (scheduler === undefined) scheduler = Rx.Scheduler.timeout;
var latest = scheduler.now();
return this
.selectMany(function (x) {
var now = new Date(scheduler.now());
var next = new Date(latest);
next.setMilliseconds(next.getMilliseconds() + relativeTime);
@cwharris
cwharris / switch-expression-ish.js
Last active December 28, 2015 01:39
For those of you who hate living without switch-expressions.
var n = Math.floor(Math.random() * 4);
var result = n === 1 ? "one"
: n === 2 ? "two"
: n === 3 ? "three"
: "missing-no";
class Program
{
static void Main(string[] args)
{
Func<int, int, int> add = (a, b) => a + b;
var obs1 = Observable.Empty<int>().Scan(add);
var obs2 = Observable.Empty<int>().Scan(0, add);
var obs3 = Observable.Range(1, 3).Scan(add);
@cwharris
cwharris / BatchAsync.cs
Last active January 26, 2022 18:55
If you can understand this, you have a pretty good understanding of Rx, Async, Generics, and Extension Methods.
public static IObservable<TResult> BatchAsync<T, TResponse, TResult>(
this IObservable<T> source,
int count,
Func<IEnumerable<T>, CancellationToken, Task<TResponse>> process,
Func<TResponse, IEnumerable<TResult>> resultSelector
)
{
return source
.Buffer(count)
.SelectMany(batch =>
@cwharris
cwharris / FlowLogic.js
Last active December 23, 2015 06:59
Free drink on checkin. `pendingPours` will be incremented each time a user checks into Four Square. :)
var Rx = require('rx'),
Rx = require('./rx.helpers')
;
var FlowLogic = (function () {
Rx.Internals.inherits(FlowLogic, Rx.Observable);
// pendingPours is a behavior subject.
// solenoid is an observer. It represents the valve allowing drink flow.
@cwharris
cwharris / example.js
Created September 2, 2013 23:14
Rx.Observable.latestOn
var model = new Rx.Subject();
var click = new Rx.Subject();
model.latestOn(click, function (model, e) { return [model, e]; })
.subscribe(function (x) {
console.log(x);
});
@cwharris
cwharris / join-pattern.js
Created September 2, 2013 20:18
What should this be doing exactly?
var Rx = require('rx');
var a = new Rx.Subject();
var b = new Rx.Subject();
var c = new Rx.Subject();
var resA = Rx.Observable
.when(
a.and(b)
.then(function (a, b) { return a + b; })
@cwharris
cwharris / autoPublish-1.js
Last active December 22, 2015 02:59
autoPublish
Rx.Observable.prototype.autoPublish = function () {
var source = this.publish(),
subscribers = 0,
connection = null,
dispose = function () {
if (--subscribers === 0) { connection.dispose(); }
};
return Rx.Observable.createWithDisposable(function (o) {
if (++subscribers === 1) { connection = source.connect(); }