Lev Gorodinski eulerfx

eulerfx /
Last active Sep 11, 2019
Discussion of the notion of commands and events within a reactive system

Some distinctions between commands and events are clear, but others are less so. I discuss the notion of a command as an interpretation of an event.

Commands are requests to do something; they are named in the imperative mood and capture intent. In CQRS, commands are distinguished from queries because they invoke behaviors and therefore cause state changes, whereas queries don't. In some frameworks, commands and events are also distinguished by how they are delivered: one sends commands and publishes events. Commands are sent to a single logical handler, whereas events are broadcast to potentially multiple handlers.

Events are named in the past tense. Beyond naming, this imposes the constraint that events can't be rejected - you can't change the past. Furthermore, an event can be regarded as the outcome of a command - a causality relation.
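The naming and rejection rules above can be sketched in F#. This is only an illustration under assumptions: the purchase-order domain, the `Command`, `Event` and `State` types and the `decide` function are hypothetical names, not taken from the original text.

```fsharp
// Hypothetical domain; all names here are illustrative assumptions.
type Command =
  | PlaceOrder of orderId:string * quantity:int    // imperative: a request
  | CancelOrder of orderId:string

type Event =
  | OrderPlaced of orderId:string * quantity:int   // past tense: a fact
  | OrderCancelled of orderId:string

type State = { Placed : Set<string> }

// A command may be rejected (Error). An event is the outcome of an
// accepted command and is simply recorded; it cannot be rejected.
let decide (state:State) (command:Command) : Result<Event list, string> =
  match command with
  | PlaceOrder (_, qty) when qty <= 0 -> Error "quantity must be positive"
  | PlaceOrder (oid, _) when state.Placed.Contains oid -> Error "order already placed"
  | PlaceOrder (oid, qty) -> Ok [ OrderPlaced (oid, qty) ]
  | CancelOrder oid when state.Placed.Contains oid -> Ok [ OrderCancelled oid ]
  | CancelOrder _ -> Error "unknown order"
```

The asymmetry shows up in the return type: only the command side has an `Error` case, while the event list carries the causality relation from command to event.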

There are perspectives where the distinction blurs. For example, what is the distinction between a command handler and an event handler?

eulerfx / PurchaseOrder.cs
Last active Aug 21, 2019
An example of a DDD application service.
View PurchaseOrder.cs
// A repository.
public interface IPurchaseOrderRepository
{
    PurchaseOrder Get(string id);

    // The commit method would likely be moved to a Unit of Work managed by infrastructure.
    void Commit();
}

// A marker interface for a domain event.
public interface IDomainEvent { }
eulerfx /
Last active Jul 15, 2019
Command Sourcing vs Event Sourcing

Both command sourcing (CS) and event sourcing (ES) rely on determinism for correctness.

The correctness condition for ES is the determinism (purity) of the function State -> Event -> State. Given that this function is deterministic, i.e. it always maps the same inputs to the same outputs, we can rely on it to reconstitute state at any point in time. Determinism is typically achieved by ensuring that the Event carries all the information required to make the state transition, so that the function needs no side effects. The Event is then a sort of "closure" over all pertinent information about what happened.
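As a minimal sketch of that condition, assuming a hypothetical account domain (the `Event`, `State`, `apply` and `replay` names are illustrative, not from the original), state can be reconstituted by folding a pure State -> Event -> State function over the recorded events:

```fsharp
// Hypothetical events; each carries everything needed for the transition.
type Event =
  | Deposited of amount:decimal
  | Withdrawn of amount:decimal

type State = { Balance : decimal }

let initial = { Balance = 0m }

// State -> Event -> State: pure, uses only the state and the event data.
let apply (state:State) (event:Event) : State =
  match event with
  | Deposited amount -> { state with Balance = state.Balance + amount }
  | Withdrawn amount -> { state with Balance = state.Balance - amount }

// Reconstitute state by folding over the history (or any prefix of it).
let replay (events:Event list) : State = List.fold apply initial events

let history = [ Deposited 100m; Withdrawn 30m; Deposited 5m ]
let current = replay history                 // Balance = 75
let afterTwo = replay (List.take 2 history)  // Balance = 70
```

Because `apply` reads nothing but its arguments, replaying the same history always yields the same state, which is what makes reconstitution at an arbitrary point in time reliable.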

The correctness condition for CS is the determinism of the function State -> Command -> Event. Herein lies one of the distinctions between command sourcing and event sourcing - a program can control its output, but not its input. Since one can't control the input, i.e. the command, one can't, in general, enrich it with all the information required to make the above function deterministic. A consequence of this is that you can't simply replay a

View Async.Race.fs
let race (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> = async {
  return! Async.FromContinuations <| fun (ok,err,cnc) ->
    let state = ref 0
    let iv = new TaskCompletionSource<_>()
    let ok a =
      // The first computation to finish wins; the loser's result goes to iv.
      if (Interlocked.CompareExchange(state, 1, 0) = 0) then
        ok (a, iv.Task |> Async.AwaitTask)
      else
        iv.SetResult a
eulerfx /
Last active Mar 28, 2019
The relationship between state machines and event sourcing

A state machine is defined as follows:

  • Input - a set of inputs
  • Output - a set of outputs
  • State - a set of states
  • S0 ∈ State - an initial state
  • T : Input * State -> Output * State - a transition function

If you model your services (aggregates, projections, process managers, sagas, whatever) as state machines, one issue to address is the management of State. There must be a mechanism to provide State to the state machine, and to persist the resulting State for subsequent retrieval. One way to address this is by storing State in a key-value store. Another way is to use a SQL database. Yet another way is event sourcing. The benefit of event sourcing is that you never need to store State itself. Instead, you rely on the Output of a service to reconstitute state. In order to do that, the state machine transition function needs to be factored into two functions as follows:
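The original continuation is not shown here, so the following is only one plausible factoring, offered as an assumption: `execute` computes the Output from the Input and State, and `apply` computes the next State from the Output and State. The counter domain and all names below are hypothetical.

```fsharp
// Hypothetical state machine; types and names are illustrative assumptions.
type Input = Add of int | Clear
type Output = Added of int | Cleared | Rejected of string
type State = { Total : int }

let s0 = { Total = 0 }

// Input * State -> Output: decides what happened, never changes State.
let execute (input:Input, state:State) : Output =
  match input with
  | Add n when n <= 0 -> Rejected "amount must be positive"
  | Add n -> Added n
  | Clear when state.Total = 0 -> Rejected "nothing to clear"
  | Clear -> Cleared

// Output * State -> State: the only place State changes.
let apply (output:Output, state:State) : State =
  match output with
  | Added n -> { state with Total = state.Total + n }
  | Cleared -> { Total = 0 }
  | Rejected _ -> state

// T : Input * State -> Output * State, recovered by composing the two.
let transition (input:Input, state:State) : Output * State =
  let output = execute (input, state)
  output, apply (output, state)

// Only outputs are persisted; State is rebuilt from them on demand.
let reconstitute (outputs:Output list) : State =
  outputs |> List.fold (fun state output -> apply (output, state)) s0
```

With this split, persisting the sequence of outputs is enough: folding `apply` over them from `s0` gives back the same State that `transition` produced step by step.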

View Async.ParallelThrottledIgnore.fs
let ParallelThrottledIgnore (startOnCallingThread:bool) (parallelism:int) (xs:seq<Async<_>>) = async {
  let! ct = Async.CancellationToken
  let sm = new SemaphoreSlim(parallelism)
  let count = ref 1
  let res = TaskCompletionSource<_>()
  let tryWait () =
    try sm.Wait () ; true
    with _ -> false
  let tryComplete () =
    if Interlocked.Decrement count = 0 then
eulerfx / Async.WithCancellation.fs
Created Jun 21, 2018
F# Async Cancellation Helper
View Async.WithCancellation.fs
let withCancellation (ct:CancellationToken) (a:Async<'a>) : Async<'a> = async {
  let! ct2 = Async.CancellationToken
  use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2)
  let tcs = new TaskCompletionSource<'a>()
  use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled() |> ignore)
  let a = async {
    try
      let! a = a
      tcs.TrySetResult a |> ignore
    with ex ->
View Async.Timeout.fs
let timeoutNone (timeoutMs:int) (a:Async<'a>) : Async<'a option> = async {
  let! ct = Async.CancellationToken
  let res = TaskCompletionSource<_>()
  use cts = CancellationTokenSource.CreateLinkedTokenSource ct
  res.Task.ContinueWith (fun _ -> cts.Cancel ()) |> ignore
  use timer = new Timer((fun _ -> res.TrySetResult None |> ignore), null, timeoutMs, Timeout.Infinite)
  Async.StartThreadPoolWithContinuations (
    (fun a -> res.TrySetResult (Some a) |> ignore),
    (fun e -> res.TrySetException e |> ignore),
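The preview above is cut off and relies on a helper (`Async.StartThreadPoolWithContinuations`) that is not shown. For comparison, here is a minimal self-contained sketch of the same contract (return None on timeout, Some otherwise) using a different technique: the timeout overload of `Async.StartChild` from FSharp.Core. The name `timeoutNoneSketch` is hypothetical, and this is not the author's implementation.

```fsharp
open System

// Sketch only: relies on Async.StartChild raising TimeoutException
// when the child has not completed within timeoutMs.
let timeoutNoneSketch (timeoutMs:int) (a:Async<'a>) : Async<'a option> = async {
  try
    let! child = Async.StartChild (a, timeoutMs)
    let! result = child
    return Some result
  with :? TimeoutException ->
    return None
}

// Usage: a computation that finishes in time versus one that does not.
let quick = timeoutNoneSketch 5000 (async { return 42 }) |> Async.RunSynchronously
```

Unlike the timer and TaskCompletionSource approach in the preview, this variant depends on cooperative cancellation of the child, so a computation that never checks for cancellation may keep running in the background after None is returned.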
eulerfx / Async.Cache.fs
Last active Jun 27, 2018
F# Async Cache
View Async.Cache.fs
let cache (a:Async<'a>) : Async<'a> =
  let tcs = TaskCompletionSource<'a>()
  let state = ref 0
  async {
    if (Interlocked.CompareExchange(state, 1, 0) = 0) then
        (fun _ -> tcs.SetCanceled()))
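Since the preview above is incomplete, here is a minimal self-contained sketch of the same idea: run the underlying computation at most once and hand the same result to every caller. It swaps the compare-and-exchange flag for a thread-safe `lazy` value wrapping `Async.StartAsTask`; the name `cacheSketch` is hypothetical and this is not the author's implementation.

```fsharp
// Sketch only: the lazy value ensures the computation is started at most
// once; every caller then awaits the same Task.
let cacheSketch (a:Async<'a>) : Async<'a> =
  let task = lazy (Async.StartAsTask a)
  async { return! Async.AwaitTask (task.Force ()) }

// Usage: the side effect inside `work` happens once, however often
// the cached computation is run.
let calls = ref 0
let work = async {
  System.Threading.Interlocked.Increment calls |> ignore
  return 7 }
let cached = cacheSketch work
let first = Async.RunSynchronously cached
let second = Async.RunSynchronously cached
```

One design consequence worth noting: a failure or cancellation of the first run is cached as well, so later callers observe the same faulted task rather than triggering a retry.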
eulerfx / EventStore.fs
Created May 31, 2018
Sample of a polling reader for EventStore.ClientAPI in F#
View EventStore.fs
// RetryPolicy from
/// Returns an Async computation which evaluates the input computation until the specified condition is met
/// with delays between attempts dictated by the specified retry policy.
let pollUntil (rp:RetryPolicy) (condition:'a -> bool) (a:Async<'a>) : Async<'a option> =
  (AsyncSeq.replicateInfiniteAsync a, RetryPolicy.delayStream rp)
  ||> AsyncSeq.interleaveChoice
  |> AsyncSeq.tryPick (function Choice1Of2 a when condition a -> Some a | _ -> None)