Last active
February 20, 2016 13:07
-
-
Save eulerfx/a4a29502f673f13b6a23 to your computer and use it in GitHub Desktop.
EventStore pull-to-push subscription transition
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Subscribes to live (pushed) events of `stream` and exposes them as an asynchronous sequence.
///
/// Events delivered by the subscription callback are placed into a bounded blocking buffer
/// and handed to the consumer in arrival order.
///
/// conn         - connection used to create the subscription.
/// stream       - name of the stream to subscribe to.
/// resolveLinks - passed through to `SubscribeToStreamAsync`.
/// bufferSize   - capacity of the buffer; the event callback blocks while the buffer is full.
/// ct           - cancels the blocking enumeration of the buffer.
///
/// The sequence ends when the subscription is dropped (after draining already-buffered events).
/// The subscription is closed when the sequence terminates.
let subscribeAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (bufferSize:int) (ct:CancellationToken) = asyncSeq {
  use buffer = new System.Collections.Concurrent.BlockingCollection<ResolvedEvent>(bufferSize)
  // Called by the client for every pushed event; the subscription argument is not needed here.
  let onEvent (_:EventStoreSubscription) (evt:ResolvedEvent) =
    buffer.Add(evt)
  let onDrop (subs:EventStoreSubscription) (reason:SubscriptionDropReason) (ex:exn) =
    printfn "SUBSCRIPTION DROPPED! last position=%i reason=%O ex=%O" (subs.LastEventNumber.GetValueOrDefault()) reason ex
    // Nothing more will be added after a drop: complete the buffer so the consumer finishes
    // instead of blocking until `ct` is cancelled. The drop may be the result of this sequence
    // closing its own subscription, in which case the buffer can already be disposed.
    try buffer.CompleteAdding()
    with :? System.ObjectDisposedException -> ()
  // `use!` closes the subscription when the sequence terminates. It is bound after `buffer`,
  // so it is released first and the callbacks stop before the buffer is disposed.
  use! subscription = conn.SubscribeToStreamAsync(stream, resolveLinks, Action<_,_>(onEvent), Action<_,_,_>(onDrop), null) |> Async.AwaitTask
  yield! buffer.GetConsumingEnumerable(ct) |> AsyncSeq.ofSeq
}
/// Reads `stream` from position `from` and then continues with a live subscription,
/// producing one continuous asynchronous sequence of events.
///
/// The sequence starts in "pull" mode (batched forward reads) and switches to "push" mode
/// (live subscription) once the pull is exhausted. Events written between the end of the
/// pull and the start of the subscription are recovered by another pull whenever the first
/// pushed event is ahead of the next expected event number.
///
/// conn         - connection used for both the reads and the subscription.
/// stream       - name of the stream to read and subscribe to.
/// resolveLinks - passed through to both the forward read and the subscription.
/// batchSize    - passed through to `forwardAsAsyncSeq`.
/// bufferSize   - buffer capacity passed to `subscribeAsAsyncSeq`.
/// from         - event number of the first event to read.
/// ct           - cancellation token passed to `subscribeAsAsyncSeq`.
let subscribeFromAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (batchSize:int) (bufferSize:int) (from:int) (ct:CancellationToken) = asyncSeq {
  // reads events forward from the specified position
  // NOTE(review): assumes `forwardAsAsyncSeq` (defined elsewhere) treats the position as inclusive - confirm.
  let readForward (position:int) =
    forwardAsAsyncSeq conn stream resolveLinks batchSize position |> AsyncSeq.concatSeq
  // Position of an event within the subscribed stream. `OriginalEventNumber` is used rather than
  // `Event.EventNumber` because, with resolved links, `Event` is the linked-to event and its
  // number belongs to a different stream.
  let eventNumber (evt:ResolvedEvent) = evt.OriginalEventNumber
  // start with pull, switching to push when pull completes making sure to not miss gaps;
  // `next` is always the number of the next event that has not been yielded yet
  let rec append (next:int) (pull:AsyncSeq<ResolvedEvent>) (push:AsyncSeq<ResolvedEvent>) : AsyncSeq<ResolvedEvent> = asyncSeq {
    let! pulled = pull
    match pulled with
    | Cons (h,t) ->
      yield h
      yield! append (eventNumber h + 1) t push
    | Nil ->
      // done pulling, switching to push...
      let! pushed = push
      match pushed with
      | Nil -> ()
      | Cons (h,_) ->
        if eventNumber h > next then
          // gap detected, pull missing events then continue with push
          yield! append next (readForward next) (async.Return pushed)
        else
          // no gap detected, continuing with push, skipping events which were already read
          yield! (async.Return pushed) |> AsyncSeq.skipWhile (fun evt -> eventNumber evt < next) }
  yield! append from (readForward from) (subscribeAsAsyncSeq conn stream resolveLinks bufferSize ct)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment