Skip to content

Instantly share code, notes, and snippets.

@rebcabin
Created April 3, 2012 23:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rebcabin/2296390 to your computer and use it in GitHub Desktop.
Save rebcabin/2296390 to your computer and use it in GitHub Desktop.
Commented Reactive Mathematica with UnitTest
ClearAll[generateWithTime];
generateWithTime[
initialState_,
condition_,
resultSelector_,
timeSelector_,
iterate_] :=
Module[{
state = initialState,
lastTaskForCleanup,
runNextTask,
observable = Unique[],
disposed = False,
disposable = Unique[]},
observable["Subscribe"] = Function[observer,
Module[{
(* unpack the observer *)
(* ValueQ is Mathematica equivalent to JavaScript testing \
against undefined. If ValueQ[expr] is true,
then the expression has a value other than itself. *)
hasOnNext = ValueQ[observer["OnNext"]],
OnNext = observer["OnNext"],
hasOnCompleted = ValueQ[observer["OnCompleted"]],
OnCompleted = observer["OnCompleted"],
hasOnError = ValueQ[observer["OnError"]],
OnError = observer["OnError"],
defaultCatchResult = Unique[]
},
If[Not[hasOnNext], Throw["Observer must have OnNext"]];
ClearAll[runNextTask];(*
protect "runNextTask" from asynch GC *)
(* see http://mathematica.stackexchange.com/questions/3807/
module-variable-scoping-in-scheduled-tasks *)
(* "runNextTask" is a function that does work and then schedules \
another invocation of itself. *)
runNextTask = Function[
Module[{oldState = state, proceed, catchResult},
(*
Clean up the last task -- the one that scheduled the current \
invocation *)
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
(* catch errors creating task and in the user-supplied time-
selector function *)
catchResult = Catch[
(*
create a task to do the work on the current invocation and \
save it for later cleanup by the next invocation *)
lastTaskForCleanup =
CreateScheduledTask[
(* catch errors in the observer's OnNext, OnCompleted,
and in the user-supplied condition, resultSelector,
and iterate functions *)
catchResult = Catch[
(*
find out if we should do this iteration and all \
following iterations *)
proceed = condition[oldState];
If[proceed && Not[disposed],
(* if we should go,
then call observer's OnNext with resultSelector[
currentState] *)
(OnNext[resultSelector[oldState]];
(* update the state via user-supplied lambda *)
state = iterate[oldState];
(* recurse to create the next task *)
runNextTask[]),
(* else, if we should NOT go, clean up,
call user's OnCompleted,
and don't schedule next task *)
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
If[hasOnCompleted, OnCompleted[]])];
(* if nothing was thrown,
return the following as the value of the Catch *)
(* this is the end of the INNER catch block *)
defaultCatchResult];
(* check if something was thrown *)
If[
catchResult =!= defaultCatchResult && hasOnError,
(* if so, call OnError and cleaup. If OnError throws,
so be it *)
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup)],
(*
the following is how long to wait before starting the \
task. If it throws, catch in our outer Catch block *)
{timeSelector[oldState]}];
(* if nothing was thrown,
return the following as the value of the Catch *)
(* this is the end of the OUTER catch block *)
defaultCatchResult];
(* Check for throw by timeSelector *)
If[
catchResult =!= defaultCatchResult && hasOnError,
(* if so, call OnError and cleaup. If OnError throws,
so be it *)
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup),
(*
This version of generateWithTime does not start the first \
task immediately,
but rather after the delay specified by the result of \
calling the user's time selector on the initial state value *)
StartScheduledTask@lastTaskForCleanup]
)(* End of Module internal to runNextTask *)
](* return value of runNextTask *)
]];(* end of def runNextTask *)
(* start everything up by calling runNextTask the first time *)
runNextTask[];
(* create a disposable that can asynchronously set the flag \
that's checked in the inner loop *)
disposable["Dispose"] = Function[disposed = True];
disposable];(* end of def Subscribe *)
observable](* end of generateWithTime *)
ClearAll[observer, observable, disposable, task];
observable =
generateWithTime[
(* Initial State *)0,
(* Continue Condition *)# < 3 &,
(* Result Selector *)If[# === 2, Throw[#], #] &,
(* Time Selector *)0.10 &,
(* State Updater *)# + 1 &];
observer = createObserver[
(* OnNext *)Print["Rx ONNEXT: " <> ToString@#] &,
(* OnCompleted *)Print["Rx DONE!"] &,
(* OnError *)Print["Rx ERROR: " <> ToString@#] &];
(* expect two ONNEXT messages and then an ERROR *)
disposable = observable["Subscribe"][observer];
(* unless you run the following task; then expect two ONNEXT messages \
and then a DONE *)
(*task=CreateScheduledTask[
(disposable["Dispose"][];
RemoveScheduledTask[task]),
{0.25}];
StartScheduledTask[task];*)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment