Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Last active February 20, 2016 13:07
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/a4a29502f673f13b6a23 to your computer and use it in GitHub Desktop.
Save eulerfx/a4a29502f673f13b6a23 to your computer and use it in GitHub Desktop.
EventStore pull-to-push subscription transition
let subscribeAsAsyncSeq (conn:IEventStoreConnection) (stream:string) (resolveLinks:bool) (bufferSize:int) (ct:CancellationToken) = asyncSeq {
use buffer = new System.Collections.Concurrent.BlockingCollection<ResolvedEvent>(bufferSize)
let inline onEvent subs (evt:ResolvedEvent) =
buffer.Add(evt)
let inline onDrop (subs:EventStoreSubscription) (reason:SubscriptionDropReason) (ex:exn) =
printfn "SUBSCRIPTION DROPPED! last position=%i reason=%O ex=%O" (subs.LastEventNumber.GetValueOrDefault()) reason ex
()
let! subs = conn.SubscribeToStreamAsync(stream, resolveLinks, Action<_,_>(onEvent), Action<_,_,_>(onDrop), null) |> Async.AwaitTask
yield! buffer.GetConsumingEnumerable(ct) |> AsyncSeq.ofSeq
}
let subscribeFromAsAsyncSeq (conn:IEventStoreConnection) (stram:string) (resolveLinks:bool) (batchSize:int) (bufferSize:int) (from:int) (ct:CancellationToken) = asyncSeq {
// reads events forward from the specified position
let inline forwardAsAsyncSeq from =
forwardAsAsyncSeq conn stream resolveLinks batchSize from |> AsyncSeq.concatSeq
// start with pull, switching to push when pull completes making sure to not miss gaps
let rec append (from:int) (pull:AsyncSeq<ResolvedEvent>) (push:AsyncSeq<ResolvedEvent>) : AsyncSeq<ResolvedEvent> = asyncSeq {
let! pull = pull
match pull with
| Nil ->
// done pulling, switching to push...
let! push = push
match push with
| Nil -> ()
| Cons (h,t) ->
if (h.Event.EventNumber - from > 1) then
// gap detected, pull missing events then continue with push
yield! append (from + 1) (forwardAsAsyncSeq (from + 1)) (async.Return push)
else
// no gap detected, continuing with push, skipping events which were already read
yield! (async.Return push) |> AsyncSeq.skipWhile (fun evt -> evt.Event.EventNumber < from)
| Cons (h,t) ->
yield h
yield! append (h.Event.EventNumber) t push }
yield! append from (forwardAsAsyncSeq from) (subscribeAsAsyncSeq conn stream resolveLinks bufferSize ct)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment