Last active
August 29, 2015 14:09
-
-
Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Hopac-based seq merge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Marvel.Hopac

open Hopac
open Hopac.Extra
open Hopac.Job.Infixes
open Hopac.Alt.Infixes

/// A job-based sequence (a different representation of EagerSeq<'a>).
/// Running the job yields one step: either the next element together with
/// the rest of the sequence (itself a job that has not been run yet), or
/// `Halt` when the sequence is exhausted.
type HopacSeq<'a> = Job<HopacSeqStep<'a>>

/// A single step of a HopacSeq.
and HopacSeqStep<'a> =
    /// An element followed by the remainder of the sequence.
    | Emit of 'a * HopacSeq<'a>
    /// The end of the sequence.
    | Halt
module HopacSeq =

    /// The sequence that halts immediately.
    let empty<'a> : HopacSeq<'a> =
        Halt |> Job.result

    /// A sequence with the single element `a`.
    let emitOne a : HopacSeq<'a> =
        Emit (a, empty) |> Job.result

    /// A sequence with a single element: the result of running `j`.
    let ofJob (j: Job<'a>) : HopacSeq<'a> =
        j |>> fun a -> Emit (a, empty)

    /// Applies `f` to each element.
    let rec map (f: 'a -> 'b) (s: HopacSeq<'a>) : HopacSeq<'b> =
        s |>> function
            | Halt -> Halt
            | Emit (a, tail) -> Emit (f a, map f tail)

    /// Applies the job-returning `f` to each element; `f a` is run when the
    /// step holding `a` is run.
    let rec mapJob (f: 'a -> Job<'b>) (s: HopacSeq<'a>) : HopacSeq<'b> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) -> f a |>> fun b -> Emit (b, mapJob f tail)

    /// Like `map`, but `f` also receives the zero-based index of the element.
    let mapi (f: int -> 'a -> 'b) (s: HopacSeq<'a>) : HopacSeq<'b> =
        let rec go (i: int) (s: HopacSeq<'a>) : HopacSeq<'b> =
            s |>> function
                | Halt -> Halt
                | Emit (a, tail) -> Emit (f i a, go (i + 1) tail)
        go 0 s

    /// Pairs each element with its zero-based index.
    let indexed (s: HopacSeq<'a>) : HopacSeq<int * 'a> =
        mapi (fun i a -> i, a) s

    /// All elements of `s1` followed by all elements of `s2`; `s2` is only
    /// run after `s1` has halted.
    let rec append (s1: HopacSeq<'a>) (s2: HopacSeq<'a>) : HopacSeq<'a> =
        s1 >>= function
            | Halt -> s2
            | Emit (a, tail) -> Emit (a, append tail s2) |> Job.result

    /// Maps each element to a sequence and concatenates those sequences in
    /// order.
    let rec collectJob (f: 'a -> HopacSeq<'b>) (s: HopacSeq<'a>) : HopacSeq<'b> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) -> append (f a) (collectJob f tail)

    /// Emits each intermediate state of folding the job-returning `f` over
    /// `s`, starting from `b`. Unlike `Seq.scan`, the initial state `b` itself
    /// is not emitted.
    let rec scanJob (f: 'b -> 'a -> Job<'b>) (b: 'b) (s: HopacSeq<'a>) : HopacSeq<'b> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) -> f b a |>> fun b' -> Emit (b', scanJob f b' tail)

    /// Folds the job-returning `f` over all of `s` and returns the final state.
    let rec foldJob (f: 'b -> 'a -> Job<'b>) (b: 'b) (s: HopacSeq<'a>) : Job<'b> =
        s >>= function
            | Halt -> Job.result b
            | Emit (a, tail) -> f b a >>= fun b' -> foldJob f b' tail

    /// Runs `f a` for each element in order, waiting for each job to finish
    /// before moving on.
    let rec iterJob (f: 'a -> Job<unit>) (s: HopacSeq<'a>) : Job<unit> =
        s >>= function
            | Halt -> Job.unit ()
            | Emit (a, tail) -> f a >>. iterJob f tail

    /// Calls `f` synchronously on each element in order.
    let rec iter (f: 'a -> unit) (s: HopacSeq<'a>) : Job<unit> =
        s >>= function
            | Halt -> Job.unit ()
            | Emit (a, tail) ->
                f a
                iter f tail

    /// The first element of `s`, or None if `s` is empty. Only the first step
    /// of `s` is run.
    let first (s: HopacSeq<'a>) : Job<'a option> =
        s |>> function
            | Halt -> None
            | Emit (a, _) -> Some a

    /// The last element of `s`, or None if `s` is empty. Runs all of `s`.
    let last (s: HopacSeq<'a>) : Job<'a option> =
        let rec loop (last: 'a option) (s: HopacSeq<'a>) : Job<'a option> =
            s >>= function
                | Halt -> Job.result last
                | Emit (a, tail) -> loop (Some a) tail
        loop None s

    /// Keeps the elements for which the job-returning predicate `f` yields true.
    let rec filterJob (f: 'a -> Job<bool>) (s: HopacSeq<'a>) : HopacSeq<'a> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) ->
                f a >>= fun keep ->
                    if keep then Emit (a, filterJob f tail) |> Job.result
                    else filterJob f tail

    /// Maps each element with the job-returning `f`, keeping only the `Some`
    /// results.
    let rec chooseJob (f: 'a -> Job<'b option>) (s: HopacSeq<'a>) : HopacSeq<'b> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) ->
                f a >>= function
                    | Some b -> Emit (b, chooseJob f tail) |> Job.result
                    | None -> chooseJob f tail

    /// Drops the first `count` elements. A non-positive `count` returns `s`
    /// unchanged (previously a negative count silently dropped everything).
    let rec skip (count: int) (s: HopacSeq<'a>) : HopacSeq<'a> =
        if count <= 0 then s
        else
            s >>= function
                | Halt -> empty
                | Emit (_, tail) -> skip (count - 1) tail

    /// Drops leading elements while the job-returning predicate `p` holds,
    /// then continues from the first element for which it fails.
    let rec skipWhileJob (p: 'a -> Job<bool>) (s: HopacSeq<'a>) : HopacSeq<'a> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) as step ->
                p a >>= fun drop ->
                    if drop then skipWhileJob p tail
                    else Job.result step

    /// Emits at most `count` elements. A non-positive `count` yields the empty
    /// sequence without running `s` (previously a negative count emitted
    /// everything); after `count` elements the rest of `s` is not run.
    let rec take (count: int) (s: HopacSeq<'a>) : HopacSeq<'a> =
        if count <= 0 then empty
        else
            s |>> function
                | Halt -> Halt
                | Emit (a, tail) -> Emit (a, take (count - 1) tail)

    /// Emits leading elements while the job-returning predicate `p` holds and
    /// halts at the first element for which it fails.
    let rec takeWhileJob (p: 'a -> Job<bool>) (s: HopacSeq<'a>) : HopacSeq<'a> =
        s >>= function
            | Halt -> empty
            | Emit (a, tail) ->
                p a |>> fun keep ->
                    if keep then Emit (a, takeWhileJob p tail)
                    else Halt

    /// Generates a sequence from the state `s`: `f` returns Some (element,
    /// next state) to continue or None to halt.
    let rec unfoldJob (f: 's -> Job<('a * 's) option>) (s: 's) : HopacSeq<'a> =
        f s |>> function
            | Some (a, s') -> Emit (a, unfoldJob f s')
            | None -> Halt

    /// Groups consecutive elements into lists of `size` elements, in order.
    /// The final list may be shorter; no empty list is ever emitted.
    /// Raises ArgumentException (via invalidArg) when `size` is not positive;
    /// previously `size = 0` produced an endless stream of empty lists.
    let buffer (size: int) (s: HopacSeq<'a>) : HopacSeq<'a list> =
        if size <= 0 then invalidArg "size" "The buffer size must be positive."
        // `acc` holds the current chunk in reverse order and `n` is its
        // length, tracked explicitly because List.Length is O(n).
        let rec loop (acc: 'a list) (n: int) (s: HopacSeq<'a>) : HopacSeq<'a list> =
            if n = size then Emit (List.rev acc, loop [] 0 s) |> Job.result
            else
                s >>= function
                    | Halt ->
                        if n > 0 then Emit (List.rev acc, empty) |> Job.result
                        else empty
                    | Emit (a, tail) -> loop (a :: acc) (n + 1) tail
        loop [] 0 s

    /// Creates a sequence from `s`. Each run of the returned job enumerates `s`
    /// completely (disposing its enumerator) before the first step is
    /// returned, so `s` must be finite.
    let ofSeq (s: seq<'a>) : HopacSeq<'a> =
        // Job.delay obtains a fresh enumerator on every run; previously the
        // enumerator was created once, so re-running the job reused a
        // disposed enumerator. foldBack over an array builds the Emit chain
        // without recursion proportional to the sequence length.
        Job.delay <| fun () ->
            let items = Seq.toArray s
            Array.foldBack (fun a (tail: HopacSeq<'a>) -> Emit (a, tail) |> Job.result) items empty

    /// Reads a sequence from `mv`: each step takes one value, emitting it on
    /// `Some a` and halting on `None`. Every run of a step takes from `mv`, so
    /// each step job should be run at most once.
    let rec ofMVar (mv: MVar<'a option>) : HopacSeq<'a> =
        MVar.take mv |>> function
            | Some a -> Emit (a, ofMVar mv)
            | None -> Halt

    /// Starts a producer that writes each element of `s` to a new MVar as
    /// `Some a`, followed by `None`, and returns that MVar.
    /// NOTE(review): the producer fills `mv` again as soon as the previous fill
    /// completes, without waiting for a consumer to take the value. Hopac
    /// documents filling an already-full MVar as a programming error, so values
    /// may be lost when the consumer is slower than the producer — confirm
    /// against the Hopac version in use; `merge` no longer relies on this.
    let toMVar (s: HopacSeq<'a>) : Job<MVar<'a option>> =
        MVar.create () >>= fun mv ->
            let rec loop (s: HopacSeq<'a>) : Job<unit> =
                s >>= function
                    | Emit (a, tail) -> MVar.fill mv (Some a) >>. loop tail
                    | Halt -> MVar.fill mv None
            Job.start (loop s) >>% mv

    /// Non-deterministically merges two sequences. Both inputs are run
    /// concurrently. Elements are emitted in the order the producers hand them
    /// over, and the result halts only after BOTH inputs have halted
    /// (previously it halted as soon as either input did, dropping the rest of
    /// the other). Steps of the result take from shared state, so each step
    /// job should be run at most once.
    let merge (a: HopacSeq<'a>) (b: HopacSeq<'a>) : HopacSeq<'a> =
        // Hand-off protocol: `free` holds a token exactly when `slot` is empty.
        // A producer takes the token before filling `slot`, and the consumer
        // returns it after emptying `slot`, so `slot` is never filled twice.
        let start (slot: MVar<'a option>) (free: MVar<unit>) : HopacSeq<'a> =
            let put (x: 'a option) : Job<unit> =
                MVar.take free >>. MVar.fill slot x
            let rec produce (s: HopacSeq<'a>) : Job<unit> =
                s >>= function
                    | Emit (x, tail) -> put (Some x) >>. produce tail
                    | Halt -> put None
            // `running` counts producers that have not yet sent their `None`.
            let rec consume (running: int) : HopacSeq<'a> =
                if running = 0 then empty
                else
                    MVar.take slot >>= fun x ->
                        MVar.fill free () >>= fun () ->
                            match x with
                            | Some v -> Emit (v, consume running) |> Job.result
                            | None -> consume (running - 1)
            MVar.fill free () >>. Job.start (produce a) >>. Job.start (produce b) >>. consume 2
        MVar.create () >>= fun slot ->
            MVar.create () >>= fun free -> start slot free
One more thing. I haven't looked carefully through all of the `HopacSeq` combinators, but I happened to notice that the `ofSeq` combinator
// The eager ofSeq under discussion (identical to the one in the HopacSeq
// module). `loop()` recurses once per element while building the Emit chain,
// so the whole sequence is enumerated before the first step is returned;
// Job.using then disposes the enumerator.
// NOTE(review): `s.GetEnumerator()` is evaluated when ofSeq is called, not
// when the returned job runs, so running that job a second time appears to
// reuse an already-disposed enumerator — confirm before reusing this snippet.
let rec ofSeq (s:seq<'a>) : HopacSeq<'a> =
Job.using (s.GetEnumerator()) <| fun enum ->
let rec loop() =
if enum.MoveNext() then Emit (enum.Current, loop()) |> Job.result
else empty
loop()
doesn't quite work correctly. What happens is that the `enum` is disposed once the first result is produced.
Actually, your `ofSeq` works correctly, as it goes through the sequence eagerly. Sorry about the false alarm!
Here is a lazy version:
/// Wraps a disposable value so that it is disposed either explicitly through
/// IDisposable.Dispose or, if that never happens, when the wrapper is finalized.
// Assumes `open System` is in scope (IDisposable, GC).
type Disposer<'x when 'x :> IDisposable> =
val Value: 'x
interface IDisposable with
override this.Dispose () =
this.Value.Dispose ()
// Disposal already happened, so the finalizer does not need to run.
GC.SuppressFinalize this
// Fallback for wrappers that are dropped without being disposed.
// NOTE(review): this calls Dispose on a managed object from the finalizer
// thread, and that object may itself already have been finalized — confirm
// this is acceptable for the values being wrapped.
override this.Finalize () =
this.Value.Dispose ()
new (value: 'x) = {Value = value}
// First lazy attempt (the next message says it has a bug). A fresh Disposer-
// wrapped enumerator is created each time the outer job is run, and each step
// is a Job.thunk that calls MoveNext whenever that step job is run. The
// enumerator is disposed explicitly once MoveNext returns false.
// NOTE(review): presumably the bug is that a step job can be run more than
// once, or by several readers, advancing the enumerator again each time, so
// the enumerator is not guaranteed to be used sequentially.
let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
let enum = new Disposer<_>(s.GetEnumerator ())
let rec loop () = Job.thunk <| fun () ->
if enum.Value.MoveNext () then
Emit (enum.Value.Current, loop ())
else
(enum :> IDisposable).Dispose ()
Halt
loop ()
Heh... In fact, my lazy version has a bug. Here is a correct version:
// Corrected lazy ofSeq. Each step's Job.thunk is wrapped with
// Promise.Now.delayAsJob so that, as the following message says, the
// enumerator is used sequentially — presumably because the delayed promise
// runs the thunk at most once and shares its result with every reader.
// The enumerator is disposed when MoveNext returns false; if the sequence is
// abandoned early, the Disposer's finalizer is the fallback.
let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
let enum = new Disposer<_>(s.GetEnumerator ())
let rec loop () = Promise.Now.delayAsJob << Job.thunk <| fun () ->
if enum.Value.MoveNext () then
Emit (enum.Value.Current, loop ())
else
(enum :> IDisposable).Dispose ()
Halt
loop ()
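The crucial difference is that the enumerator is now guaranteed to be used sequentially: each step is wrapped in a delayed promise, whose job is run at most once and whose result is then shared, so `MoveNext` cannot be called again for a step that has already been taken.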
At any rate, thanks for asking about these sequences! Inspired to make improvements to Hopac!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
FYI, I reintroduced lazy promises to Hopac. I haven't released binaries yet, and I probably won't for a couple of days. With lazy promises you could implement `memoize` as follows: *(code snippet not preserved in this copy)*
Also, here is a somewhat more concise version of `merge`: *(code snippet not preserved in this copy)*
Look at the last line of the `mergeFirst` function above. This version intentionally swaps the `a` and `b` streams, so that the stream that didn't manage to produce a value is given "priority" (evaluated on the left-hand side of `<|>`) on the next round.