Skip to content

Instantly share code, notes, and snippets.

@rebcabin
Created April 2, 2012 00:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rebcabin/2279799 to your computer and use it in GitHub Desktop.
Save rebcabin/2279799 to your computer and use it in GitHub Desktop.
Implementation of generateWithTime from the Reactive Framework in Mathematica
(* Reset the symbols this script defines or assigns further down, so that
   re-evaluating the script starts from a clean slate. `task` is only
   referenced by the commented-out dispose example near the end. *)
ClearAll[generateWithTime, createObserver, observer, observable,
disposable, task];
(* generateWithTime[initialState, condition, resultSelector, timeSelector, iterate]

   Builds an Rx-style timed generator on top of scheduled tasks and returns a
   fresh symbol `obs` for which obs["Subscribe"] is defined.

   Arguments
     initialState    first state value
     condition       state -> True while values should still be produced
     resultSelector  state -> value handed to the observer's OnNext
     timeSelector    state -> delay (seconds) before that state's tick fires
     iterate         state -> next state

   obs["Subscribe"][observer]
     `observer` is a symbol carrying down values for "OnNext" (required) and
     optionally "OnCompleted" and "OnError" (see createObserver). Throws the
     string "Observer must have OnNext" when OnNext is missing. Returns a
     symbol `d` for which d["Dispose"][] marks the generator as disposed.

   Per tick (one one-shot scheduled task per state):
     - disposed          -> the task is removed; no further notifications.
     - condition holds   -> OnNext[resultSelector[state]], state is advanced
                            with iterate, and the next tick is scheduled.
     - condition fails   -> the task is removed and OnCompleted[] is called
                            when the observer has one.
     - anything Thrown by condition / resultSelector / OnNext / iterate /
       timeSelector is caught; the task is cleaned up and the thrown value is
       passed to OnError when the observer has one (otherwise it is dropped).

   NOTE(review): state, disposed, disposable and runNextTask belong to the
   generator, not to a subscription, so subscribing twice to the same
   generator shares (and interferes with) them. *)
generateWithTime[
  initialState_,
  condition_,
  resultSelector_,
  timeSelector_,
  iterate_] :=
 Module[{
   state = initialState,
   lastTaskForCleanup,
   runNextTask,
   observable = Unique[],
   disposed = False,
   disposable = Unique[]},
  observable["Subscribe"] = Function[observer,
    Module[{
      (* unpack the observer *)
      hasOnNext = ValueQ[observer["OnNext"]],
      OnNext = observer["OnNext"],
      hasOnCompleted = ValueQ[observer["OnCompleted"]],
      OnCompleted = observer["OnCompleted"],
      hasOnError = ValueQ[observer["OnError"]],
      OnError = observer["OnError"],
      (* unique sentinel meaning "nothing was thrown" *)
      defaultCatchResult = Unique[]
      },
     ClearAll[runNextTask];(* protect from asynch GC *)
     (* see http://mathematica.stackexchange.com/questions/3807/
        module-variable-scoping-in-scheduled-tasks *)
     If[Not[hasOnNext], Throw["Observer must have OnNext"]];
     runNextTask = Function[
       Module[{oldState = state, proceed, catchResult},
        ((* drop the previous tick's task before scheduling a new one *)
         Quiet@RemoveScheduledTask@lastTaskForCleanup;
         catchResult = Catch[(* catch errors creating the task *)
           lastTaskForCleanup =
            CreateScheduledTask[
             (* the task body is held and only runs when the tick fires *)
             (catchResult = Catch[
                If[disposed,
                 (* disposed subscriptions get no further notifications;
                    previously OnCompleted was still called here *)
                 Quiet@RemoveScheduledTask@lastTaskForCleanup,
                 (proceed = condition[oldState];
                  If[proceed,
                   (OnNext[resultSelector[oldState]];
                    state = iterate[oldState];
                    runNextTask[]),
                   (Quiet@RemoveScheduledTask@lastTaskForCleanup;
                    If[hasOnCompleted, OnCompleted[]])])];
                defaultCatchResult];(* end of catch *)
              If[(* errors in condition, resultSelector, OnNext, iterate *)
               catchResult =!= defaultCatchResult,
               (* always remove the failed task, even without OnError;
                  previously it was leaked in that case *)
               (Quiet@RemoveScheduledTask@lastTaskForCleanup;
                If[hasOnError, OnError[catchResult]])]),
             {timeSelector[oldState]}];
           defaultCatchResult];
         If[(* error in timeSelector *)
          catchResult =!= defaultCatchResult,
          (* no task was created, so there is nothing to start; previously
             a missing OnError fell through to StartScheduledTask on a
             stale or unassigned task *)
          If[hasOnError, OnError[catchResult]],
          StartScheduledTask@lastTaskForCleanup]
         )(* end of body of the Module internal to runNextTask *)
        ](* return value of runNextTask *)
       ]];(* end of def runNextTask; also closes the unpacking Module *)
    (* These run after the inner Module returns; they only touch symbols of
       the outer Module, so the bracket placement is kept as in the original. *)
    runNextTask[];
    disposable["Dispose"] = Function[disposed = True];
    disposable
    ];(* end of def Subscribe *)
  observable
  ](* end of def generateWithTime *);
(* createObserver[onNext, onCompleted, onError]
   Returns a fresh symbol carrying the given handlers as down values under
   the keys "OnNext", "OnCompleted" and "OnError". The last two arguments are
   optional; a handler left at (or passed as) False is not attached. *)
createObserver[onNext_, onCompleted_: False, onError_: False] :=
 Module[{handle = Unique[]},
  handle["OnNext"] = onNext;
  If[! (onCompleted === False), handle["OnCompleted"] = onCompleted];
  If[! (onError === False), handle["OnError"] = onError];
  handle];
(* Demo: states 0, 1, 2 at 0.10 s intervals. The result selector throws on
   state 2, which exercises the observer's error handler. *)
observable = generateWithTime[
   0,
   Function[s, s < 3],
   Function[s, If[s === 2, Throw[s], s]],
   Function[s, 0.10],
   Function[s, s + 1]];
observer = createObserver[
   Function[value, Print["Rx ONNEXT: " <> ToString[value]]],
   Function[{}, Print["Rx DONE!"]],
   Function[err, Print["Rx ERROR: " <> ToString[err]]]];
disposable = observable["Subscribe"][observer];
(* Optional: dispose the subscription after 0.25 s.
task=CreateScheduledTask[
(disposable["Dispose"][];
RemoveScheduledTask[task]),
{0.25}];
StartScheduledTask[task];*)
(* Report how many scheduled tasks exist, then remove every one of them. *)
Print[Length[ScheduledTasks[]]];
RemoveScheduledTask[ScheduledTasks[]]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment