Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Created May 31, 2018 19:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eulerfx/73aa3c2aa39cc2324e82a0539a84debc to your computer and use it in GitHub Desktop.
Save eulerfx/73aa3c2aa39cc2324e82a0539a84debc to your computer and use it in GitHub Desktop.
Sample of polling reader of EventStore.ClientAPI in F#
// https://github.com/fsprojects/FSharp.Control.AsyncSeq
// RetryPolicy from https://github.com/jet/kafunk/blob/master/src/kafunk/Utility/Faults.fs#L89
/// Returns an Async computation which evaluates the input computation until the specified condition is met
/// with delays between attempts dictated by the specified retry policy.
let pollUntil (rp:RetryPolicy) (condition:'a -> bool) (a:Async<'a>) : Async<'a option> =
(AsyncSeq.replicateInfiniteAsync a, RetryPolicy.delayStream rp)
||> AsyncSeq.interleaveChoice
|> AsyncSeq.tryPick (function Choice1Of2 a when condition a -> Some a | _ -> None)
// EventStore client
type Conn = string
type Position = int64
type Event = string
let readBatch (c:Conn) (p:Position) : Async<Event[] * Position> = failwith "TODO"
let stream (c:Conn) (p:Position) (rp:RetryPolicy) : AsyncSeq<Event[]> =
p |> AsyncSeq.unfoldAsync (readBatch c >> pollUntil rp (fun (es,_) -> es.Length > 0))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment