Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/2bcfd5e0ef67d4ae330e to your computer and use it in GitHub Desktop.
Save eulerfx/2bcfd5e0ef67d4ae330e to your computer and use it in GitHub Desktop.
Push sequences with F# Hopac
namespace Marvel.Hopac
open System
open Hopac
open Hopac.Extra
open Hopac.Job.Infixes
/// An effectful observer: a function that receives one notification and
/// returns the job which handles it.
/// `Some a` carries the next value of the stream; `None` signals that the
/// stream has ended.
type RxObs<'a> = 'a option -> Job<unit>
/// Combinators for building and adapting observers.
module RxObs =

  /// Notifies the observer `obs` of the next value `a`.
  let inline post a (obs:RxObs<'a>) = obs (Some a)

  /// Notifies the observer `obs` that the stream has ended.
  let inline stop (obs:RxObs<'a>) = obs None

  /// Adapts an observer of `'a` into an observer of `'b` by applying `f` to
  /// every value before it is delivered. The end-of-stream signal is passed
  /// through unchanged.
  let contramap (f:'b -> 'a) (o:RxObs<'a>) : RxObs<'b> =
    fun b -> o (Option.map f b)

  /// Like `contramap`, but the mapping function is itself a job.
  let contramapJob (f:'b -> Job<'a>) (o:RxObs<'a>) : RxObs<'b> =
    function
    | None -> o None
    | Some b -> f b >>= (fun a -> o (Some a))

  /// Forwards to `o` only those values for which the predicate job `f`
  /// yields `true`. The end-of-stream signal is always forwarded.
  let filterJob (f:'a -> Job<bool>) (o:RxObs<'a>) : RxObs<'a> =
    function
    | None -> o None
    | Some a as x ->
      f a >>= (fun keep -> if keep then o x else Job.unit())

  /// Drops values while the predicate job `f` yields `true`; from the first
  /// value for which it yields `false` onwards every value is forwarded and
  /// `f` is no longer consulted. The end-of-stream signal is always forwarded.
  /// The "done skipping" flag is allocated once per call to `skipWhileJob`,
  /// so the returned observer is stateful.
  let skipWhileJob (f:'a -> Job<bool>) (o:RxObs<'a>) : RxObs<'a> =
    let skipped = ref false
    function
    | None -> o None
    | Some a as x ->
      job {
        if skipped.Value then
          do! o x
        else
          let! skip = f a
          if not skip then
            skipped.Value <- true
            do! o x
      }

  /// Adapts an observer of `'a` into an observer of `'b` using the partial
  /// mapping job `f`: values for which `f` yields `Some a` are forwarded as
  /// `a`, values for which it yields `None` are dropped.
  /// A dropped value must NOT reach `o` as `None`, because `None` is the
  /// end-of-stream signal; only the upstream `None` is forwarded as such.
  let contrachooseJob (f:'b -> Job<'a option>) (o:RxObs<'a>) : RxObs<'b> =
    function
    | None -> o None
    | Some b ->
      f b >>= (function
        | Some _ as chosen -> o chosen
        | None -> Job.unit())

  /// Wraps a synchronous `IObserver` as an effectful observer. Values are
  /// delivered through `OnNext` and the end-of-stream signal through
  /// `OnCompleted`; `OnError` is never called.
  /// The callbacks are wrapped in `Job.delay` so that they run when the
  /// returned job is executed rather than when the job is constructed.
  let ofSync (obs:IObserver<'a>) : RxObs<'a> =
    function
    | Some a -> Job.delay (fun () -> obs.OnNext a ; Job.unit())
    | None -> Job.delay (fun () -> obs.OnCompleted() ; Job.unit())

  /// Combines two observers into one which hands every notification to both,
  /// running the two resulting jobs with `Job.conIgnore`.
  let mergeCon (o1:RxObs<'a>) (o2:RxObs<'a>) : RxObs<'a> =
    fun a ->
      let jobs = [ o1 a ; o2 a ]
      Job.conIgnore jobs

  /// Combines two observers into one which hands every notification to both,
  /// running the two resulting jobs with `Job.seqIgnore` (`o1` first).
  let mergeSeq (o1:RxObs<'a>) (o2:RxObs<'a>) : RxObs<'a> =
    fun a ->
      let jobs = [ o1 a ; o2 a ]
      Job.seqIgnore jobs
/// Creates a synchronous observer which schedules notifications to a worker queue.
// let toSync maxQueueSize workers (obs:RxJob<'a>) : System.IObserver<'a> =
// let post,stop = Async.toSyncStop maxQueueSize workers obs
// { new System.IObserver<'a> with
// member __.OnCompleted() = post None ; stop()
// member __.OnNext a = post (Some a)
// member __.OnError _ = post None }
/// An observable stream represented as a subscription function: given an
/// observer, it returns the job which pushes the stream's notifications
/// (values as `Some`, end of stream as `None`) to that observer.
type Rx<'a> = RxObs<'a> -> Job<unit>
/// Constructors and combinators for push streams.
module Rx =

  /// Creates a stream from its subscription function.
  let inline create o : Rx<'a> = o

  /// Subscribes the observer `obs` to the stream `o`.
  let inline add (o:Rx<'a>) (obs:RxObs<'a>) = o obs

  /// Subscribes `f` to the values of the stream `o`; the end-of-stream
  /// signal is ignored.
  let inline subs (f:'a -> Job<unit>) (o:Rx<'a>) =
    add o <| function Some a -> f a | None -> Job.unit()

  /// The stream which ends immediately without producing a value.
  let empty<'a> : Rx<'a> =
    create <| fun obs -> obs None

  /// The stream which produces the single value `a` and then ends.
  let singleton a : Rx<'a> =
    create <| fun obs ->
      obs (Some a) >>. obs None

  /// Defers construction of a stream until an observer subscribes.
  let delay (f:unit -> Rx<'a>) : Rx<'a> =
    create <| fun obs -> f () obs

  /// A stream which, once subscribed, repeatedly sleeps for `i` and then
  /// pushes `()` to the observer. The loop is started with
  /// `Job.foreverServer` and never sends the end-of-stream signal.
  let interval (i:TimeSpan) : Rx<unit> =
    let tick = Some ()
    create <| fun (obs:RxObs<unit>) ->
      let step = Job.delay (fun () -> Timer.Global.sleep i >>. obs tick)
      Job.foreverServer step

  /// Transforms every value of the stream with `f`.
  let map (f:'a -> 'b) (o:Rx<'a>) : Rx<'b> =
    create <| fun (obs:RxObs<'b>) ->
      add o (RxObs.contramap f obs)

  /// Transforms every value of the stream with the job-returning function `f`.
  let mapJob (f:'a -> Job<'b>) (o:Rx<'a>) : Rx<'b> =
    create <| fun (obs:RxObs<'b>) ->
      add o (RxObs.contramapJob f obs)

  /// Flattens a stream of streams. Each inner stream is subscribed as it
  /// arrives and its values are forwarded to the downstream observer.
  /// Completion of an inner stream is NOT forwarded: only the completion of
  /// the outer stream ends the result. Forwarding inner completions would
  /// signal end-of-stream once per inner stream.
  /// NOTE(review): an inner stream that keeps producing after the outer
  /// stream has ended will still push values after the stop signal.
  let join (oo:Rx<Rx<'a>>) : Rx<'a> =
    create <| fun (obs:RxObs<'a>) ->
      let valuesOnly : RxObs<'a> =
        function
        | Some _ as a -> obs a
        | None -> Job.unit()
      add oo <| function
        | None -> obs None
        | Some inner -> add inner valuesOnly

  /// Maps every value to a stream with `f` and flattens the result.
  let bind (f:'a -> Rx<'b>) (o:Rx<'a>) : Rx<'b> =
    join (map f o)

  /// Runs the job `a` on subscription and subscribes the observer to the
  /// stream which `f` builds from its result.
  let bindJob (f:'a -> Rx<'b>) (a:Job<'a>) : Rx<'b> =
    create <| fun (obs:RxObs<'b>) ->
      a >>= (fun x -> add (f x) obs)

  /// Keeps only the values for which the predicate job `f` yields `true`.
  let filterJob (f:'a -> Job<bool>) (o:Rx<'a>) : Rx<'a> =
    create <| fun (obs:RxObs<'a>) ->
      add o (RxObs.filterJob f obs)

  /// Maps values with the partial job-returning function `f`, keeping only
  /// the results for which it yields `Some`.
  let chooseJob (f:'a -> Job<'b option>) (o:Rx<'a>) : Rx<'b> =
    create <| fun (obs:RxObs<'b>) ->
      add o (RxObs.contrachooseJob f obs)

  /// Folds the stream with `f`, starting from the seed `b`, and emits the
  /// updated accumulator after every input value. The seed itself is not
  /// emitted. The accumulator is created per subscription and kept in an
  /// `MVar`; it is taken before `f` runs and refilled with the new state
  /// before that state is pushed downstream.
  let scanJob (f:'b -> 'a -> Job<'b>) (b:'b) (o:Rx<'a>) : Rx<'b> =
    fun (obs:RxObs<'b>) ->
      MVar.createFull b >>= fun state ->
        add o <| function
          | None -> obs None
          | Some a ->
            MVar.take state >>= fun prev ->
              f prev a >>= fun next ->
                MVar.fill state next >>. obs (Some next)

  /// Subscribes to the stream and yields the last value seen before the
  /// end-of-stream signal, or `None` if the stream ended without a value.
  /// The returned job does not complete until the stream has signalled its
  /// end.
  let last (o:Rx<'a>) : Job<'a option> =
    MVar.createFull None >>= fun latest ->
      IVar.create() >>= fun result ->
        let obs : RxObs<'a> =
          function
          // The MVar is always full between notifications, so the previous
          // value must be taken out before the new one is stored.
          | Some _ as a -> MVar.take latest >>. MVar.fill latest a
          | None -> MVar.take latest >>= IVar.fill result
        add o obs >>. IVar.read result

  /// Subscribes to the stream and yields the first notification it
  /// delivers: `Some` of the first value, or `None` if the stream ended
  /// without a value. Later notifications are ignored (`IVar.tryFill`).
  let first (rx:Rx<'a>) : Job<'a option> =
    IVar.create() >>= fun result ->
      add rx (fun a -> IVar.tryFill result a) >>. IVar.read result

  /// Subscribes the same observer to both streams, combining the two
  /// subscription jobs with `<*>`.
  /// NOTE(review): each source forwards its own end-of-stream signal, so the
  /// observer may receive `None` twice and may receive values from one
  /// source after the other has ended.
  let merge (rx1:Rx<'a>) (rx2:Rx<'a>) : Rx<'a> =
    fun (obs:RxObs<'a>) ->
      let both = rx1 obs <*> rx2 obs
      both >>% ()

  /// Converts a pull-based `HopacSeq` into a push stream: every element is
  /// pushed to the observer in order, followed by the end-of-stream signal.
  let ofHopacSeq (s:HopacSeq<'a>) : Rx<'a> =
    create <| fun (obs:RxObs<'a>) ->
      let pushAll = HopacSeq.iterJob (fun a -> obs (Some a)) s
      pushAll >>. obs None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment