ClearAll[generateWithTime, createObserver, observer, observable, | |
disposable, task]; | |
generateWithTime[ | |
initialState_, | |
condition_, | |
resultSelector_, | |
timeSelector_, | |
iterate_] := | |
Module[{ | |
state = initialState, | |
lastTaskForCleanup, | |
runNextTask, | |
observable = Unique[], | |
disposed = False, | |
disposable = Unique[]}, | |
observable["Subscribe"] = Function[observer, | |
Module[{ | |
(* unpack the observer *) | |
hasOnNext = ValueQ[observer["OnNext"]], | |
OnNext = observer["OnNext"], | |
hasOnCompleted = ValueQ[observer["OnCompleted"]], | |
OnCompleted = observer["OnCompleted"], | |
hasOnError = ValueQ[observer["OnError"]], | |
OnError = observer["OnError"], | |
defaultCatchResult = Unique[] | |
}, | |
ClearAll[runNextTask];(* protect from asynch GC *) | |
(* see http://mathematica.stackexchange.com/questions/3807/ | |
module-variable-scoping-in-scheduled-tasks *) | |
If[Not[hasOnNext], Throw["Observer must have OnNext"]]; | |
runNextTask = Function[ | |
Module[{oldState = state, proceed, catchResult}, | |
(Quiet@RemoveScheduledTask@lastTaskForCleanup; | |
catchResult = Catch[(* catch errors creating task *) | |
lastTaskForCleanup = | |
CreateScheduledTask[ | |
catchResult = Catch[ | |
proceed = condition[oldState]; | |
If[proceed && Not[disposed], | |
(OnNext[resultSelector[oldState]]; | |
state = iterate[oldState]; | |
runNextTask[]), | |
(Quiet@RemoveScheduledTask@lastTaskForCleanup; | |
If[hasOnCompleted, OnCompleted[]])]; | |
defaultCatchResult];(* end of catch *) | |
If[(* catch errors in OnNext, proceed, iterat *) | |
catchResult =!= defaultCatchResult && hasOnError, | |
(OnError[catchResult]; | |
Quiet@RemoveScheduledTask@lastTaskForCleanup)], | |
{timeSelector[oldState]}]; | |
defaultCatchResult]; | |
If[(* error in timeSelector *) | |
catchResult =!= defaultCatchResult && hasOnError, | |
(OnError[catchResult]; | |
Quiet@RemoveScheduledTask@lastTaskForCleanup), | |
StartScheduledTask@lastTaskForCleanup] | |
)(* End of Module internal to runNextTask *) | |
](* return value of runNextTask *) | |
]];(* end of def runNextTask *) | |
runNextTask[]; | |
disposable["Dispose"] = Function[disposed = True]; | |
disposable | |
];(* end of def Subscribe *) | |
observable | |
](* end of def generateWithTime *); | |
createObserver[onNext_, onCompleted_: False, onError_: False] := | |
Module[{o = Unique[]}, | |
o["OnNext"] = onNext; | |
If[onCompleted =!= False, o["OnCompleted"] = onCompleted]; | |
If[onError =!= False, o["OnError"] = onError]; | |
o]; | |
observable = | |
generateWithTime[0, | |
# < 3 &, | |
If[# === 2, Throw[#], #] &, | |
0.10 &, | |
# + 1 &]; | |
observer = createObserver[ | |
(* OnNext *)Print["Rx ONNEXT: " <> ToString@#] &, | |
(* OnCompleted *)Print["Rx DONE!"] &, | |
(* OnError *)Print["Rx ERROR: " <> ToString@#] &]; | |
disposable = observable["Subscribe"][observer]; | |
(*task=CreateScheduledTask[ | |
(disposable["Dispose"][]; | |
RemoveScheduledTask[task]), | |
{0.25}]; | |
StartScheduledTask[task];*) | |
Print[ScheduledTasks[] // Length]; | |
RemoveScheduledTask[ScheduledTasks[]] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment