Skip to content

Instantly share code, notes, and snippets.

@eulerfx eulerfx/EvetStore.fs
Last active Feb 20, 2016

Embed
What would you like to do?
EventStore pull-to-push subscription transition
/// Exposes a push-based EventStore stream subscription as a pull-based AsyncSeq.
///
/// Events delivered to the subscription callback are placed into a bounded
/// BlockingCollection of capacity `bufferSize`; the returned sequence drains
/// that collection. When the buffer is full the callback blocks in `Add`,
/// so a slow consumer holds up the callback that delivers events.
///
/// conn         - connection used to create the subscription.
/// stream       - name of the stream to subscribe to.
/// resolveLinks - passed through to SubscribeToStreamAsync.
/// bufferSize   - bounded capacity of the intermediate buffer.
/// ct           - cancels the blocking read of the buffer; cancellation
///                surfaces as the exception raised by GetConsumingEnumerable.
///
/// The sequence ends when the subscription is dropped (after the events
/// already buffered have been consumed). The subscription is closed when the
/// sequence completes or fails.
let subscribeAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (bufferSize:int) (ct:CancellationToken) = asyncSeq {
    use buffer = new System.Collections.Concurrent.BlockingCollection<ResolvedEvent>(bufferSize)
    let onEvent (_:EventStoreSubscription) (evt:ResolvedEvent) =
        // An event may race with tear-down: once adding is completed or the buffer
        // is disposed nobody can consume it, so drop it instead of throwing on the
        // client's callback thread. ObjectDisposedException derives from
        // InvalidOperationException, so both cases are covered here.
        try buffer.Add(evt)
        with :? InvalidOperationException -> ()
    let onDrop (subs:EventStoreSubscription) (reason:SubscriptionDropReason) (ex:exn) =
        printfn "SUBSCRIPTION DROPPED! last position=%i reason=%O ex=%O" (subs.LastEventNumber.GetValueOrDefault()) reason ex
        // No more events will be added after a drop; completing the buffer lets the
        // consuming enumerable finish instead of blocking the reader forever.
        try buffer.CompleteAdding()
        with :? ObjectDisposedException -> ()
    let! subs = conn.SubscribeToStreamAsync(stream, resolveLinks, Action<_,_>(onEvent), Action<_,_,_>(onDrop), null) |> Async.AwaitTask
    try
        yield! buffer.GetConsumingEnumerable(ct) |> AsyncSeq.ofSeq
    finally
        // Release the server-side subscription before `buffer` is disposed.
        subs.Close()
}
/// Produces the events of `stream` starting at event number `from`, first by
/// pulling (reading forward in batches) and then, once the pull is exhausted,
/// by switching to a live push subscription.
///
/// conn         - connection used for both the reads and the subscription.
/// stream       - name of the stream.
/// resolveLinks - passed through to both the forward read and the subscription.
/// batchSize    - passed to `forwardAsAsyncSeq` (presumably the read page size;
///                confirm against that helper).
/// bufferSize   - buffer capacity passed to `subscribeAsAsyncSeq`.
/// from         - event number of the first event to produce.
/// ct           - cancellation token passed to `subscribeAsAsyncSeq`.
///
/// Positions are tracked with `OriginalEventNumber`, i.e. the event's number in
/// the subscribed stream, so link events are ordered correctly when
/// `resolveLinks` is true and an unresolved link (null `Event`) does not fail.
let subscribeFromAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (batchSize:int) (bufferSize:int) (from:int) (ct:CancellationToken) = asyncSeq {
    // Reads events forward beginning at `start`.
    // NOTE(review): assumes `forwardAsAsyncSeq` treats the start position as
    // inclusive and yields batches of events (hence concatSeq) - confirm.
    let pullFrom (start:int) : AsyncSeq<ResolvedEvent> =
        forwardAsAsyncSeq conn stream resolveLinks batchSize start |> AsyncSeq.concatSeq
    // Start with pull and switch to push when pull completes, without losing or
    // repeating events. `next` is always the number of the next event that has
    // not been yielded yet.
    let rec append (next:int) (pull:AsyncSeq<ResolvedEvent>) (push:AsyncSeq<ResolvedEvent>) : AsyncSeq<ResolvedEvent> = asyncSeq {
        let! pulled = pull
        match pulled with
        | Cons (h, t) ->
            yield h
            yield! append (h.OriginalEventNumber + 1) t push
        | Nil ->
            // Done pulling; take the first event from the push subscription.
            let! pushed = push
            match pushed with
            | Nil -> ()
            | Cons (h, _) ->
                if h.OriginalEventNumber > next then
                    // Gap detected: events [next, h) were written after the pull
                    // finished but before the subscription delivered. Pull them,
                    // then resume with the same push sequence (head included).
                    yield! append next (pullFrom next) (async.Return pushed)
                else
                    // No gap: continue with push, skipping events already yielded.
                    yield! (async.Return pushed |> AsyncSeq.skipWhile (fun evt -> evt.OriginalEventNumber < next)) }
    yield! append from (pullFrom from) (subscribeAsAsyncSeq conn stream resolveLinks bufferSize ct)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.