Skip to content

Instantly share code, notes, and snippets.

@machty
Created November 26, 2016 21:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save machty/a6bb609c99c2cc3f4513884de6d5f8af to your computer and use it in GitHub Desktop.
Save machty/a6bb609c99c2cc3f4513884de6d5f8af to your computer and use it in GitHub Desktop.
ember-concurrency observable integration thoughts

Observable integration

Lots of FPR libraries use Observables as their central async/concurrency primitive, so it'd be nice if ember-concurrency could integrate with these libraries.

Before delving into how this might be possible, here's a quick compare/contrast between e-c Tasks and Observables:

  • Modeling Streams
    • both e-c tasks and observables can async streams of zero or more values followed by an optional success/error termination
      • observables emit values using onNext(), onCompleted(), and onError()
      • tasks emit values using emit(), and returning a value from a generator fn is an implicit emit(), and throwing an error is an implicit onError
  • Implicit State
    • Observables don't expose the state of a subscription in an externally observable way; in other words, if you wanted to bind some UI to the state of an observable, you'd have to wrap the underlying observable to emit values at the beginning and end of a subscriptions lifecycle. This is a performant and highly composable default, but it means you have to wire up more things yourself to derive the state you need to drive you UI.
    • e-c Tasks try to expose as much bindable/observable state as possible, including someTask.isRunning/isIdle, and the references from a Task to various taskInstances, e.g. .last / .lastCompleted. This is a less performant default, but e-c tasks aren't optimized for extremely performance intensive workloads, but rather exposing a maximum amount of derived state in a conventional way to solve common UI problems with minimal code.
  • Operator Composability
    • The Observable reigns supreme when it comes to functional composability; observables can be mapped, buffered, delayed, flatMapped, etc., with such alacrity that it'll make your head spin.
    • e-c tasks are not nearly as composable as Observables, generally speaking, mostly because the decision has already been made that it's more important to couple tasks to their host objects and yield the benefits of conventional lifecycle constraints, Structured Concurrency, and tons of implicit UI state for conventional UI development. Observable Aficionados will miss nice patterns like mapping over overlapping buffers to smooth out numeric values from a noisy stream, and lots of other computations that are easier to express in Observable land than e-c land. Whereas e-c makes you think in terms of (non-)overlapping rectangular lifespans, Observables go one level below and make you think in terms of marble diagrams.
    • There's no clear winner here, since it's really just a matter of tradeoffs, but I would LOVE it if I could bridge the gap between e-c tasks and observables so that, when needed, I could reach for an observable operator for some more advanced cases when expressing such an operation would involve a hacky, clumsy generator function.

Hand-wavy Proposal

import { task, emit } from 'ember-concurrency';

export Ember.Object.extend({
		someFn() {
	    let countUp = this.get('countUp');
		
		  // produces a cold observable that calls
		  // countUp() with no args when subscribe()d.
		  let coldObs = myTask[Symbol.observable]();
		  // ... or myTask.toObservable();
		  
		  // Produce an observable that calls
		  // countUp.perform(1,2,3) when subscribed;
		  let boundColdObs = countUp.toObservable(1,2,3);
		  
		  // Alternatively we can add something like
		  // .bind() to tasks to produce a tasks that
		  // performs with some of its params already bound,
		  // but I have doubts considering I haven't run into
		  // too many use cases for this outside of
		  // Observable intergration.


      let taskInstance = countUp.perform(4,5,6);
      
      // Since a TaskInstance by definition is
      // already running, it must return a hot
      // observable.
      // Question: should it replay the last emitted value?
      //           I feel like it should.
      let hotObs = taskInstance[Symbol.observable]();
		  
		  let xformedObs = 
				  coldObs.buffer(2,1)
				     .map(([a,b]) => a + b);
				     
		  // Classic observable subscription
		  xformedObs.subscribe(aPlusB => {
        console.log("a + b is ", aPlusB);
      });
      
      // Subscribe a task to handle events emitted
      // from the observable. This demonstrates how
      // tasks can both exist on both ends of observables,
      // both as producers and consumers of values,
      // preserving the `isRunning`/`isIdle` implicit
      // state and all the niceties that come w tasks.
      this.get('observerTask').subscribe(xformedObs);
      
      // Alternatively, you can just pass the observable
      // into task.perform and let the task decide how it
      // wants to subscribe.
      this.get('obsSubscriber').perform(xformedObs);
    },
		
		countUp: task(function * (one, two, three) {
		  let i = 0;
	    while(true) {
	      emit(++i);
	      yield timeout(1000);
	    }
		}),
		
		observerTask: task(function * (aPlusB) {
	    yield timeout(1000);
	    console.log("a + b is ", aPlusB);
		}),
		
		obsSubscriber: task(function * (obs) {
		  // subscribe is provided by e-c
		  let ai = yield subscribe(obs);
			for (let v of yield ai) {
			  yield timeout(1000);
			  console.log(`got value ${v}`);
			}
		}),

API Problems / Outstanding Questions:

While it seems like we're on the right path (where tasks can be subscribed to observables in the same way tasks can be subscribed to various lifecycle / UI events (e.g. 'init', 'didInsertElement', 'click')) there's an unresolved question of how task modifiers (.restartable/.drop/etc) should work:

	observerTask: task(function * (aPlusB) {
   ...
	}).restartable(),

If someone subscribes this task to an observable (via the proposed API this.get('observerTask').subscribe(observable)), then:

  1. What is restartable? Attempts to .perform()/.subscribe() a task? Or perhaps each value emitted by the observable should restart observerTask? If it's each value, should the cancelation of prior running task instances (due to restartable) cause the parent "task" (the observable subscription) to be canceled? Or would we make an exception for observable subscriptions?
  2. Furthermore, what is a TaskInstance? It used to just be that every .perform() returned a new TaskInstance, which corresponded to a single run through the task generator function; but what is the value returned from task.subscribe()? Is it still a TaskInstance, even though in the course of the subscription the task generator fn might be executed multiple times? If we decide that .subscribe() returns something else, some kind of subscription object, then we still have to answer the question: does observerTask.cancelAll() also cancel subscriptions, or just individual attempts to .perform() a task?

I'm not sure these issues/questions can even be resolved.

Alternative: subscribe()

Basic example:

	someTask: task(function * (observable) {
	  let values = yield subscribe(observable);
	  let firstValue = yield values;
	  // firstValue === { done: false, value: someValue }
		for (let v of yield values) {
		  yield timeout(1000);
		  console.log(`got value ${v}`);
		}
	}),
  • Create a subscription using yield subscribe(obs)
  • "Loop over" async values using for(let x of yield sub) pattern (e-c

AH HAH I am a moron: let v of yield values is going to yield values only once. The for loop is going to expect that it's an iterator that responds to .next() / .return(). To actually await values in a simple way, we'll need to manually do it or wait for async iterators (https://github.com/tc39/proposal-async-iteration).

Crap, so what's the alternative? Maybe a hybrid?

	someTask: task(function * (observable) {
	  yield subscribe(observable, function * () {
	  }).restartable();
	  // this inner iterating task restarts whenever
	  // a new value is emitted from the observable.
	}).restartable(),
	// someTask restarts whenever someone calls .perform()
	// on it, as per usual.

Encapsulated tasks?

	someTask: task({ 
	  perform: function * (observable) {
	    yield this.get('processEvent').subscribe(observable);
	  }),
	  
	  processEvent: task(function * (event) {
	  }).restartable(),
	}).restartable(),

TL;DR FML

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment