Skip to content

Instantly share code, notes, and snippets.

@kspeakman
Last active December 2, 2021 02:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kspeakman/0ecb30bfe8336e7b31bc71911ad22d9f to your computer and use it in GitHub Desktop.
Save kspeakman/0ecb30bfe8336e7b31bc71911ad22d9f to your computer and use it in GitHub Desktop.
UMP - MVU-style back end workflow
module Emailer
/// State carried through the emailer workflow.
type Model =
    {
        /// Batch size: `update` chunks the due emails into lists of at most this many.
        SendLimitPerSecond: int
        /// Date handed to the FindDueEmails effect when the workflow starts.
        Today: DateTime
        /// Batches still waiting to be sent; the head batch goes out next.
        ToSend: Email list list
    }
/// Side effects that `update` can request and `perform` executes.
type Effect =
    /// Load the emails that are due for the given date.
    | FindDueEmails of DateTime
    /// Wait until the given time, then report back with TimeToSend.
    | ScheduleSend of DateTimeOffset
    /// Send a single email.
    | SendEmail of Email
/// Messages fed into `update`: the start signal plus the results of effects.
type Msg =
    /// Initial message produced by `init`.
    | Started
    /// Due emails were loaded; `now` is the time at which they were loaded.
    | DueEmails of Email list * now: DateTimeOffset
    /// A scheduled wait elapsed; carries the time at which it elapsed.
    | TimeToSend of DateTimeOffset
    /// An effect failed.
    | Failed
/// Advances the workflow in response to a message.
///
/// Returns the next model wrapped in a Result together with the effects to run:
/// `Ok model, []` means finished successfully, `Error (), []` means finished
/// with a failure, and any non-empty effect list means there is more work to do.
///
/// Note: List.chunkBySize raises ArgumentException when
/// model.SendLimitPerSecond is not positive.
let update model msg =
    /// Emits the head batch (one SendEmail per email) followed by a single
    /// ScheduleSend one second after `now`; with no batches left, signals completion.
    // The annotation is required: F# cannot resolve `now.AddSeconds` on an
    // un-annotated parameter.
    let send (now: DateTimeOffset) model =
        match model.ToSend with
        | [] ->
            Ok model, [] // Ok = success, no new effects = done
        | batch :: remaining ->
            let nextSend = now.AddSeconds(1.0)
            let effects =
                [ for email in batch do
                      yield SendEmail email
                  yield ScheduleSend nextSend ]
            Ok { model with ToSend = remaining }, effects
    match msg with
    | Started ->
        Ok model, [ FindDueEmails model.Today ]
    | DueEmails (emails, now) ->
        // Split into per-second batches and send the first one immediately.
        let batches = List.chunkBySize model.SendLimitPerSecond emails
        { model with ToSend = batches }
        |> send now
    | TimeToSend now ->
        send now model
    // exit on error
    | Failed ->
        Error (), [] // Error = failure, no new effects = done
/// Executes a single effect and returns the follow-up message, if any.
///
/// `config` supplies DbConnectString, EmailSettings and Logger.
/// Exceptions raised while loading or sending are logged and reported as
/// `Some Failed`; a successful send produces no message (`None`).
let perform config effect =
    match effect with
    | FindDueEmails today ->
        async {
            try
                let! emails = Db.getDueEmails config.DbConnectString today
                let now = DateTimeOffset.Now
                return Some (DueEmails (emails, now))
            with ex ->
                config.Logger.LogCritical("Could not load due items {@Ex}", ex)
                return Some Failed
        }
    | ScheduleSend nextSend ->
        let span = nextSend - DateTimeOffset.Now
        // nextSend may already be in the past (e.g. the previous batch took
        // longer than a second). Async.Sleep rejects negative due times and
        // treats -1 as "wait forever", so clamp to zero and fire immediately.
        let sleepTime = Math.Ceiling(span.TotalMilliseconds) |> int |> max 0
        async {
            do! Async.Sleep sleepTime
            return Some (TimeToSend DateTimeOffset.Now)
        }
    | SendEmail email ->
        async {
            try
                do! Emailer.send config.EmailSettings email
                // NOTE(review): if marking fails after a successful send, the
                // email is reported as Failed although it went out — confirm
                // whether a later run would send it again.
                do! Db.markEmailSent config.DbConnectString email.EmailId
                return None
            with ex ->
                config.Logger.LogCritical("Failed to send {@Ex}", ex)
                return Some Failed
        }
/// Builds the starting model from (sendLimit, today) and pairs it with the
/// Started message. The queue of batches starts out empty.
let init (sendLimit, today) =
    let model =
        { SendLimitPerSecond = sendLimit
          Today = today
          ToSend = [] }
    model, Started
// test subject
// Perform is a stub: Ump.test only collects effects, it never executes them.
// NOTE(review): Emailer.update returns a Result and this Perform stub returns
// Async<_ option>; neither matches the Ump record type shown in this file —
// confirm which version of Ump this example targets.
let ump = { Init = Emailer.init
            Update = Emailer.update
            Perform = fun _ -> async.Return None
            Output = ignore }
// test data
let email1 = ...
let email2 = ...
// DateTime / DateTimeOffset have no string constructors; build the values explicitly.
let today = DateTime(2001, 1, 1)
let now = DateTimeOffset(2001, 1, 1, 5, 0, 0, TimeSpan.Zero)
let next = DateTimeOffset(2001, 1, 1, 5, 0, 1, TimeSpan.Zero)
// A limit of one email per second puts email1 and email2 in separate batches.
let initArg = (1, today)
// example test
// Ump.test replays the start message before the supplied ones, so the
// FindDueEmails effect requested by Started comes first in the result.
let expected = [FindDueEmails today; SendEmail email1; ScheduleSend next]
let _, actual = Ump.test ump initArg [DueEmails ([email1; email2], now)]
Assert.IsTrue(expected = actual)
// Update-Model-Perform
/// The four functions that together describe a UMP workflow.
type Ump<'initArg, 'Model, 'Msg, 'Effect, 'output> =
    {
        /// Builds the initial model and the first message from the initial argument.
        /// Pass a tuple or record when several initial arguments are needed.
        Init: 'initArg -> 'Model * 'Msg

        /// Responds to a message by producing the next model and the effects to request.
        Update: 'Model -> 'Msg -> 'Model * 'Effect list

        /// Carries out a requested side effect; the current model is available for reference.
        /// When an Async is returned it is run and the resulting message is fed to Update.
        /// No other function should perform side effects.
        Perform: 'Model -> 'Effect -> Async<'Msg> option

        /// Maps the model to the workflow's output value.
        Output: 'Model -> 'output
    }
module Ump =

    /// Feeds `msgs` through `ump.Update` in order, starting from `model`.
    /// Returns the final model and `pendingEffects` followed by the effects
    /// each message produced, in message order.
    let private applyMsgs ump model pendingEffects msgs =
        let step model msg =
            let (model, newEffects) = ump.Update model msg
            (newEffects, model)
        // Collect one effect list per message and concatenate once, instead of
        // re-appending to a growing list on every message.
        let (effectsPerMsg, model) = List.mapFold step model msgs
        (model, List.concat (pendingEffects :: effectsPerMsg))

    /// Test a UMP workflow.
    /// Provide the initial argument and messages (side effect results) to replay.
    /// Returns the final output and all generated side effects.
    /// Effects are only collected; `ump.Perform` is never called.
    /// Note: The start message will be run before the provided messages.
    let test (ump: Ump<'initArg, 'Model, 'Msg, 'Effect, 'output>)
             (initArg: 'initArg)
             (msgs: 'Msg list)
             : 'output * 'Effect list =
        let (model, startMsg) = ump.Init initArg
        let (model, effects) = applyMsgs ump model [] (startMsg :: msgs)
        (ump.Output model, effects)

    [<AutoOpen>]
    module Internal =

        /// Snapshot handed from one iteration of `runLoop` to the next.
        [<Struct>]
        type RunState<'Model, 'Msg, 'Effect> = {
            Model: 'Model
            Msgs: 'Msg list
            Effects: 'Effect list
        }

        /// Processes all pending messages, then performs the resulting effects
        /// and loops on the messages they return. Completes with `ump.Output`
        /// once an iteration leaves no effects to perform.
        let rec runLoop ump state =
            // process all msgs
            // Effects already pending in `state` are kept and run ahead of the
            // ones produced by this round of messages.
            let model, effects =
                applyMsgs ump state.Model state.Effects state.Msgs
            match effects with
            | [] -> async.Return (ump.Output model)
            | _ ->
                // run all effects in parallel
                // When sequencing matters Ump.Update will return
                // one Effect at a time for control flow anyway
                async {
                    let! msgArr =
                        effects
                        |> List.choose (ump.Perform model)
                        |> Async.Parallel
                    let state = {
                        Model = model
                        Msgs = List.ofArray msgArr
                        Effects = []
                    }
                    return! runLoop ump state
                }

    /// Run a UMP from an initial argument.
    /// You can create a reusable run function by partially applying the UMP argument.
    /// Infinite loops are possible when Update generates Effects on every Msg.
    /// This allows the UMP to support interactive applications.
    let run (ump: Ump<'initArg, 'Model, 'Msg, 'Effect, 'output>)
            (arg: 'initArg)
            : Async<'output> =
        let (model, msg) = ump.Init arg
        let state = { Model = model; Msgs = [ msg ]; Effects = [] }
        runLoop ump state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment