Last active
August 29, 2015 14:09
-
-
Save eulerfx/2bcfd5e0ef67d4ae330e to your computer and use it in GitHub Desktop.
Push sequences with F# Hopac
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Marvel.Hopac | |
open System | |
open Hopac | |
open Hopac.Extra | |
open Hopac.Job.Infixes | |
/// An effectful observer: a function that receives `Some value` for every element of a
/// stream and `None` when the stream ends, and returns the job that handles the notification.
type RxObs<'a> = 'a option -> Job<unit>

module RxObs =

  /// Sends the element `a` to the observer.
  let inline post a (obs:RxObs<'a>) = obs (Some a)

  /// Sends the end-of-stream notification (`None`) to the observer.
  let inline stop (obs:RxObs<'a>) = obs None

  /// Adapts an observer of `'a` into an observer of `'b` by applying `f` to each element.
  /// The end-of-stream notification is passed through unchanged.
  let contramap (f:'b -> 'a) (o:RxObs<'a>) : RxObs<'b> =
    o << Option.map f

  /// Like `contramap`, but the mapping function is itself a job whose result is
  /// forwarded to `o`. The end-of-stream notification is passed through unchanged.
  let contramapJob (f:'b -> Job<'a>) (o:RxObs<'a>) : RxObs<'b> =
    function
    | Some b -> f b >>= (Some >> o)
    | None -> o None

  /// Forwards an element to `o` only when the predicate job `f` yields `true`;
  /// rejected elements are dropped. The end-of-stream notification is always forwarded.
  let filterJob (f:'a -> Job<bool>) (o:RxObs<'a>) : RxObs<'a> =
    function
    | Some a as x ->
      f a >>= fun keep ->
        if keep then o x else Job.unit()
    | None -> o None

  /// Drops elements for as long as the predicate job `f` yields `true`. The first element
  /// for which `f` yields `false`, and every element after it, is forwarded to `o` without
  /// consulting `f` again. The end-of-stream notification is always forwarded.
  let skipWhileJob (f:'a -> Job<bool>) (o:RxObs<'a>) : RxObs<'a> =
    // Becomes true once the skipping phase is over. The flag belongs to this observer
    // instance and is a plain ref cell.
    // NOTE(review): no synchronization here - presumably notifications are delivered
    // one at a time; confirm against the producers.
    let skipped = ref false
    function
    | Some a ->
      job {
        if !skipped then
          do! o (Some a)
        else
          let! skip = f a
          if not skip then
            skipped := true
            do! o (Some a)
      }
    | None -> o None

  /// Adapts an observer of `'a` into an observer of `'b` using a job that maps and filters
  /// in one step. Whatever `f` returns is handed to `o` as-is, so a `None` result from `f`
  /// reaches `o` as `None`, which is the same value `o` receives for end-of-stream.
  let contrachooseJob (f:'b -> Job<'a option>) (o:RxObs<'a>) : RxObs<'b> =
    function
    | Some b -> f b >>= o
    | None -> o None

  /// Wraps a synchronous `IObserver` as an `RxObs`. `OnNext` is called for elements and
  /// `OnCompleted` for end-of-stream; `OnError` is never called.
  /// The callbacks are deferred with `Job.delay` so that they fire when the returned job
  /// is run, not when the observer function is merely applied. Producers in this file
  /// build jobs such as `work >>. obs None`, which applies the observer up front; calling
  /// the callback eagerly would report completion before `work` has produced anything.
  let ofSync (obs:IObserver<'a>) : RxObs<'a> =
    function
    | Some a ->
      Job.delay <| fun () ->
        obs.OnNext a
        Job.unit()
    | None ->
      Job.delay <| fun () ->
        obs.OnCompleted()
        Job.unit()

  /// Fans every notification out to both observers, combining the two jobs with
  /// `Job.conIgnore`.
  let mergeCon (o1:RxObs<'a>) (o2:RxObs<'a>) : RxObs<'a> =
    fun a -> Job.conIgnore [ o1 a ; o2 a ]

  /// Fans every notification out to both observers, combining the two jobs with
  /// `Job.seqIgnore` (`o1` is listed first).
  let mergeSeq (o1:RxObs<'a>) (o2:RxObs<'a>) : RxObs<'a> =
    fun a -> Job.seqIgnore [ o1 a ; o2 a ]

  // Creates a synchronous observer which schedules notifications to a worker queue.
  // (Disabled sketch, kept for reference.)
  // NOTE(review): it refers to `RxJob<'a>` and `Async.toSyncStop`, neither of which is
  // defined in this file - confirm before re-enabling.
  // let toSync maxQueueSize workers (obs:RxJob<'a>) : System.IObserver<'a> =
  //   let post,stop = Async.toSyncStop maxQueueSize workers obs
  //   { new System.IObserver<'a> with
  //       member __.OnCompleted() = post None ; stop()
  //       member __.OnNext a = post (Some a)
  //       member __.OnError _ = post None }
/// An observable stream represented as a subscription function which pushes values to the
/// observer. Elements arrive as `Some value`; `None` marks the end of the stream.
type Rx<'a> = RxObs<'a> -> Job<unit>

module Rx =

  /// Wraps a subscription function as an `Rx` (identity; only fixes the type).
  let inline create o : Rx<'a> = o

  /// Subscribes `obs` to the stream `o`, returning the subscription job.
  let inline add (o:Rx<'a>) (obs:RxObs<'a>) = o obs

  /// Subscribes the handler `f` to the elements of `o`; the end-of-stream
  /// notification is ignored.
  let inline subs (f:'a -> Job<unit>) (o:Rx<'a>) =
    add o <| function
      | Some a -> f a
      | None -> Job.unit()

  /// A stream that completes immediately without producing any element.
  let empty<'a> : Rx<'a> =
    create <| fun obs -> obs None

  /// A stream that produces the single element `a` and then completes.
  let singleton a : Rx<'a> =
    create <| fun obs ->
      // The completion call is made only after the element's job has run, so observers
      // that act when applied still see the notifications in order.
      obs (Some a) >>= fun () -> obs None

  /// Defers construction of the stream until subscription time.
  let delay (f:unit -> Rx<'a>) : Rx<'a> =
    create <| fun obs -> f() obs

  /// A stream that pushes `()` to the observer after every sleep of length `i`.
  /// The loop is started with `Job.foreverServer`; this stream never sends `None`.
  let interval (i:TimeSpan) : Rx<unit> =
    let x = Some()
    create <| fun (obs:RxObs<unit>) ->
      let go = Job.delay <| fun () ->
        // Invoke the observer only once the sleep has elapsed.
        Timer.Global.sleep i >>= fun _ -> obs x
      Job.foreverServer go

  /// Transforms every element of the stream with `f`.
  let map (f:'a -> 'b) (o:Rx<'a>) : Rx<'b> =
    o << RxObs.contramap f

  /// Transforms every element of the stream with the job-returning function `f`.
  let mapJob (f:'a -> Job<'b>) (o:Rx<'a>) : Rx<'b> =
    o << RxObs.contramapJob f

  /// Flattens a stream of streams: each inner stream is subscribed as it arrives and its
  /// elements are forwarded downstream.
  /// Only the completion of the outer stream is forwarded. Inner completions are dropped,
  /// otherwise downstream would be completed once per inner stream and again by the outer.
  /// NOTE(review): if an inner stream keeps producing after its subscription job returns,
  /// the outer completion can still precede those late elements.
  let join (oo:Rx<Rx<'a>>) : Rx<'a> =
    create <| fun (obs:RxObs<'a>) ->
      let inner : RxObs<'a> =
        function
        | Some _ as x -> obs x
        | None -> Job.unit()
      add oo <| function
        | None -> obs None
        | Some o -> o inner

  /// Maps every element to a stream with `f` and flattens the result (see `join`).
  let bind (f:'a -> Rx<'b>) (o:Rx<'a>) : Rx<'b> =
    map f o |> join

  /// Runs the job `a` at subscription time and subscribes the observer to the stream
  /// that `f` builds from its result.
  let bindJob (f:'a -> Rx<'b>) (a:Job<'a>) : Rx<'b> =
    create <| fun (obs:RxObs<'b>) -> a >>= fun a -> f a obs

  /// Keeps only the elements for which the predicate job `f` yields `true`.
  let filterJob (f:'a -> Job<bool>) (o:Rx<'a>) : Rx<'a> =
    create <| fun (obs:RxObs<'a>) ->
      add o <| RxObs.filterJob f obs

  /// Maps and filters in one step via `RxObs.contrachooseJob`.
  let chooseJob (f:'a -> Job<'b option>) (o:Rx<'a>) : Rx<'b> =
    create <| fun (obs:RxObs<'b>) ->
      add o <| RxObs.contrachooseJob f obs

  /// Folds the stream with `f` starting from the state `b`, emitting the state produced
  /// by each element. The seed itself is not emitted.
  /// The state lives in an `MVar` that is taken before `f` runs and filled again
  /// afterwards.
  let scanJob (f:'b -> 'a -> Job<'b>) (b:'b) (o:Rx<'a>) : Rx<'b> =
    fun (obs:RxObs<'b>) ->
      MVar.createFull b >>= fun mv ->
        add o <| function
          | None -> obs None
          | Some a ->
            MVar.take mv >>= fun prev ->
              f prev a >>= fun next ->
                // Store the new state first, then emit that same new state. The previous
                // code emitted `prev`, so the final state was never seen downstream.
                MVar.fill mv next >>= fun () ->
                  obs (Some next)

  /// Subscribes to the stream and returns its last element, or `None` when the stream
  /// completes without producing one. The result becomes available when the stream sends
  /// its end-of-stream notification.
  let last (o:Rx<'a>) : Job<'a option> =
    MVar.createFull None >>= fun mv ->
      IVar.create() >>= fun last ->
        let obs : RxObs<'a> =
          function
          | Some _ as a ->
            // The MVar starts full, so the old value must be taken out before the new
            // one is stored.
            MVar.take mv >>= fun _ -> MVar.fill mv a
          | None ->
            // Put the value back so the MVar stays full, and use `tryFill` so that a
            // repeated completion is ignored.
            MVar.take mv >>= fun v ->
              MVar.fill mv v >>= fun () ->
                IVar.tryFill last v
        add o obs >>. IVar.read last

  /// Subscribes to the stream and returns its first notification: `Some` first element,
  /// or `None` when the stream completes without producing one.
  /// The stream is not unsubscribed; later notifications are ignored by `IVar.tryFill`.
  let first (rx:Rx<'a>) : Job<'a option> =
    IVar.create() >>= fun first ->
      // Elements and completion are handled alike: whichever comes first wins.
      let obs : RxObs<'a> = fun a -> IVar.tryFill first a
      add rx obs >>. IVar.read first

  /// Merges two streams into one by subscribing a shared observer to both
  /// (combined with `<*>`).
  /// Elements from either source are forwarded as they arrive. Downstream is completed
  /// exactly once, when the second of the two sources has completed.
  let merge (rx1:Rx<'a>) (rx2:Rx<'a>) : Rx<'a> =
    fun (obs:RxObs<'a>) ->
      // `Job.delay` gives every run of the subscription job its own counter.
      Job.delay <| fun () ->
        let pending = ref 2
        let shared : RxObs<'a> =
          function
          | Some _ as x -> obs x
          | None ->
            // Completions are counted atomically rather than assuming the two
            // sources finish on the same thread.
            if System.Threading.Interlocked.Decrement(pending) = 0 then obs None
            else Job.unit()
        rx1 shared <*> rx2 shared >>% ()

  /// Pushes every element of the pull-based `HopacSeq` to the observer, then completes.
  let ofHopacSeq (s:HopacSeq<'a>) : Rx<'a> =
    create <| fun (obs:RxObs<'a>) ->
      // The completion call is made only after the iteration job has finished.
      HopacSeq.iterJob (Some >> obs) s >>= fun _ -> obs None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment