Created
April 3, 2012 23:41
-
-
Save rebcabin/2296390 to your computer and use it in GitHub Desktop.
Commented Reactive Mathematica with UnitTest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ClearAll[generateWithTime];
(* generateWithTime[initialState, condition, resultSelector, timeSelector, iterate]

   Returns a fresh symbol (the "observable") whose "Subscribe" value is a
   function of one observer. Subscribing starts a chain of one-shot
   scheduled tasks. Each task, for the state s captured when the task
   was created:
     - evaluates condition[s];
     - if it is True and the subscription has not been disposed, calls
       the observer's OnNext with resultSelector[s], sets the state to
       iterate[s], and schedules the next task;
     - otherwise removes the task and calls OnCompleted if the observer
       has one.
   Each task is created with the time specification {timeSelector[s]},
   so the first OnNext is delayed as well.

   Parameters:
     initialState    - first state value
     condition       - state -> True to continue
     resultSelector  - state -> value handed to OnNext
     timeSelector    - state -> delay used in the task's time spec
     iterate         - state -> next state

   Observer protocol: observer["OnNext"] is required; observer["OnCompleted"]
   and observer["OnError"] are optional (presence is tested with ValueQ).

   Subscribe returns a "disposable" symbol; disposable["Dispose"][] sets a
   flag that the next task checks before calling OnNext.

   Errors: anything Thrown (untagged) by timeSelector or, inside a task, by
   condition, resultSelector, iterate, OnNext or OnCompleted is caught. The
   current task is removed and the thrown value is passed to OnError when
   the observer has one; without OnError the error is dropped after
   cleanup. A Throw out of OnError itself is not caught here. Subscribing
   an observer without OnNext Throws "Observer must have OnNext".

   NOTE: state, the disposed flag, runNextTask and the disposable symbol
   live in the outer Module, so they are shared by every subscription made
   on the same observable. *)
generateWithTime[
  initialState_,
  condition_,
  resultSelector_,
  timeSelector_,
  iterate_] :=
 Module[{
   state = initialState,
   lastTaskForCleanup,
   runNextTask,
   observable = Unique[],
   disposed = False,
   disposable = Unique[]},
  observable["Subscribe"] = Function[observer,
    Module[{
      (* Unpack the observer. ValueQ[expr] is True when expr has a value
         other than itself, so it serves as the "is this member defined"
         test. *)
      hasOnNext = ValueQ[observer["OnNext"]],
      OnNext = observer["OnNext"],
      hasOnCompleted = ValueQ[observer["OnCompleted"]],
      OnCompleted = observer["OnCompleted"],
      hasOnError = ValueQ[observer["OnError"]],
      OnError = observer["OnError"],
      (* unique sentinel: "the Catch body finished without a Throw" *)
      defaultCatchResult = Unique[]
      },
     If[Not[hasOnNext], Throw["Observer must have OnNext"]];
     (* runNextTask is declared in the outer Module on purpose; see
        http://mathematica.stackexchange.com/questions/3807/
        module-variable-scoping-in-scheduled-tasks *)
     ClearAll[runNextTask];
     (* runNextTask creates and starts one task; that task does one step
        of work and then calls runNextTask again. *)
     runNextTask = Function[
       Module[{oldState = state, proceed, catchResult},
        (* Remove the task that scheduled this invocation. On the very
           first call there is none yet; Quiet hides the complaint. *)
        Quiet@RemoveScheduledTask@lastTaskForCleanup;
        (* OUTER catch: CreateScheduledTask holds its first argument, so
           the only user code evaluated here is timeSelector. *)
        catchResult = Catch[
          lastTaskForCleanup =
           CreateScheduledTask[
            ((* INNER catch, evaluated when the task fires: covers
                condition, resultSelector, iterate, OnNext and
                OnCompleted. *)
             catchResult = Catch[
               proceed = condition[oldState];
               If[proceed && Not[disposed],
                (* emit, advance the state, schedule the next task *)
                (OnNext[resultSelector[oldState]];
                 state = iterate[oldState];
                 runNextTask[]),
                (* finished or disposed: clean up and complete *)
                (Quiet@RemoveScheduledTask@lastTaskForCleanup;
                 If[hasOnCompleted, OnCompleted[]])];
               defaultCatchResult];
             (* Something was thrown inside the task. Always remove the
                task, and do it before calling OnError so a throwing
                OnError cannot leave the task registered. *)
             If[catchResult =!= defaultCatchResult,
              (Quiet@RemoveScheduledTask@lastTaskForCleanup;
               If[hasOnError, OnError[catchResult]])]),
            (* delay before the task runs; a Throw here lands in the
               OUTER catch *)
            {timeSelector[oldState]}];
          defaultCatchResult];
        If[catchResult =!= defaultCatchResult,
         (* timeSelector threw: no new task was created, so there is
            nothing to start. Clean up and report if possible. *)
         (Quiet@RemoveScheduledTask@lastTaskForCleanup;
          If[hasOnError, OnError[catchResult]]),
         (* normal path: start the task that was just created *)
         StartScheduledTask@lastTaskForCleanup]
        ](* end of Module inside runNextTask *)
       ](* end of Function assigned to runNextTask *)
     ];(* end of the observer-unpacking Module *)
    (* start everything up by calling runNextTask the first time *)
    runNextTask[];
    (* the disposable sets the flag that each task checks before OnNext *)
    disposable["Dispose"] = Function[disposed = True];
    disposable];(* end of def Subscribe *)
  observable](* end of generateWithTime *)
(* Manual test for generateWithTime. The states run 0, 1, 2, ...; the
   result selector Throws when the state reaches 2, so the observer
   should print two ONNEXT lines followed by an ERROR line.
   createObserver is defined elsewhere; it is called here with the
   OnNext, OnCompleted and OnError handlers, in that order. *)
ClearAll[observer, observable, disposable, task];
observable =
  generateWithTime[
   0,                                      (* initial state *)
   Function[s, s < 3],                     (* keep going while state < 3 *)
   Function[s, If[s === 2, Throw[s], s]],  (* result; Throws at state 2 *)
   Function[0.10],                         (* constant delay per step *)
   Function[s, s + 1]];                    (* next state *)
observer =
  createObserver[
   Function[v, Print[StringJoin["Rx ONNEXT: ", ToString[v]]]],
   Function[Print["Rx DONE!"]],
   Function[e, Print[StringJoin["Rx ERROR: ", ToString[e]]]]];
(* expect two ONNEXT messages and then an ERROR *)
disposable = observable["Subscribe"][observer];
(* Optional variant: enable the task below to dispose the subscription
   after 0.25 s; then expect two ONNEXT messages followed by DONE
   instead of ERROR. *)
(*task=CreateScheduledTask[
   (disposable["Dispose"][];
    RemoveScheduledTask[task]),
   {0.25}];
  StartScheduledTask[task];*)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment