Skip to content

Instantly share code, notes, and snippets.

@sjkp
Last active August 29, 2015 14:02
Show Gist options
  • Save sjkp/8205d540e1d88d26d078 to your computer and use it in GitHub Desktop.
Save sjkp/8205d540e1d88d26d078 to your computer and use it in GitHub Desktop.
Observable filter that blocks specific events with a specific Id from flowing through. The filter is activated when a EventAction.Nothing flows through and deactivated on the subsequent EventAction.Nothing with the same Id.
open System
type EventAction = Nothing | Something
type TimeEvent = {time: DateTime; Id: int; Action : EventAction}
let t,t2 = new System.Timers.Timer(1000.0), new System.Timers.Timer(5000.0)
let timer1Stream = t.Elapsed |> Observable.map(fun _ -> {time = DateTime.Now; Id = 1; Action = EventAction.Something})
let timer2Stream = t2.Elapsed |> Observable.map(fun _ -> {time = DateTime.Now; Id = 1; Action = EventAction.Nothing})
let mergedStream = Observable.merge timer1Stream timer2Stream
// If an EventAction.Nothing flows through the all subsequent Events with the same Id as the EventAction.Nothing will be let filter l e =
printfn "filter func says: %A List contains: %A" e l
let res = l |> Seq.filter(fun i -> i.Action = EventAction.Nothing) |> Seq.toList
match res |> Seq.exists(fun i -> i.Id = e.Id) with
| true ->
match e.Action with
| EventAction.Nothing -> res|>Seq.filter(fun i -> i.Id <> e.Id) |> Seq.toList
| EventAction.Something -> res
| false -> e::res
let result =
mergedStream
|> Observable.scan(filter) []
|> Observable.map(fun e -> e |> Seq.filter(fun i -> i.Action= EventAction.Something))
|> Observable.subscribe(fun e -> printfn "result: %A" e)
t.Start()
t2.Start()
t.Stop()
t2.Stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment