Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Last active August 29, 2015 14:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/3e7ced2423d3297c4e8d to your computer and use it in GitHub Desktop.
Save eulerfx/3e7ced2423d3297c4e8d to your computer and use it in GitHub Desktop.
F# EventStore API
/// Alias for string; used as the stream argument of the EventStore functions
/// (presumably the stream name — confirm against the implementation).
type Stream = string
/// Alias for string; passed to `append` alongside the event payload
/// (presumably the event type name — confirm).
type EventType = string
/// Alias for int; passed to `append`/`appendAll`.
/// NOTE(review): presumably the stream version the write expects (optimistic concurrency) — confirm.
type ExpectedVersion = int
/// Alias for a raw byte array holding an event payload.
type EventData = byte[]
/// Alias for a raw byte array holding event metadata.
type EventMetadata = byte[]
/// Alias for bool; passed to the read/observe/subscribe functions.
/// NOTE(review): presumably controls whether link events are resolved — confirm.
type ResolveLinks = bool
/// Alias for int; the starting position argument of reads and subscriptions
/// (presumably an event number — confirm).
type From = int
/// Alias for int; the batch size argument of `readAll` and the subscribe functions.
type BatchSize = int
/// Alias for int; the buffer size argument of the subscribe functions.
type BufferSize = int
/// A pair of async functions: the first takes unit and yields an optional int,
/// the second takes an int and yields unit.
/// NOTE(review): presumably (load last checkpoint, save checkpoint) — confirm against
/// `subscribeWithCheckpointsBuffered`.
type CheckpointStore = (unit -> Async<int option>) * (int -> Async<unit>)
/// Signature of the EventStore helper functions. Only the types are visible here;
/// behavioural notes below that go beyond the signatures are marked as assumptions.
module EventStore =
/// Produces an IEventStoreConnection from an IP endpoint.
/// Whether the returned connection is already opened is not visible from the signature.
val conn : IPEndPoint -> IEventStoreConnection
/// Produces an IEventStoreConnection from a string (presumably a host name — confirm).
val connHost : string -> IEventStoreConnection
/// Asynchronously appends one event (type, payload and metadata) to a stream with the
/// given expected version.
val append : IEventStoreConnection -> Stream -> EventType -> ExpectedVersion -> EventData -> EventMetadata -> Async<unit>
/// Asynchronously appends a sequence of event payloads to a stream with the given expected version.
/// NOTE(review): unlike `append`, this takes no EventType or EventMetadata — how those are
/// chosen is not visible here.
val appendAll : IEventStoreConnection -> Stream -> ExpectedVersion -> seq<EventData> -> Async<unit>
/// Asynchronously reads a single event at the given position; the result is optional
/// (presumably None when no event exists at that position — confirm).
val readOne : IEventStoreConnection -> Stream -> From -> ResolveLinks -> Async<ResolvedEvent option>
/// Reads a stream as an async sequence of StreamEventsSlice values, given a starting position
/// and a batch size.
/// NOTE(review): the argument order here is ResolveLinks then From, the opposite of `readOne`.
val readAll : IEventStoreConnection -> Stream -> ResolveLinks -> From -> BatchSize -> AsyncSeq<StreamEventsSlice>
/// Exposes a stream as an IObservable of ResolvedEvent.
val observe : IEventStoreConnection -> Stream -> ResolveLinks -> IObservable<ResolvedEvent>
/// Exposes a stream, starting at the given position, as an async sequence of individual events.
val subscribeFrom : IEventStoreConnection -> Stream -> ResolveLinks -> BatchSize -> BufferSize -> From -> AsyncSeq<ResolvedEvent>
/// Like `subscribeFrom`, but takes an additional TimeSpan and yields arrays of events
/// (presumably events grouped by buffer size and/or the time window — confirm).
val subscribeFromBuffered : IEventStoreConnection -> Stream -> ResolveLinks -> BatchSize -> BufferSize -> TimeSpan -> From -> AsyncSeq<ResolvedEvent[]>
/// Like `subscribeFromBuffered`, but takes a CheckpointStore instead of an explicit start position
/// (presumably the start position is loaded from, and progress saved to, the store — confirm).
val subscribeWithCheckpointsBuffered : IEventStoreConnection -> Stream -> ResolveLinks -> BatchSize -> BufferSize -> TimeSpan -> CheckpointStore -> AsyncSeq<ResolvedEvent[]>
module AsyncSeq =

  /// Starts `f` for every element of `s`, keeping at most `parallelism` invocations
  /// in flight, and completes once the sequence is exhausted and every started
  /// invocation has finished.
  ///
  /// Parameters:
  ///   parallelism - initial permit count of the throttling SemaphoreSlim. Must be at
  ///                 least 1: with 0 the first element waits forever for a permit, and
  ///                 a negative value makes the SemaphoreSlim constructor throw.
  ///   f           - action started (via Async.StartWithContinuations) for each element.
  ///   s           - source sequence, consumed through `toBlockingSeq`.
  ///
  /// Notes:
  ///   * `sem.Wait()` and `cde.Wait()` are blocking calls, so the thread running this
  ///     computation is blocked while waiting for a free slot and for completion.
  ///   * Exceptions and cancellations raised inside a started `f item` are NOT
  ///     propagated to the caller; they only free the slot.
  ///   * An exception thrown while enumerating `s`, or by `f item` before it returns
  ///     its Async, is re-raised only after all in-flight invocations have drained.
  let iterAsyncThrottled (parallelism:int) (f:'a -> Async<unit>) (s:AsyncSeq<'a>) : Async<unit> = async {
    // Starts at 1 so the count cannot reach zero while elements are still being scheduled.
    let cde = new CountdownEvent(1)
    let sem = new SemaphoreSlim(parallelism)
    // Frees one throttle slot and marks one invocation as finished.
    let release () =
      sem.Release() |> ignore
      cde.Signal() |> ignore
    try
      try
        s |> toBlockingSeq |> Seq.iter (fun item ->
          sem.Wait()
          cde.AddCount(1)
          // If building the computation throws, no continuation will ever run for this
          // element, so give the slot back here; otherwise the drain below would hang.
          let work =
            try f item
            with _ ->
              release ()
              reraise ()
          // Success, failure and cancellation all just free the slot.
          Async.StartWithContinuations(work, release, (fun _ -> release ()), (fun _ -> release ())))
      finally
        // Drop the initial count and wait for in-flight work on every path, including
        // a failed enumeration: the continuations call Release/Signal, which must not
        // happen after the primitives below have been disposed.
        cde.Signal() |> ignore
        cde.Wait()
    finally
      cde.Dispose()
      sem.Dispose() }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment