Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Save eulerfx/1ad2ecad0b1c64804914 to your computer and use it in GitHub Desktop.
Hopac-based seq merge
namespace Marvel.Hopac
open Hopac
open Hopac.Extra
open Hopac.Job.Infixes
open Hopac.Alt.Infixes
/// Different representation of EagerSeq<'a>: a job which, when run, yields
/// the next step of the sequence.
type HopacSeq<'a> = Job<HopacSeqStep<'a>>
/// A single step of a `HopacSeq`.
and HopacSeqStep<'a> =
  /// An element together with the job that produces the remaining steps.
  | Emit of 'a * HopacSeq<'a>
  /// The sequence has ended.
  | Halt
/// Combinators over `HopacSeq<'a>`.
///
/// A `HopacSeq<'a>` is a job that, when run, yields `Halt` or
/// `Emit (head, tail)`. A step's work is redone every time its job is run;
/// only `ofSeq` caches its steps.
module HopacSeq =

  /// The sequence that halts immediately.
  let empty<'a> : HopacSeq<'a> =
    Halt |> Job.result

  /// A sequence with the single element `a`.
  let emitOne a : HopacSeq<'a> =
    Emit (a, empty) |> Job.result

  /// A sequence with a single element, produced by running `j`.
  let ofJob (j: Job<'a>) : HopacSeq<'a> =
    j |>> fun a -> Emit (a, empty)

  /// Applies `f` to every element.
  let rec map (f: 'a -> 'b) (s: HopacSeq<'a>) : HopacSeq<'b> =
    s |>> function
      | Halt -> Halt
      | Emit (a, tail) -> Emit (f a, map f tail)

  /// Applies the job-returning `f` to every element, in order.
  let rec mapJob (f: 'a -> Job<'b>) (s: HopacSeq<'a>) : HopacSeq<'b> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) -> f a |>> fun b -> Emit (b, mapJob f tail)

  /// Like `map`, but `f` also receives the zero-based element index.
  let mapi (f: int -> 'a -> 'b) (s: HopacSeq<'a>) : HopacSeq<'b> =
    let rec go i (s: HopacSeq<'a>) : HopacSeq<'b> =
      s |>> function
        | Halt -> Halt
        | Emit (a, tail) -> Emit (f i a, go (i + 1) tail)
    go 0 s

  /// Pairs every element with its zero-based index.
  let indexed (s: HopacSeq<'a>) : HopacSeq<int * 'a> =
    mapi (fun i a -> i, a) s

  /// All elements of `s1` followed by all elements of `s2`.
  let rec append (s1: HopacSeq<'a>) (s2: HopacSeq<'a>) : HopacSeq<'a> =
    s1 >>= function
      | Halt -> s2
      | Emit (a, tail) -> Emit (a, append tail s2) |> Job.result

  /// Maps every element to a sequence and concatenates the results in order.
  let rec collectJob (f: 'a -> HopacSeq<'b>) (s: HopacSeq<'a>) : HopacSeq<'b> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) -> append (f a) (collectJob f tail)

  /// Emits every intermediate state of a left fold with the job-returning `f`.
  /// The initial state `b` itself is not emitted.
  let rec scanJob (f: 'b -> 'a -> Job<'b>) (b: 'b) (s: HopacSeq<'a>) : HopacSeq<'b> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) -> f b a |>> fun b' -> Emit (b', scanJob f b' tail)

  /// Left fold with the job-returning `f`; runs the whole sequence.
  let rec foldJob (f: 'b -> 'a -> Job<'b>) (b: 'b) (s: HopacSeq<'a>) : Job<'b> =
    s >>= function
      | Halt -> Job.result b
      | Emit (a, tail) -> f b a >>= fun b' -> foldJob f b' tail

  /// Runs the job-returning `f` on every element, in order.
  let rec iterJob (f: 'a -> Job<unit>) (s: HopacSeq<'a>) : Job<unit> =
    s >>= function
      | Halt -> Job.unit ()
      | Emit (a, tail) -> f a >>. iterJob f tail

  /// Calls the synchronous `f` on every element, in order.
  let rec iter (f: 'a -> unit) (s: HopacSeq<'a>) : Job<unit> =
    s >>= function
      | Halt -> Job.unit ()
      | Emit (a, tail) ->
        f a
        iter f tail

  /// The first element, if any. Only the first step is run.
  let first (s: HopacSeq<'a>) : Job<'a option> =
    s |>> function
      | Halt -> None
      | Emit (a, _) -> Some a

  /// The last element, if any. Runs the whole sequence.
  let last (s: HopacSeq<'a>) : Job<'a option> =
    let rec loop (latest: 'a option) (s: HopacSeq<'a>) : Job<'a option> =
      s >>= function
        | Halt -> Job.result latest
        | Emit (a, tail) -> loop (Some a) tail
    loop None s

  /// Keeps the elements for which the job-returning predicate `f` yields true.
  let rec filterJob (f: 'a -> Job<bool>) (s: HopacSeq<'a>) : HopacSeq<'a> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) ->
        f a >>= fun keep ->
          if keep then Emit (a, filterJob f tail) |> Job.result
          else filterJob f tail

  /// Maps with the job-returning `f`, keeping only the `Some` results.
  let rec chooseJob (f: 'a -> Job<'b option>) (s: HopacSeq<'a>) : HopacSeq<'b> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) ->
        f a >>= function
          | Some b -> Emit (b, chooseJob f tail) |> Job.result
          | None -> chooseJob f tail

  /// Drops the first `count` elements. A `count` of zero or less returns `s`
  /// unchanged.
  let rec skip (count: int) (s: HopacSeq<'a>) : HopacSeq<'a> =
    // `<= 0` rather than `= 0`: a negative count would otherwise never reach
    // zero and silently drop the whole sequence.
    if count <= 0 then s
    else
      s >>= function
        | Halt -> empty
        | Emit (_, tail) -> skip (count - 1) tail

  /// Drops leading elements while the job-returning predicate `p` holds.
  let rec skipWhileJob (p: 'a -> Job<bool>) (s: HopacSeq<'a>) : HopacSeq<'a> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) as step ->
        p a >>= fun drop ->
          // Return the already-computed step instead of re-running `s`.
          if drop then skipWhileJob p tail else Job.result step

  /// At most the first `count` elements. A `count` of zero or less yields the
  /// empty sequence without running `s`. The step after the last taken
  /// element is never run.
  let rec take (count: int) (s: HopacSeq<'a>) : HopacSeq<'a> =
    // `<= 0` rather than `= 0`: a negative count would otherwise never reach
    // zero and take the whole sequence.
    if count <= 0 then empty
    else
      s |>> function
        | Halt -> Halt
        | Emit (a, tail) -> Emit (a, take (count - 1) tail)

  /// Leading elements while the job-returning predicate `p` holds; halts at
  /// the first element for which it does not.
  let rec takeWhileJob (p: 'a -> Job<bool>) (s: HopacSeq<'a>) : HopacSeq<'a> =
    s >>= function
      | Halt -> empty
      | Emit (a, tail) ->
        p a |>> fun keep ->
          if keep then Emit (a, takeWhileJob p tail) else Halt

  /// Builds a sequence from a seed. `f` yields `Some (element, nextSeed)` to
  /// continue or `None` to halt.
  let rec unfoldJob (f: 's -> Job<('a * 's) option>) (s: 's) : HopacSeq<'a> =
    f s |>> function
      | Some (a, s') -> Emit (a, unfoldJob f s')
      | None -> Halt

  /// Groups consecutive elements into lists of `size` elements, in order.
  /// The final list may be shorter; an empty list is never emitted.
  /// Raises `ArgumentException` when `size` is less than 1.
  let buffer (size: int) (s: HopacSeq<'a>) : HopacSeq<'a list> =
    // With size 0 the "full" check below would fire immediately and emit `[]`
    // forever without consuming input; with a negative size it would never fire.
    if size < 1 then invalidArg "size" "buffer size must be at least 1"
    // `count` tracks the length of `acc`, avoiding an O(n) length walk per element.
    let rec loop (acc: 'a list) (count: int) (s: HopacSeq<'a>) : HopacSeq<'a list> =
      if count = size then Emit (List.rev acc, loop [] 0 s) |> Job.result
      else
        s >>= function
          | Halt ->
            if count > 0 then Emit (List.rev acc, empty) |> Job.result
            else empty
          | Emit (a, tail) -> loop (a :: acc) (count + 1) tail
    loop [] 0 s

  /// Enumerates `s` lazily.
  /// - A fresh enumerator is created each time the returned job is run.
  /// - Each step calls `MoveNext` only when that step's job is first run.
  /// - A step's result is cached with `lazy`, so running the same step job
  ///   again does not advance the shared enumerator a second time.
  /// - The enumerator is disposed when it is exhausted or when `MoveNext`
  ///   throws. If the consumer stops early, it is not disposed.
  let ofSeq (s: seq<'a>) : HopacSeq<'a> =
    Job.delay <| fun () ->
      let e = s.GetEnumerator ()
      let rec step () : HopacSeq<'a> =
        // Advances the enumerator once; `step ()` only builds the next
        // (not yet forced) step, so this never recurses eagerly.
        let advance () =
          try
            if e.MoveNext () then
              Emit (e.Current, step ())
            else
              e.Dispose ()
              Halt
          with _ ->
            e.Dispose ()
            reraise ()
        let cell = lazy (advance ())
        Job.delay <| fun () -> Job.result (cell.Force ())
      step ()

  /// Reads `Some a` values from `mv` as elements until a `None` is taken.
  /// Each step takes from `mv` when run, so running a step twice consumes two
  /// values.
  let rec ofMVar (mv: MVar<'a option>) : HopacSeq<'a> =
    MVar.take mv
    |>> function
      | Some a -> Emit (a, ofMVar mv)
      | None -> Halt

  /// Starts a job that runs `s` and fills a new MVar with `Some a` for each
  /// element, followed by `None` when `s` halts. Returns the MVar.
  /// NOTE(review): Hopac documents filling an MVar that is already full as a
  /// programming error, and this producer does not wait for a `take` between
  /// fills. Verify it against the Hopac version in use before relying on it.
  /// `merge` does not use this function.
  let toMVar (s: HopacSeq<'a>) : Job<MVar<'a option>> =
    MVar.create () >>= fun mv ->
      let rec loop (s: HopacSeq<'a>) =
        s >>= function
          | Emit (a, tail) -> MVar.fill mv (Some a) >>. loop tail
          | Halt -> MVar.fill mv None
      Job.start (loop s) >>% mv

  /// Non-deterministic merge: elements of `a` and `b` in whichever order they
  /// become available.
  /// - Running the returned job starts one producer job per input.
  /// - Each producer hands its elements over on a channel with `Ch.give`,
  ///   which waits for the consumer's `Ch.take`. Neither input therefore runs
  ///   more than one element ahead of consumption.
  /// - The result halts only after BOTH inputs have halted.
  /// - Tail steps take from the shared channel when run, so each tail job
  ///   should be run only once.
  /// NOTE(review): if the consumer stops early, the producers stay blocked in
  /// `Ch.give`; confirm this is acceptable for long-lived inputs.
  let merge (a: HopacSeq<'a>) (b: HopacSeq<'a>) : HopacSeq<'a> =
    Ch.create () >>= fun (out: Ch<'a option>) ->
      // Forwards every element of `s` as `Some x`, then signals `None`.
      let rec pump (s: HopacSeq<'a>) : Job<unit> =
        s >>= function
          | Emit (x, tail) -> Ch.give out (Some x) >>. pump tail
          | Halt -> Ch.give out None :> Job<unit>
      // Emits forwarded elements until `live` producers have signalled `None`.
      let rec drain (live: int) : HopacSeq<'a> =
        if live = 0 then empty
        else
          Ch.take out >>= function
            | Some x -> Emit (x, drain live) |> Job.result
            | None -> drain (live - 1)
      Job.start (pump a) >>. Job.start (pump b) >>. drain 2
@polytypic
Copy link

I think that PAC sequences are very tricky to get right, because there are so many semantic choices. Could you tell more about the way these sequences should behave? Should they be memoized, ephemeral, lazy, eager, bounded, unbounded, parallel, ...? It may also be possible to want to have different combinators behave differently (e.g. map lazy & merge eager or map eager & merge lazy or ...).

Note that a Job<'x> value is much like a Thunk<'x> = unit -> 'x value. So, most of the combinators above basically produce a kind of lazy, ephemeral sequences. That may be exactly what is wanted, but I know I've personally gotten these things wrong many times.

@eulerfx
Copy link
Author

eulerfx commented Nov 16, 2014

My only requirements are that the sequence is lazy and bounded. The use case is reading from a stream database such as Kafka or EventStore. The underlying API allows the reading of a slice of events of a specific size, from a specific offset.

The way I understand it, the sequence isn't memoized because of Thunk<'x>, and this is what seems to get in the way of using selective communication to do the merge: if one of the inputs is selected, the other one may still produce a value, which should be emitted directly to the output sequence rather than ignored.

If I were to use an IVar for each "step" of the sequence much like EagerSeq, would that be better suited to merging using selective communication?

A way to get around the merge issue completely is to use a push-based model:

// observer: receives `Some x` for each element; `None` presumably signals
// completion (TODO confirm against the full source linked below).
type RxObs<'a> = 'a option -> Job<unit>

// observable (subscription function): given an observer, returns a job that
// pushes elements into it.
type Rx<'a> = RxObs<'a> -> Job<unit>

// Push-based merge: subscribes the same observer to both sources and combines
// the two subscription jobs with `<*>`, discarding the pair with `>>% ()`.
// NOTE(review): if each source signals completion by passing `None`, the
// observer will receive `None` twice; confirm how completion is meant to work.
let merge (rx1:Rx<'a>) (rx2:Rx<'a>) : Rx<'a> = 
  fun (obs:RxObs<'a>) ->
    rx1 obs <*> rx2 obs >>% ()

Full source: https://gist.github.com/eulerfx/2bcfd5e0ef67d4ae330e

@polytypic
Copy link

Here is one kind of merge:

// Non-deterministic merge: when demanded, starts both inputs as promises
// (`Promise.startAsAlt`) and emits the step of whichever is selected first by
// `<|>`. The promise that was not selected is passed along unchanged, so its
// element is not lost or recomputed.
let rec merge (a:HopacSeq<'a>) (b:HopacSeq<'a>) : HopacSeq<'a> =
  Promise.startAsAlt a >>= fun aP ->
  Promise.startAsAlt b >>= fun bP ->
  mergeAB aP bP
// Chooses between the two pending promises. If the selected one halted, the
// merge continues as the other promise alone. Otherwise its element is
// emitted and its tail is started as a fresh promise.
and mergeAB aP bP =
  (aP >>=? function
    | Halt -> upcast bP
    | Emit (x, a) -> Job.result (Emit (x, mergeB a bP))) <|>
  (bP >>=? function
    | Halt -> upcast aP
    | Emit (x, b) -> Job.result (Emit (x, mergeA aP b)))
// Starts the left sequence's tail as a promise, keeping the right promise.
and mergeB a bP =
  Promise.startAsAlt a >>= fun aP -> mergeAB aP bP
// Starts the right sequence's tail as a promise, keeping the left promise.
and mergeA aP b =
  Promise.startAsAlt b >>= fun bP -> mergeAB aP bP

The idea here is that a merged seq, once demanded, starts evaluating both of the sequences in parallel as promises. The one that becomes ready first is then produced. In this version, the other promise is still kept and reused in the next merge so that the elements are evaluated only once.

Note that a Hopac job does not necessarily behave non-deterministically. Hopac jobs are run cooperatively and there is a large subset of Hopac that basically behaves deterministically.

Consider changing Promise.startAsAlt to Promise.queueAsAlt.

@polytypic
Copy link

Regarding use of IVars. They would work. IVars can do everything that Promises can and more, but require more intricate programming. I think in this case Promises (I should have named them Futures originally) lead to simpler code—assuming the above merge is basically what you were looking for?

@eulerfx
Copy link
Author

eulerfx commented Nov 16, 2014

Ahh ok, I was missing the Promise.startAsAlt and Promise.queueAsAlt, thanks.

What I meant by non-deterministically is that it's uncertain which of the two input sequences will produce a value first.

Also, just so I understand this properly, in mergeAB when either aP or bP is selected via <|>, the result of the other is memoized because we're using a Promise?

@polytypic
Copy link

Basically yes. I'd say that the other result is just not thrown away. While the merge is being run lazily, each started promise is kept around (not thrown away) until the promise becomes determined.

@eulerfx
Copy link
Author

eulerfx commented Nov 16, 2014

Thanks! I now have much clearer understanding of the similarities and differences between the Hopac/CML model and F# async.

@polytypic
Copy link

No problem! Actually, thinking about this, I think I should reintroduce lazy promises, which make it convenient to memoize jobs. You could then easily write a memoize: HopacSeq<'x> -> HopacSeq<'x> function. Looking at the old code for lazy promises, it seems like I had a performance bug (a totally unnecessary allocation) in the old implementation...

@polytypic
Copy link

FYI, I reintroduced lazy promises to Hopac. I haven't released binaries yet, and likely won't for a couple of days. With lazy promises you could implement memoize as follows:

// Memoizes a sequence: each step is wrapped in a lazy promise
// (`Promise.Now.delay`), so running a step again reuses its first result
// instead of re-running the underlying job. Tails are memoized recursively.
// NOTE(review): requires a Hopac version that provides `Promise.Now.delay`.
let rec memoize (xs: HopacSeq<'x>) =
  (xs |>> function
      | Halt -> Halt
      | Emit (x, xs) -> Emit (x, memoize xs))
  |> Promise.Now.delay :> HopacSeq<_>

Also, here is a bit more concise version of merge:

// Emits from whichever of the two lazy promises is selected first; `aP` is
// offered first on the left of `<|>`.
let rec mergeEither aP bP =
  mergeFirst aP bP <|> mergeFirst bP aP
// Commits to `aP`. On `Halt` the merge continues as `bP` alone. Otherwise it
// emits the element and continues with the arguments swapped, so the stream
// that did not produce (`bP`) is tried first on the next round.
and mergeFirst aP bP =
  (aP >>=? function
    | Halt -> upcast bP
    | Emit (x, a) ->
      Job.result (Emit (x, mergeEither bP (Promise.Now.delay a))))
// Entry point: wraps both inputs in lazy promises and merges them.
let merge (a: HopacSeq<'a>) (b: HopacSeq<'a>) : HopacSeq<'a> =
  mergeEither (Promise.Now.delay a) (Promise.Now.delay b)

Look at the last line of the mergeFirst function above. This version intentionally swaps the a and b streams so that the stream that didn't manage to produce a value is given "priority" (evaluated on the left hand side of <|>) on the next round.

@polytypic
Copy link

One more thing. I haven't looked carefully through all of the HopacSeq combinators, but I happened to notice that the ofSeq combinator

  // Quoted from the gist above. `loop()` is called while each `Emit` is being
  // built, so the whole enumerator is walked (and every step constructed)
  // inside the `Job.using` body, before the enumerator is disposed; the
  // enumeration is therefore eager.
  let rec ofSeq (s:seq<'a>) : HopacSeq<'a> =
    Job.using (s.GetEnumerator()) <| fun enum ->
      let rec loop() =
        if enum.MoveNext() then Emit (enum.Current, loop()) |> Job.result
        else empty
      loop()

doesn't quite work correctly. What happens is that the enum is disposed once the first result is produced.

@polytypic
Copy link

Actually, your ofSeq works correctly, as it goes through the sequence eagerly. Sorry about the false alarm!

Here is a lazy version:

  // Wraps a disposable value so that it is disposed either explicitly (via
  // IDisposable) or, if that never happens, by this object's finalizer.
  type Disposer<'x when 'x :> IDisposable> =
    // The wrapped value.
    val Value: 'x
    interface IDisposable with
      // Disposes the wrapped value and suppresses the finalizer. There is no
      // guard, so disposing this wrapper twice disposes `Value` twice.
      override this.Dispose () =
        this.Value.Dispose ()
        GC.SuppressFinalize this
    // Fallback when Dispose was never called.
    // NOTE(review): runs on the finalizer thread; confirm that `Value` is safe
    // to dispose from there.
    override this.Finalize () =
      this.Value.Dispose ()
    new (value: 'x) = {Value = value}

  // First lazy version. The enumerator is created when the returned job runs,
  // and each step advances it only when that step's job runs. It is disposed
  // once `MoveNext` returns false.
  // NOTE: the step jobs are not memoized, so running the same tail job more
  // than once advances the shared enumerator again. This is the bug that the
  // follow-up comment fixes.
  let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
    let enum = new Disposer<_>(s.GetEnumerator ())
    let rec loop () = Job.thunk <| fun () ->
      if enum.Value.MoveNext () then
        Emit (enum.Value.Current, loop ())
      else
        (enum :> IDisposable).Dispose ()
        Halt
    loop ()

@polytypic
Copy link

Heh... In fact, my lazy version has a bug. Here is a correct version:

  // Corrected lazy version. It is the same as the previous one, except that
  // each step job is wrapped with `Promise.Now.delayAsJob`. A step's
  // `MoveNext`/`Current` therefore runs at most once, and its result is reused
  // if the step is run again, so the shared enumerator is advanced in order.
  // The enumerator is disposed when `MoveNext` returns false. If the consumer
  // stops early, disposal is left to the `Disposer` finalizer.
  let ofSeq (s:seq<'a>) : HopacSeq<'a> = Job.delay <| fun () ->
    let enum = new Disposer<_>(s.GetEnumerator ())
    let rec loop () = Promise.Now.delayAsJob << Job.thunk <| fun () ->
      if enum.Value.MoveNext () then
        Emit (enum.Value.Current, loop ())
      else
        (enum :> IDisposable).Dispose ()
        Halt
    loop ()

The crucial difference is that the enumerator is now guaranteed to be used sequentially.

At any rate, thanks for asking about these sequences! Inspired to make improvements to Hopac!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment