@rebcabin
Created April 4, 2012 15:37
(*
Cold GenerateWithTime
Rx.Observable.GenerateWithTime = function(
initialState, // : State
condition, // : State -> bool
resultSelector, // : State -> Result
timeSelector, // : State -> int
iterate) // : State -> State
A cold observable starts producing values only when an observer subscribes to it. Here an observable is an object with a Subscribe method, and generateWithTime is a function that returns one. Objects are represented as symbols with string keys, much like JavaScript objects.
The Subscribe function takes an observer as input and returns a disposable, an object whose Dispose function stops further observations. An observer is an object exposing one, two, or three functions: OnNext[value] (required), OnCompleted[], and OnError[error].
*)
ClearAll[createObserver];
createObserver[onNext_, onCompleted_: False, onError_: False] :=
Module[{o = Unique[]},
o["OnNext"] = onNext;
If[onCompleted =!= False, o["OnCompleted"] = onCompleted];
If[onError =!= False, o["OnError"] = onError];
o];
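(*
A minimal usage sketch, not part of the original gist: an observer that
supplies only OnNext. The name exampleObserver is hypothetical. Because the
observer is a symbol with string keys, a handler is looked up by its key and
then applied to its argument.
*)
ClearAll[exampleObserver];
exampleObserver = createObserver[Print["got: " <> ToString@#] &];
exampleObserver["OnNext"][42]
(* should print "got: 42" *)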
(*
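generateWithTime returns a restartable, interleavable observable: each call to Subscribe starts a fresh sequence from initialState, and several subscriptions can run at the same time.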
*)
ClearAll[generateWithTime];
generateWithTime[
initialState_,
condition_,
resultSelector_,
timeSelector_,
iterate_] :=
Module[{
observable = Unique[],
(* Mathematica's Catch wraps the try block,
so it is more like a combination of C# catch and finally.
The Catch must return something.
The default case signals that no Throw was evaluated. *)
defaultCatchResult = Unique[]
},
observable["Subscribe"] = Function[observer,
Module[{
(* unpack the observer *)
(* ValueQ is the Mathematica equivalent of testing against undefined in
JavaScript. If ValueQ[expr] is True,
then the expression has a value other than itself. *)
hasOnNext = ValueQ[observer["OnNext"]],
OnNext = observer["OnNext"],
hasOnCompleted = ValueQ[observer["OnCompleted"]],
OnCompleted = observer["OnCompleted"],
hasOnError = ValueQ[observer["OnError"]],
OnError = observer["OnError"],
(* set up non-interleaving task structure *)
lastTaskForCleanup,
runNextTask,
(* initialize stuff when observer subscribes *)
state = initialState,
disposed = False,
disposable = Unique[]
},
(* Don't catch this Throw here. An observer without OnNext is a caller error. *)
If[Not[hasOnNext], Throw["Observer must have OnNext"]];
(* The following ClearAll is a trick that makes runNextTask invulnerable to
garbage collection in the time window between two invocations. *)
(* see http://mathematica.stackexchange.com/questions/3807/module-variable-scoping-in-scheduled-tasks *)
ClearAll[runNextTask];
(* "runNextTask" is a function that does work and then schedules another \
invocation of itself. *)
runNextTask = Function[
Module[{oldState = state, proceed, catchResult},
(* Clean up the last task, the one that scheduled the current invocation *)
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
(* catch errors in creating the task and in the user-supplied timeSelector
function *)
catchResult = Catch[
(* create a task to do the work on the current invocation and save it
for later cleanup by the next invocation *)
lastTaskForCleanup =
CreateScheduledTask[
(* catch errors in the observer's OnNext, OnCompleted,
and in the user-supplied condition, resultSelector,
and iterate functions *)
catchResult = Catch[
(* find out if we should do this iteration and all following
iterations: invoke the caller-supplied "condition" function. *)
proceed = condition[oldState];
If[proceed && Not[disposed],
(* if we should go, then call the observer's OnNext with the result of
the caller-supplied resultSelector[oldState] *)
(OnNext[resultSelector[oldState]];
(* update state via user-supplied lambda *)
state = iterate[oldState];
(* recurse to create the next task *)
runNextTask[]),
(* else, if we should NOT go, clean up,
call user's OnCompleted, and don't schedule next task *)
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
If[hasOnCompleted, OnCompleted[]])];
(* if nothing was thrown,
return the following as the value of the Catch *)
(* this is the end of the INNER catch block *)
defaultCatchResult];
(* check if something was thrown *)
If[
catchResult =!= defaultCatchResult && hasOnError,
(* if something was thrown, call OnError and clean up.
If the observer's OnError throws, let that escape; so be it *)
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup)],
(* the following is how long to wait before starting the task.
If it throws, catch in our outer Catch block *)
{timeSelector[oldState]}];
(* if nothing was thrown,
return the following as the value of the Catch *)
(* this is the end of the OUTER catch block *)
defaultCatchResult];
(* Check for throw by timeSelector *)
If[
catchResult =!= defaultCatchResult && hasOnError,
(* if the caller-supplied timeSelector threw or something else went wrong,
call OnError and clean up. If OnError throws, let it escape;
so be it *)
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup),
(* This version of generateWithTime does not start the first task
immediately, but rather after the delay specified by the result of calling
the user's time selector on the initial state value.
That gives the user the opportunity to race the task and call
Dispose before the first observation is created. *)
StartScheduledTask @ lastTaskForCleanup]
)(* end of the compound expression that forms the Module body *)
](* end of Module internal to runNextTask *)
];(* end of def runNextTask *)
(* start everything up by calling runNextTask the first time *)
runNextTask[];
(* create a disposable that can asynchronously set the flag that's
checked in the inner loop. *)
disposable["Dispose"] = Function[disposed = True];
disposable]];(* end of def Subscribe; return disposable *)
observable](* end of generateWithTime; return observable *)
(*
Unit Tests
The following should display traces of two interleaved observers.
*)
ClearAll[observer, observer1, observer2, observable, disposable, task];
observable =
generateWithTime[
(* Initial State *)0,
(* Continue Condition *)# < 3 &,
(* Result Selector *)# &,
(* Time Selector *)0.10 &,
(* State Updater *)# + 1 &];
observer1 = createObserver[
(* OnNext *)Print["Rx 1 ONNEXT: " <> ToString@#] &,
(* OnCompleted *)Print["Rx 1 DONE!"] &,
(* OnError *)Print["Rx 1 ERROR: " <> ToString@#] &];
observer2 = createObserver[
(* OnNext *)Print["Rx 2 ONNEXT: " <> ToString@#] &,
(* OnCompleted *)Print["Rx 2 DONE!"] &,
(* OnError *)Print["Rx 2 ERROR: " <> ToString@#] &];
{observable["Subscribe"][observer1], observable["Subscribe"][observer2]}
(*
Out[169]= {$552, $553}
*)
(* now prove it's restartable *)
observable["Subscribe"][observer2]
(*
Out[170]= $554
*)
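(*
A sketch of cancelling a subscription, not part of the original gist. The
name subscription is hypothetical. Subscribe returns a disposable; calling its
Dispose function sets the flag that the next scheduled task checks. If Dispose
runs before the first 0.10 second delay elapses, the expected trace for this
subscription is a single "Rx 1 DONE!" with no ONNEXT messages.
*)
subscription = observable["Subscribe"][observer1];
subscription["Dispose"][];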
ClearAll[observer, observable, disposable, task];
observable =
generateWithTime[
(* Initial State *)0,
(* Continue Condition *)# < 3 &,
(* Result Selector *)If[# === 2, Throw[#], #] &,
(* Time Selector *)0.10 &,
(* State Updater *)# + 1 &];
observer = createObserver[
(* OnNext *)Print["Rx ONNEXT: " <> ToString@#] &,
(* OnCompleted *)Print["Rx DONE!"] &,
(* OnError *)Print["Rx ERROR: " <> ToString@#] &];
(* expect two ONNEXT messages and then an ERROR *)
disposable = observable["Subscribe"][observer];
(* unless you run the following task, which disposes the subscription after
0.25 seconds; then expect two ONNEXT messages and then a DONE *)
(*task=CreateScheduledTask[
(disposable["Dispose"][];
RemoveScheduledTask[task]),
{0.25}];
StartScheduledTask[task];*)
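(*
A sketch that exercises the outer Catch, not part of the original gist. The
name failingObservable is hypothetical. The time selector throws while the
first task is being created, so no task is scheduled and the observer's
OnError should be called during Subscribe with the thrown value.
*)
ClearAll[failingObservable];
failingObservable =
generateWithTime[
(* Initial State *)0,
(* Continue Condition *)# < 3 &,
(* Result Selector *)# &,
(* Time Selector *)Throw["bad time selector"] &,
(* State Updater *)# + 1 &];
failingObservable["Subscribe"][observer];
(* expect a single "Rx ERROR: bad time selector" and no ONNEXT messages *)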
(*
Once every subscription has completed, the following should always print a zero. Otherwise, generateWithTime or the unit test is leaking tasks.
*)
Print[ScheduledTasks[] // Length];
RemoveScheduledTask[ScheduledTasks[]];