Last active
August 29, 2015 14:09
-
-
Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Hopac-based seq merge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Marvel.Hopac | |
open Hopac | |
open Hopac.Extra | |
open Hopac.Job.Infixes | |
open Hopac.Alt.Infixes | |
/// Different representation of EagerSeq<'a> | |
type HopacSeq<'a> = Job<HopacSeqStep<'a>> | |
and HopacSeqStep<'a> = | |
| Emit of 'a * HopacSeq<'a> | |
| Halt | |
module HopacSeq = | |
let empty<'a> : HopacSeq<'a> = | |
Halt |> Job.result | |
let emitOne a : HopacSeq<'a> = | |
Emit (a,empty) |> Job.result | |
let ofJob (j:Job<'a>) : HopacSeq<'a> = | |
j |>> fun a -> Emit (a, empty) | |
let rec map (f:'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> = | |
s |>> function | |
| Halt -> Halt | |
| Emit (a,tail) -> Emit (f a, map f tail) | |
let rec mapJob (f:'a -> Job<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> = | |
s >>= function | |
| Halt -> empty | |
| Emit (a,tail) -> | |
f a |>> fun b -> Emit (b, mapJob f tail) | |
let mapi (f:int -> 'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> = | |
let rec go i a = | |
a |>> function | |
| Halt -> Halt | |
| Emit (a,tail) -> Emit (f i a, go (i + 1) tail) | |
go 0 s | |
let indexed (s:HopacSeq<'a>) : HopacSeq<int * 'a> = | |
mapi (fun i a -> i,a) s | |
let rec append (s1:HopacSeq<'a>) (s2:HopacSeq<'a>) : HopacSeq<'a> = | |
s1 >>= function | |
| Halt -> s2 | |
| Emit (a,tail) -> Emit (a, append tail s2) |> Job.result | |
let rec collectJob (f:'a -> HopacSeq<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> = | |
s >>= function | |
| Halt -> empty | |
| Emit (a,tail) -> append (f a) (collectJob f tail) | |
let rec scanJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : HopacSeq<'b> = | |
s >>= function | |
| Halt -> empty | |
| Emit (a,tail) -> f b a |>> fun b -> Emit (b, scanJob f b tail) | |
let rec foldJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : Job<'b> = | |
s >>= function | |
| Halt -> b |> Job.result | |
| Emit (a,tail) -> f b a >>= fun b -> foldJob f b tail | |
let rec iterJob (f:'a -> Job<unit>) (s:HopacSeq<'a>) : Job<unit> = | |
s >>= function | |
| Halt -> Job.unit() | |
| Emit (a,tail) -> f a >>. iterJob f tail | |
let rec iter (f:'a -> unit) (s:HopacSeq<'a>) : Job<unit> = | |
s >>= function | |
| Halt -> Job.unit() | |
| Emit (a,tail) -> f a ; iter f tail | |
let first (s:HopacSeq<'a>) : Job<'a option> = | |
s |>> function | |
| Halt -> None | |
| Emit (a,_) -> Some a | |
let last (s:HopacSeq<'a>) : Job<'a option> = | |
let rec loop last s = | |
s >>= function | |
| Halt -> last |> Job.result | |
| Emit (a,tail) -> loop (Some a) tail | |
loop None s | |
let rec filterJob (f:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> = | |
s >>= function | |
| Halt -> Halt |> Job.result | |
| Emit (a,tail) -> | |
f a >>= fun b -> | |
if b then Emit (a,filterJob f tail) |> Job.result | |
else filterJob f tail | |
let rec chooseJob (f:'a -> Job<'b option>) (s:HopacSeq<'a>) : HopacSeq<'b> = | |
s >>= function | |
| Halt -> Halt |> Job.result | |
| Emit (a,tail) -> | |
f a >>= function | |
| Some b -> Emit (b, chooseJob f tail) |> Job.result | |
| None -> chooseJob f tail | |
let rec skip (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> = | |
if count = 0 then s | |
else | |
s >>= function | |
| Halt -> Halt |> Job.result | |
| Emit (_,tail) -> skip (count - 1) tail | |
let rec skipWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> = | |
s >>= function | |
| Halt -> empty | |
| Emit (a,tail) as s -> | |
p a >>= fun b -> | |
if b then skipWhileJob p tail | |
else s |> Job.result | |
let rec take (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> = | |
if count = 0 then empty | |
else | |
s |>> function | |
| Halt -> Halt | |
| Emit (a,tail) -> | |
Emit (a, take (count - 1) tail) | |
let rec takeWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> = | |
s >>= function | |
| Halt -> empty | |
| Emit (a,tail) -> | |
p a |>> fun b -> | |
if b then Emit (a, takeWhileJob p tail) | |
else Halt | |
let rec unfoldJob (f:'s -> Job<('a * 's) option>) (s:'s) : HopacSeq<'a> = | |
f s |>> function | |
| Some (a,s) -> Emit (a, unfoldJob f s) | |
| None -> Halt | |
let buffer (size:int) (s:HopacSeq<'a>) : HopacSeq<'a list> = | |
let rec loop (buffer:'a list) (s:HopacSeq<'a>) = | |
if (buffer.Length = size) then Emit (buffer |> List.rev, loop [] s) |> Job.result | |
else | |
s >>= function | |
| Halt -> | |
if buffer.Length > 0 then Emit (buffer |> List.rev, empty) |> Job.result | |
else empty | |
| Emit (a,tail) -> | |
loop (a::buffer) tail | |
loop [] s | |
let rec ofSeq (s:seq<'a>) : HopacSeq<'a> = | |
Job.using (s.GetEnumerator()) <| fun enum -> | |
let rec loop() = | |
if enum.MoveNext() then Emit (enum.Current, loop()) |> Job.result | |
else empty | |
loop() | |
// what is optimal way to perform non-deterministic merge? | |
let rec ofMVar (mv:MVar<'a option>) : HopacSeq<'a> = | |
MVar.take mv | |
|>> function | |
| Some a -> Emit (a, ofMVar mv) | |
| None -> Halt | |
let toMVar (s:HopacSeq<'a>) : Job<MVar<'a option>> = | |
MVar.create() >>= fun mv -> | |
let rec loop (a:HopacSeq<'a>) = | |
a >>= function | |
| Emit (a,tail) -> MVar.fill mv (Some a) >>. loop tail | |
| Halt -> MVar.fill mv None | |
Job.start (loop s) >>% mv | |
let private mergeMVar (mv1:MVar<'a>) (mv2:MVar<'a>) : Job<MVar<'a>> = | |
MVar.create() >>= fun mvOut -> | |
let go mv = Job.delay <| fun() -> | |
MVar.take mv >>= MVar.fill mvOut | |
Job.foreverServer (go mv1) >>. Job.foreverServer (go mv2) >>% mvOut | |
let rec merge (a:HopacSeq<'a>) (b:HopacSeq<'a>) : HopacSeq<'a> = | |
toMVar a >>= fun a -> toMVar b >>= mergeMVar a >>= ofMVar | |
Heh... In fact, my lazy version has a bug. Here is a correct version:
let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
let enum = new Disposer<_>(s.GetEnumerator ())
let rec loop () = Promise.Now.delayAsJob << Job.thunk <| fun () ->
if enum.Value.MoveNext () then
Emit (enum.Value.Current, loop ())
else
(enum :> IDisposable).Dispose ()
Halt
loop ()
The crucial difference being is that now it is made sure that the enumerator is used sequentially.
At any rate, thanks for asking about these sequences! Inspired to make improvements to Hopac!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Actually your
ofSeq
work correctly as it goes through the sequence eagerly. Sorry about the false alarm!Here is a lazy version: