Skip to content

Instantly share code, notes, and snippets.

@eulerfx eulerfx/EvetStore.fs
Last active Feb 20, 2016

What would you like to do?
EventStore pull-to-push subscription transition
/// Exposes a live (push) subscription to `stream` as an asynchronous sequence.
///
/// Events delivered to the subscription callback are handed over through a
/// bounded buffer of capacity `bufferSize`; the resulting sequence drains that
/// buffer. `ct` is passed to the consuming enumeration of the buffer, so
/// cancelling it stops the consumer side.
///
/// The subscription is only opened once the sequence is first evaluated.
let subscribeAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (bufferSize:int) (ct:CancellationToken) = asyncSeq {
  // bounded hand-off between the subscription callback (producer) and this sequence (consumer)
  use buffer = new System.Collections.Concurrent.BlockingCollection<ResolvedEvent>(bufferSize)
  // called by the client for every event; Add blocks while the buffer is full
  let inline onEvent subs (evt:ResolvedEvent) =
    buffer.Add(evt)
  // NOTE(review): a drop is only logged; the buffer is not completed, so the consumer
  // keeps waiting until `ct` is cancelled — confirm this is the intended behaviour.
  let inline onDrop (subs:EventStoreSubscription) (reason:SubscriptionDropReason) (ex:exn) =
    printfn "SUBSCRIPTION DROPPED! last position=%i reason=%O ex=%O" (subs.LastEventNumber.GetValueOrDefault()) reason ex
  let! subscription = conn.SubscribeToStreamAsync(stream, resolveLinks, Action<_,_>(onEvent), Action<_,_,_>(onDrop), null) |> Async.AwaitTask
  // unsubscribe when the sequence finishes; declared after `buffer` so it is released
  // first and the callback never adds to an already disposed buffer
  use subscriptionHandle = subscription
  yield! buffer.GetConsumingEnumerable(ct) |> AsyncSeq.ofSeq
}
/// Reads `stream` starting at event number `from` (inclusive): first by pulling
/// existing events in batches of `batchSize`, then by switching to a live (push)
/// subscription buffered with capacity `bufferSize`. Events that were written
/// between the end of the pull and the start of the subscription are fetched
/// with another pull so that no event is missed and none is emitted twice.
///
/// `ct` is forwarded to the push subscription.
let subscribeFromAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (batchSize:int) (bufferSize:int) (from:int) (ct:CancellationToken) = asyncSeq {
  // reads events forward from the specified position
  let inline forwardAsAsyncSeq from =
    forwardAsAsyncSeq conn stream resolveLinks batchSize from |> AsyncSeq.concatSeq
  // start with pull, switching to push when pull completes making sure to not miss gaps.
  // `next` is the number of the next event that still has to be emitted.
  // NOTE(review): positions are taken from `Event.EventNumber`; with resolveLinks = true
  // this may not be the position within `stream` — confirm against the client API.
  let rec append (next:int) (pull:AsyncSeq<ResolvedEvent>) (push:AsyncSeq<ResolvedEvent>) : AsyncSeq<ResolvedEvent> = asyncSeq {
    let! pulled = pull
    match pulled with
    | Cons (h,t) ->
      yield h
      yield! append (h.Event.EventNumber + 1) t push
    | Nil ->
      // done pulling, switching to push...
      let! pushed = push
      match pushed with
      | Nil -> ()
      | Cons (h,_) ->
        if h.Event.EventNumber > next then
          // gap detected, pull missing events then continue with push
          // (the already received push node is re-wrapped so it is not lost)
          yield! append next (forwardAsAsyncSeq next) (async.Return pushed)
        else
          // no gap detected, continuing with push, skipping events which were already read
          yield! (async.Return pushed) |> AsyncSeq.skipWhile (fun evt -> evt.Event.EventNumber < next) }
  yield! append from (forwardAsAsyncSeq from) (subscribeAsAsyncSeq conn stream resolveLinks bufferSize ct)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.