Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Hopac-based seq merge
namespace Marvel.Hopac
open Hopac
open Hopac.Extra
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
/// Different representation of EagerSeq<'a>
/// A sequence is represented as a job whose result is a single step of the sequence.
type HopacSeq<'a> = Job<HopacSeqStep<'a>>
/// One step of a HopacSeq: either an element plus the rest of the sequence, or the end.
and HopacSeqStep<'a> =
/// An element together with the sequence (job) that yields the following step.
| Emit of 'a * HopacSeq<'a>
/// Marks the end of the sequence.
| Halt
module HopacSeq =
/// The sequence without elements: its job yields `Halt` straight away.
let empty<'a> : HopacSeq<'a> =
  Job.result Halt
/// Builds a sequence that emits `a` once and is followed by the empty sequence.
let emitOne a : HopacSeq<'a> =
  Job.result (Emit (a, empty))
/// Turns a job into a one-element sequence: the job's result followed by the empty sequence.
let ofJob (j:Job<'a>) : HopacSeq<'a> =
  j |>> fun result ->
    Emit (result, empty)
/// Applies `f` to every element emitted by `s`.
let rec map (f:'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s |>> fun step ->
    match step with
    | Emit (head, rest) -> Emit (f head, map f rest)
    | Halt -> Halt
/// Like `map`, but the mapping function returns a job whose result becomes the element.
let rec mapJob (f:'a -> Job<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Emit (head, rest) ->
      f head |>> fun mapped -> Emit (mapped, mapJob f rest)
    | Halt -> empty
/// Applies `f` to every element together with its zero-based position in the sequence.
let mapi (f:int -> 'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
  let rec loop (index:int) (rest:HopacSeq<'a>) : HopacSeq<'b> =
    rest |>> fun step ->
      match step with
      | Emit (head, tail) -> Emit (f index head, loop (index + 1) tail)
      | Halt -> Halt
  loop 0 s
/// Pairs every element with its zero-based position.
let indexed (s:HopacSeq<'a>) : HopacSeq<int * 'a> =
  s |> mapi (fun index item -> (index, item))
/// Emits every element of `s1` and, once `s1` halts, continues with `s2`.
let rec append (s1:HopacSeq<'a>) (s2:HopacSeq<'a>) : HopacSeq<'a> =
  s1 >>= fun step ->
    match step with
    | Emit (head, rest) -> Job.result (Emit (head, append rest s2))
    | Halt -> s2
/// Maps every element to a sequence with `f` and concatenates the resulting sequences in order.
let rec collectJob (f:'a -> HopacSeq<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Emit (head, rest) ->
      let inner = f head
      append inner (collectJob f rest)
    | Halt -> empty
/// Threads an accumulator through the sequence and emits each updated accumulator.
/// The initial value `b` itself is not emitted.
let rec scanJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Emit (item, rest) ->
      f b item |>> fun next -> Emit (next, scanJob f next rest)
    | Halt -> empty
/// Folds the whole sequence into a single value, starting from `b` and combining with `f`.
let rec foldJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : Job<'b> =
  s >>= fun step ->
    match step with
    | Emit (item, rest) -> f b item >>= fun acc -> foldJob f acc rest
    | Halt -> Job.result b
/// Runs the job `f a` for every element `a`, in order, until the sequence halts.
let rec iterJob (f:'a -> Job<unit>) (s:HopacSeq<'a>) : Job<unit> =
  s >>= fun step ->
    match step with
    | Emit (item, rest) -> f item >>. iterJob f rest
    | Halt -> Job.unit()
/// Calls the side-effecting function `f` on every element, in order, until the sequence halts.
let rec iter (f:'a -> unit) (s:HopacSeq<'a>) : Job<unit> =
  s >>= fun step ->
    match step with
    | Emit (item, rest) ->
      f item
      iter f rest
    | Halt -> Job.unit()
/// Returns the first element, or `None` when the sequence halts immediately.
let first (s:HopacSeq<'a>) : Job<'a option> =
  s |>> fun step ->
    match step with
    | Emit (head, _) -> Some head
    | Halt -> None
/// Walks the whole sequence and returns its final element, or `None` when it has none.
let last (s:HopacSeq<'a>) : Job<'a option> =
  let rec go (latest:'a option) (rest:HopacSeq<'a>) : Job<'a option> =
    rest >>= fun step ->
      match step with
      | Emit (item, tail) -> go (Some item) tail
      | Halt -> Job.result latest
  go None s
/// Keeps only the elements for which the job returned by `f` yields `true`.
let rec filterJob (f:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
  s >>= fun step ->
    match step with
    | Halt -> Job.result Halt
    | Emit (item, rest) ->
      f item >>= fun keep ->
        match keep with
        | true -> Job.result (Emit (item, filterJob f rest))
        | false -> filterJob f rest
/// Runs `f` on every element and emits the payload of each `Some` result, dropping `None` results.
let rec chooseJob (f:'a -> Job<'b option>) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Halt -> Job.result Halt
    | Emit (item, rest) ->
      f item >>= fun picked ->
        match picked with
        | None -> chooseJob f rest
        | Some chosen -> Job.result (Emit (chosen, chooseJob f rest))
/// Drops the first `count` elements of `s` and returns the remainder.
/// A `count` of zero or less drops nothing. Testing `count <= 0` rather than
/// `count = 0` matters: with `= 0`, a negative count would never reach the
/// base case and the entire sequence would be consumed and discarded.
/// If `s` has fewer than `count` elements the result is empty.
let rec skip (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> =
  if count <= 0 then s
  else
    s >>= function
      | Halt -> Job.result Halt
      | Emit (_, tail) -> skip (count - 1) tail
/// Drops leading elements while the job returned by `p` yields `true`;
/// the first element that fails the predicate and everything after it are kept.
let rec skipWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (item, rest) ->
      p item >>= fun shouldSkip ->
        if shouldSkip then skipWhileJob p rest
        else Job.result step
/// Emits at most the first `count` elements of `s`.
/// A `count` of zero or less yields the empty sequence. Testing `count <= 0`
/// rather than `count = 0` matters: with `= 0`, a negative count would never
/// reach the base case and the limit would be silently ignored.
/// Once the limit is reached the remaining tail of `s` is not run.
let rec take (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> =
  if count <= 0 then empty
  else
    s |>> function
      | Halt -> Halt
      | Emit (a, tail) -> Emit (a, take (count - 1) tail)
/// Emits elements while the job returned by `p` yields `true`; halts at the first element that fails.
let rec takeWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (item, rest) ->
      p item |>> fun ok ->
        if ok then Emit (item, takeWhileJob p rest) else Halt
/// Generates a sequence from a state: `f` yields `Some (element, nextState)` to emit and
/// continue, or `None` to halt.
let rec unfoldJob (f:'s -> Job<('a * 's) option>) (s:'s) : HopacSeq<'a> =
  f s |>> fun generated ->
    match generated with
    | None -> Halt
    | Some (item, nextState) -> Emit (item, unfoldJob f nextState)
/// Groups the elements of `s` into lists of `size` elements, preserving order.
/// The final list may be shorter when the sequence length is not a multiple of `size`;
/// an empty sequence produces no lists at all.
/// Raises ArgumentException when `size` is not positive: a size of zero would make the
/// loop recurse without ever reading from `s`, and a negative size could never be reached.
let buffer (size:int) (s:HopacSeq<'a>) : HopacSeq<'a list> =
  if size < 1 then invalidArg "size" "The buffer size must be positive."
  // `acc` holds the current chunk in reverse order; `count` is its length, carried
  // along so that the list does not have to be re-measured on every element.
  let rec loop (acc:'a list) (count:int) (s:HopacSeq<'a>) : HopacSeq<'a list> =
    if count = size then
      Job.result (Emit (List.rev acc, loop [] 0 s))
    else
      s >>= function
        | Halt ->
          // Flush a partially filled chunk, if any.
          if count > 0 then Job.result (Emit (List.rev acc, empty))
          else empty
        | Emit (a, tail) ->
          loop (a :: acc) (count + 1) tail
  loop [] 0 s
/// Converts an ordinary `seq` into a HopacSeq.
/// The source is enumerated eagerly, in full, each time the resulting job is run, so the
/// returned sequence can be run more than once (a fresh enumeration is made per run).
/// The chain of steps is built back to front over an array, so stack use does not grow
/// with the length of the source. The source must be finite.
let ofSeq (s:seq<'a>) : HopacSeq<'a> =
  Job.delay <| fun () ->
    let items = Array.ofSeq s
    Array.foldBack
      (fun item (rest:HopacSeq<'a>) -> Job.result (Emit (item, rest)))
      items
      empty
// What is the optimal way to perform a non-deterministic merge?
/// Reads a sequence out of an MVar: every `Some a` taken becomes an element and the
/// first `None` taken ends the sequence.
let rec ofMVar (mv:MVar<'a option>) : HopacSeq<'a> =
  MVar.take mv |>> fun slot ->
    match slot with
    | None -> Halt
    | Some item -> Emit (item, ofMVar mv)
/// Creates an MVar and starts a separate job that writes every element of `s` into it as
/// `Some a`, followed by a single `None` once `s` halts. The MVar is returned right after
/// the writer job has been started.
/// NOTE(review): the writer fills the MVar again without any handshake with the reader;
/// check the Hopac MVar documentation for what `fill` does when the variable is still full.
let toMVar (s:HopacSeq<'a>) : Job<MVar<'a option>> =
  MVar.create() >>= fun target ->
    let rec pump (rest:HopacSeq<'a>) =
      rest >>= fun step ->
        match step with
        | Emit (item, tail) -> MVar.fill target (Some item) >>. pump tail
        | Halt -> MVar.fill target None
    Job.start (pump s) >>% target
/// Merges two sequences non-deterministically: elements are emitted in whatever order the
/// two inputs produce them, and the result halts only after BOTH inputs have halted.
///
/// Each input is driven by its own producer job, and both producers hand their elements to
/// a single synchronous channel. `None` on the channel marks the end of one input; the
/// consumer counts these markers instead of stopping at the first one, so the longer input
/// is not cut short when the shorter one ends. A channel is used rather than an MVar
/// because two producers write to the same place.
///
/// The tails of the merged sequence read from a shared channel, so each step is meant to
/// be run once, in order.
let merge (a:HopacSeq<'a>) (b:HopacSeq<'a>) : HopacSeq<'a> =
  Job.delay <| fun () ->
    let ch : Ch<'a option> = Ch.Now.create ()
    // Pushes every element of one input into the channel, then its end marker.
    let rec produce (s:HopacSeq<'a>) : Job<unit> =
      s >>= function
        | Emit (x, tail) -> Ch.give ch (Some x) >>. produce tail
        | Halt -> Ch.give ch None >>% ()
    // `live` is the number of inputs that have not yet sent their end marker.
    let rec consume (live:int) : HopacSeq<'a> =
      if live = 0 then empty
      else
        Ch.take ch >>= function
          | Some x -> Job.result (Emit (x, consume live))
          | None -> consume (live - 1)
    Job.start (produce a) >>. Job.start (produce b) >>. consume 2
@polytypic
Copy link

Basically yes. I'd say that the other result is just not thrown away. While the merge is being run lazily, each started promise is kept around (not thrown away) until the promise becomes determined.

@eulerfx
Copy link
Author

eulerfx commented Nov 16, 2014

Thanks! I now have a much clearer understanding of the similarities and differences between the Hopac/CML model and F# async.

@polytypic
Copy link

No problem! Actually, thinking about this, I think I should reintroduce lazy promises, which make it convenient to memoize jobs. You could then easily write a memoize: HopacSeq<'x> -> HopacSeq<'x> function. Looking at the old code for lazy promises, it seems like I had a performance bug (a totally unnecessary allocation) in the old implementation...

@polytypic
Copy link

FYI, I reintroduced lazy promises to Hopac. I haven't yet released binaries. I will likely not do that in a couple of days. With lazy promises you could implement memoize as follows:

/// Rebuilds `xs` so that every step is wrapped in a lazy promise (`Promise.Now.delay`),
/// recursively memoizing the tail of each `Emit` as well.
let rec memoize (xs: HopacSeq<'x>) =
  (xs |>> function
      | Halt -> Halt
      | Emit (x, xs) -> Emit (x, memoize xs))
  |> Promise.Now.delay :> HopacSeq<_>

Also, here is a bit more concise version of merge:

/// Offers both orderings as alternatives: whichever of the two promised streams
/// produces a step first determines the next step of the merged stream.
let rec mergeEither aP bP =
  mergeFirst aP bP <|> mergeFirst bP aP
/// Takes the next step from `aP`. When `aP` has halted the merge continues with `bP`
/// alone; otherwise the element is emitted and the merge continues with the operands
/// swapped, so that the stream that did not produce a value is tried first next time.
and mergeFirst aP bP =
  (aP >>=? function
    | Halt -> upcast bP
    | Emit (x, a) ->
      Job.result (Emit (x, mergeEither bP (Promise.Now.delay a))))
/// Merges two sequences by wrapping each in a lazy promise and delegating to `mergeEither`.
let merge (a: HopacSeq<'a>) (b: HopacSeq<'a>) : HopacSeq<'a> =
  mergeEither (Promise.Now.delay a) (Promise.Now.delay b)

Look at the last line of the mergeFirst function above. This version intentionally swaps the a and b streams so that the stream that didn't manage to produce a value is given "priority" (evaluated on the left hand side of <|>) on the next round.

@polytypic
Copy link

One more thing. I haven't looked carefully through all of the HopacSeq combinators, but I happened to notice that the ofSeq combinator

  // Quoted from the gist: walks the enumerator to the end inside `Job.using`,
  // building the whole chain of `Emit` steps before the enumerator is released.
  let rec ofSeq (s:seq<'a>) : HopacSeq<'a> =
    Job.using (s.GetEnumerator()) <| fun enum ->
      let rec loop() =
        if enum.MoveNext() then Emit (enum.Current, loop()) |> Job.result
        else empty
      loop()

doesn't quite work correctly. What happens is that the enum is disposed once the first result is produced.

@polytypic
Copy link

Actually, your ofSeq works correctly, as it goes through the sequence eagerly. Sorry about the false alarm!

Here is a lazy version:

  /// Wraps a disposable value so that it is disposed either explicitly through
  /// `IDisposable.Dispose` or, failing that, from the finalizer.
  type Disposer<'x when 'x :> IDisposable> =
    /// The wrapped disposable value.
    val Value: 'x
    interface IDisposable with
      override this.Dispose () =
        this.Value.Dispose ()
        // Explicitly disposed: the finalizer no longer needs to run.
        GC.SuppressFinalize this
    // Fallback for the case where Dispose was never called explicitly.
    override this.Finalize () =
      this.Value.Dispose ()
    new (value: 'x) = {Value = value}

  /// Lazy conversion from `seq`: the enumerator is advanced one element per step and
  /// disposed when the end is reached.
  /// NOTE: the author states in the next comment that this version has a bug and
  /// posts a corrected one there.
  let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
    let enum = new Disposer<_>(s.GetEnumerator ())
    let rec loop () = Job.thunk <| fun () ->
      if enum.Value.MoveNext () then
        Emit (enum.Value.Current, loop ())
      else
        (enum :> IDisposable).Dispose ()
        Halt
    loop ()

@polytypic
Copy link

Heh... In fact, my lazy version has a bug. Here is a correct version:

  /// Lazy conversion from `seq`: the enumerator is advanced one element per step and
  /// disposed when the end is reached. Each step is wrapped with
  /// `Promise.Now.delayAsJob`, which the author says ensures the enumerator is used
  /// sequentially.
  let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
    let enum = new Disposer<_>(s.GetEnumerator ())
    let rec loop () = Promise.Now.delayAsJob << Job.thunk <| fun () ->
      if enum.Value.MoveNext () then
        Emit (enum.Value.Current, loop ())
      else
        (enum :> IDisposable).Dispose ()
        Halt
    loop ()

The crucial difference is that this version makes sure the enumerator is used sequentially.

At any rate, thanks for asking about these sequences! They inspired me to make improvements to Hopac!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment