Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Hopac-based seq merge
namespace Marvel.Hopac
open Hopac
open Hopac.Extra
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
/// Different representation of EagerSeq<'a>:
/// a sequence is a Hopac job whose result is the next step of the sequence.
type HopacSeq<'a> = Job<HopacSeqStep<'a>>
// A single step of a HopacSeq: either an element plus the rest, or the end.
and HopacSeqStep<'a> =
// An element together with the job that produces the remainder of the sequence.
| Emit of 'a * HopacSeq<'a>
// Marks the end of the sequence.
| Halt
module HopacSeq =
/// The sequence without elements: a job whose result is `Halt`.
let empty<'a> : HopacSeq<'a> =
  Job.result Halt
/// Builds the one-element sequence that emits `a` and then halts.
let emitOne a : HopacSeq<'a> =
  Job.result (Emit (a, empty))
/// Turns a job into a one-element sequence: the job's result is emitted,
/// followed by the empty sequence.
let ofJob (j:Job<'a>) : HopacSeq<'a> =
  let single value = Emit (value, empty)
  j |>> single
/// Applies the pure function `f` to every element of `s`.
let rec map (f:'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s |>> fun step ->
    match step with
    | Emit (x, rest) -> Emit (f x, map f rest)
    | Halt -> Halt
/// Applies the job-returning function `f` to every element of `s`;
/// each result is obtained by running the job `f` returns for that element.
let rec mapJob (f:'a -> Job<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (x, rest) ->
      f x |>> fun y -> Emit (y, mapJob f rest)
/// Like `map`, but `f` also receives the zero-based position of each element.
let mapi (f:int -> 'a -> 'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
  let rec loop (index:int) (rest:HopacSeq<'a>) : HopacSeq<'b> =
    rest |>> fun step ->
      match step with
      | Halt -> Halt
      | Emit (x, next) -> Emit (f index x, loop (index + 1) next)
  loop 0 s
/// Pairs every element of `s` with its zero-based position.
let indexed (s:HopacSeq<'a>) : HopacSeq<int * 'a> =
  s |> mapi (fun index item -> (index, item))
/// Concatenation: all elements of `s1`, then all elements of `s2`.
let rec append (s1:HopacSeq<'a>) (s2:HopacSeq<'a>) : HopacSeq<'a> =
  s1 >>= fun step ->
    match step with
    | Emit (x, rest) -> Job.result (Emit (x, append rest s2))
    // `s1` is exhausted: continue with `s2` as-is.
    | Halt -> s2
/// Maps every element of `s` to a sequence via `f` and concatenates the
/// resulting sequences in order.
let rec collectJob (f:'a -> HopacSeq<'b>) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (x, rest) ->
      let inner = f x
      append inner (collectJob f rest)
/// Threads a state through `s` using the job-returning folder `f` and emits
/// each state computed by `f`. The initial state `b` itself is not emitted.
let rec scanJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (x, rest) ->
      f b x |>> fun next -> Emit (next, scanJob f next rest)
/// Folds `s` into a single value with the job-returning folder `f`,
/// starting from `b`. The result is `b` itself when `s` is empty.
let rec foldJob (f:'b -> 'a -> Job<'b>) (b:'b) (s:HopacSeq<'a>) : Job<'b> =
  s >>= fun step ->
    match step with
    | Emit (x, rest) -> f b x >>= fun acc -> foldJob f acc rest
    | Halt -> Job.result b
/// Runs the job `f x` for every element `x` of `s`, in sequence order.
let rec iterJob (f:'a -> Job<unit>) (s:HopacSeq<'a>) : Job<unit> =
  s >>= fun step ->
    match step with
    | Emit (x, rest) -> f x >>. iterJob f rest
    | Halt -> Job.unit ()
/// Calls the side-effecting function `f` on every element of `s`, in order.
let rec iter (f:'a -> unit) (s:HopacSeq<'a>) : Job<unit> =
  s >>= fun step ->
    match step with
    | Halt -> Job.unit ()
    | Emit (x, rest) ->
      f x
      iter f rest
/// Returns the first element of `s`, or `None` when `s` is empty.
/// Only the first step of `s` is run.
let first (s:HopacSeq<'a>) : Job<'a option> =
  s |>> fun step ->
    match step with
    | Emit (x, _) -> Some x
    | Halt -> None
/// Returns the final element of `s`, or `None` when `s` is empty.
/// The whole sequence is traversed.
let last (s:HopacSeq<'a>) : Job<'a option> =
  let rec scan (seen:'a option) (rest:HopacSeq<'a>) : Job<'a option> =
    rest >>= fun step ->
      match step with
      | Emit (x, next) -> scan (Some x) next
      | Halt -> Job.result seen
  scan None s
/// Keeps only the elements of `s` for which the job-returning predicate `f`
/// yields `true`.
let rec filterJob (f:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
  s >>= fun step ->
    match step with
    | Halt -> Job.result Halt
    | Emit (x, rest) ->
      f x >>= fun keep ->
        match keep with
        | true -> Job.result (Emit (x, filterJob f rest))
        | false -> filterJob f rest
/// Applies the job-returning chooser `f` to every element of `s`, emitting
/// the payload of each `Some` result and dropping every `None`.
let rec chooseJob (f:'a -> Job<'b option>) (s:HopacSeq<'a>) : HopacSeq<'b> =
  s >>= fun step ->
    match step with
    | Halt -> Job.result Halt
    | Emit (x, rest) ->
      f x >>= fun picked ->
        match picked with
        | None -> chooseJob f rest
        | Some y -> Job.result (Emit (y, chooseJob f rest))
/// Drops the first `count` elements of `s` and returns the remainder.
/// A `count` of zero or less returns `s` unchanged; if `s` has fewer than
/// `count` elements the result is empty.
let rec skip (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> =
  // `<=` rather than `=`: with a negative count an equality test would never
  // hit the base case, so the whole sequence would be consumed and discarded.
  if count <= 0 then s
  else
    s >>= function
      | Halt -> Halt |> Job.result
      | Emit (_,tail) -> skip (count - 1) tail
/// Drops leading elements of `s` while the job-returning predicate `p`
/// yields `true`; the first element that fails `p` and everything after it
/// are returned untouched.
let rec skipWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (x, rest) ->
      p x >>= fun drop ->
        if drop then skipWhileJob p rest
        // Re-emit the step just inspected, unchanged.
        else Job.result step
/// Returns at most the first `count` elements of `s`.
/// A `count` of zero or less yields the empty sequence without running `s`.
let rec take (count:int) (s:HopacSeq<'a>) : HopacSeq<'a> =
  // `<=` rather than `=`: with a negative count an equality test would never
  // hit the base case, so nothing would ever be truncated.
  if count <= 0 then empty
  else
    s |>> function
      | Halt -> Halt
      | Emit (a,tail) ->
        Emit (a, take (count - 1) tail)
/// Emits leading elements of `s` while the job-returning predicate `p`
/// yields `true`, and halts at the first element for which it yields `false`.
let rec takeWhileJob (p:'a -> Job<bool>) (s:HopacSeq<'a>) : HopacSeq<'a> =
  s >>= fun step ->
    match step with
    | Halt -> empty
    | Emit (x, rest) ->
      p x |>> fun keep ->
        match keep with
        | true -> Emit (x, takeWhileJob p rest)
        | false -> Halt
/// Generates a sequence from the seed `s`: each run of `f` yields either
/// `Some (element, nextSeed)` to continue or `None` to end the sequence.
let rec unfoldJob (f:'s -> Job<('a * 's) option>) (s:'s) : HopacSeq<'a> =
  f s |>> fun generated ->
    match generated with
    | None -> Halt
    | Some (x, next) -> Emit (x, unfoldJob f next)
/// Groups the elements of `s` into lists of `size` elements, preserving order.
/// The final list may be shorter when the input length is not a multiple of
/// `size`; an empty input produces an empty output.
/// Raises an ArgumentException (via `invalidArg`) when `size` is not positive:
/// a size of zero would emit empty lists forever without consuming any input,
/// and a negative size would never complete a chunk.
let buffer (size:int) (s:HopacSeq<'a>) : HopacSeq<'a list> =
  if size <= 0 then invalidArg "size" "buffer size must be positive"
  // `count` mirrors the length of `acc` so the O(n) `List.Length` is not
  // recomputed on every step. `acc` holds the current chunk in reverse order.
  let rec loop (count:int) (acc:'a list) (rest:HopacSeq<'a>) : HopacSeq<'a list> =
    if count = size then
      Emit (List.rev acc, loop 0 [] rest) |> Job.result
    else
      rest >>= function
        | Halt ->
          // Flush a trailing partial chunk, if there is one.
          if count > 0 then Emit (List.rev acc, empty) |> Job.result
          else empty
        | Emit (a,tail) ->
          loop (count + 1) (a::acc) tail
  loop 0 [] s
/// Converts an ordinary (finite) sequence into a HopacSeq.
///
/// The input is enumerated in full each time the resulting job is run, and
/// the chain of `Emit` steps is then built from that snapshot. Enumerating
/// inside `Job.delay` keeps the result re-runnable: every run performs a fresh
/// enumeration instead of sharing one enumerator created when `ofSeq` was
/// called. Building the chain with `Array.foldBack` avoids the deep
/// non-tail recursion that nesting one call per element would need.
///
/// Like the eager formulation it replaces, this does not support infinite
/// input sequences.
let ofSeq (s:seq<'a>) : HopacSeq<'a> =
  Job.delay <| fun () ->
    // `Seq.toArray` obtains and disposes the enumerator itself.
    let items = Seq.toArray s
    Array.foldBack (fun a tail -> Emit (a, tail) |> Job.result) items empty
// What is the optimal way to perform a non-deterministic merge?
/// Reads a sequence out of an MVar: every `Some x` taken from `mv` is emitted
/// as an element, and the first `None` taken ends the sequence.
let rec ofMVar (mv:MVar<'a option>) : HopacSeq<'a> =
  MVar.take mv |>> fun taken ->
    match taken with
    | None -> Halt
    | Some x -> Emit (x, ofMVar mv)
/// Creates an MVar and starts a separate job that writes the elements of `s`
/// into it as `Some x`, followed by a single `None` once `s` halts.
/// The returned job yields the MVar right after the writer job is started.
// NOTE(review): the writer fills the MVar for every element without waiting
// for a reader to take the previous value. Confirm that `MVar.fill` on an
// already-full MVar is permitted by the Hopac version in use.
let toMVar (s:HopacSeq<'a>) : Job<MVar<'a option>> =
  MVar.create() >>= fun mv ->
    let rec pump (rest:HopacSeq<'a>) =
      rest >>= fun step ->
        match step with
        | Emit (x, next) -> MVar.fill mv (Some x) >>. pump next
        | Halt -> MVar.fill mv None
    Job.start (pump s) >>% mv
/// Creates an output MVar and starts two forwarding servers, one per input,
/// each repeatedly taking a value from its input and filling the output with it.
/// Every value is forwarded as-is; no value is treated specially.
// NOTE(review): both forwarders fill the same output MVar independently.
// Confirm that concurrent fills are safe for the Hopac version in use.
let private mergeMVar (mv1:MVar<'a>) (mv2:MVar<'a>) : Job<MVar<'a>> =
  MVar.create() >>= fun merged ->
    let forward (source:MVar<'a>) =
      Job.delay <| fun () ->
        MVar.take source >>= fun value -> MVar.fill merged value
    let serveFirst = Job.foreverServer (forward mv1)
    let serveSecond = Job.foreverServer (forward mv2)
    serveFirst >>. serveSecond >>% merged
/// Non-deterministically merges two sequences. Elements appear in whatever
/// order the two inputs happen to produce them, the relative order within
/// each input is preserved, and the result halts only after BOTH inputs have
/// halted.
///
/// Running the merged sequence creates a channel and starts one producer job
/// per input. Each producer gives its elements to the channel as `Some x` and
/// finally gives a single `None` as its end marker. The consumer side counts
/// end markers, so the end of one input does not cut off the other.
// NOTE(review): relies on `Ch.give` being a rendezvous with `Ch.take`, so a
// producer advances only as fast as the merged sequence is consumed. Confirm
// against the Hopac version in use.
let merge (a:HopacSeq<'a>) (b:HopacSeq<'a>) : HopacSeq<'a> =
  Ch.create () >>= fun (ch:Ch<'a option>) ->
    // Producer: forwards one input into the shared channel.
    let rec pump (s:HopacSeq<'a>) : Job<unit> =
      s >>= function
        | Emit (x,tail) -> Ch.give ch (Some x) >>. pump tail
        | Halt -> Ch.give ch None >>% ()
    // Consumer: `remaining` is the number of producers still running.
    let rec drain (remaining:int) : HopacSeq<'a> =
      if remaining = 0 then empty
      else
        Ch.take ch >>= function
          | Some x -> Emit (x, drain remaining) |> Job.result
          | None -> drain (remaining - 1)
    Job.start (pump a) >>. Job.start (pump b) >>. drain 2
@polytypic
Copy link

Heh... In fact, my lazy version has a bug. Here is a correct version:

  /// Lazy conversion of an ordinary sequence into a HopacSeq.
  /// The enumerator is obtained inside the delayed function, and each step
  /// advances it by one element; it is disposed when the input is exhausted.
  let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
    // Wrap the enumerator so it can be disposed explicitly at the end.
    let enum = new Disposer<_>(s.GetEnumerator ())
    // NOTE(review): presumably `Promise.Now.delayAsJob` memoizes each step so
    // the enumerator is advanced at most once per position -- confirm.
    let rec loop () = Promise.Now.delayAsJob << Job.thunk <| fun () ->
      if enum.Value.MoveNext () then
        Emit (enum.Value.Current, loop ())
      else
        // Input exhausted: release the enumerator before ending the sequence.
        (enum :> IDisposable).Dispose ()
        Halt
    loop ()

The crucial difference is that the enumerator is now guaranteed to be used sequentially.

At any rate, thanks for asking about these sequences! They inspired me to make improvements to Hopac!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment