public
Created

Implementation of generateWithTime from the Reactive Framework in Mathematica

  • Download Gist
RxInMathematica
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
ClearAll[generateWithTime, createObserver, observer, observable,
disposable, task];
generateWithTime[
initialState_,
condition_,
resultSelector_,
timeSelector_,
iterate_] :=
Module[{
state = initialState,
lastTaskForCleanup,
runNextTask,
observable = Unique[],
disposed = False,
disposable = Unique[]},
observable["Subscribe"] = Function[observer,
Module[{
(* unpack the observer *)
hasOnNext = ValueQ[observer["OnNext"]],
OnNext = observer["OnNext"],
hasOnCompleted = ValueQ[observer["OnCompleted"]],
OnCompleted = observer["OnCompleted"],
hasOnError = ValueQ[observer["OnError"]],
OnError = observer["OnError"],
defaultCatchResult = Unique[]
},
ClearAll[runNextTask];(* protect from asynch GC *)
(* see http://mathematica.stackexchange.com/questions/3807/
module-variable-scoping-in-scheduled-tasks *)
If[Not[hasOnNext], Throw["Observer must have OnNext"]];
runNextTask = Function[
Module[{oldState = state, proceed, catchResult},
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
catchResult = Catch[(* catch errors creating task *)
lastTaskForCleanup =
CreateScheduledTask[
catchResult = Catch[
proceed = condition[oldState];
If[proceed && Not[disposed],
(OnNext[resultSelector[oldState]];
state = iterate[oldState];
runNextTask[]),
(Quiet@RemoveScheduledTask@lastTaskForCleanup;
If[hasOnCompleted, OnCompleted[]])];
defaultCatchResult];(* end of catch *)
If[(* catch errors in OnNext, proceed, iterat *)
catchResult =!= defaultCatchResult && hasOnError,
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup)],
{timeSelector[oldState]}];
defaultCatchResult];
If[(* error in timeSelector *)
catchResult =!= defaultCatchResult && hasOnError,
(OnError[catchResult];
Quiet@RemoveScheduledTask@lastTaskForCleanup),
StartScheduledTask@lastTaskForCleanup]
)(* End of Module internal to runNextTask *)
](* return value of runNextTask *)
]];(* end of def runNextTask *)
runNextTask[];
disposable["Dispose"] = Function[disposed = True];
disposable
];(* end of def Subscribe *)
observable
](* end of def generateWithTime *);
createObserver[onNext_, onCompleted_: False, onError_: False] :=
Module[{o = Unique[]},
o["OnNext"] = onNext;
If[onCompleted =!= False, o["OnCompleted"] = onCompleted];
If[onError =!= False, o["OnError"] = onError];
o];
observable =
generateWithTime[0,
# < 3 &,
If[# === 2, Throw[#], #] &,
0.10 &,
# + 1 &];
observer = createObserver[
(* OnNext *)Print["Rx ONNEXT: " <> ToString@#] &,
(* OnCompleted *)Print["Rx DONE!"] &,
(* OnError *)Print["Rx ERROR: " <> ToString@#] &];
disposable = observable["Subscribe"][observer];
(*task=CreateScheduledTask[
(disposable["Dispose"][];
RemoveScheduledTask[task]),
{0.25}];
StartScheduledTask[task];*)
 
Print[ScheduledTasks[] // Length];
RemoveScheduledTask[ScheduledTasks[]]

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.