// Agent based calculation engine
// GitHub Gist mndrake/8440854, created by @mndrake on January 15, 2014.
open System
open System.Collections.Generic
open System.Collections.ObjectModel
open System.ComponentModel
open System.Threading
open Microsoft.FSharp.Control
open Microsoft.FSharp.Reflection
/// State of a node's result, as replied to Eval requests and passed to
/// change callbacks.
type Value =
    /// The node's inputs changed and no recalculation has been started.
    | Dirty
    /// No up-to-date result is available yet.
    | Processing
    /// The node holds a result; the payload is that result, boxed.
    | Valid of obj
/// Messages understood by the node agents.
type Message =
    /// An upstream node reported a new state: (upstream node name, its state).
    | Changed of string * Value
    /// Request for the node's current state; the answer goes to the reply channel.
    | Eval of AsyncReplyChannel<Value>
    /// A new value has been stored and subscribers should be told about it.
    | Processed
    /// The calculation mode was assigned; the payload is the new Automatic flag.
    | AutoCalculation of bool
//#region helpers
/// Returns true when `value` is a non-null instance of an F# tuple type,
/// and false for null and for every other kind of value.
let isTuple value =
    let boxed = box value
    // Null carries no runtime type to inspect, so it is never a tuple.
    if obj.ReferenceEquals(boxed, null) then false
    else Microsoft.FSharp.Reflection.FSharpType.IsTuple(boxed.GetType())
/// Basic message logger for calculation nodes.
///
/// Entries are posted to an agent, which formats them and appends them to an
/// in-memory list in arrival order. The list is appended to on the agent's
/// thread and read by `Get` on the caller's thread, so every access is made
/// under a lock and printing works on a snapshot; enumerating the live list
/// while the agent appends to it would fail with "Collection was modified".
type Log() =
    // Stored entries: (node name, formatted text). Also used as the lock object.
    let messages = List<_>()

    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop () =
                async {
                    let! (name, state, message) = inbox.Receive()
                    let entry = name, sprintf "STATE: %s MESSAGE: %A" state message
                    lock messages (fun () -> messages.Add entry)
                    return! loop ()
                }
            loop ())

    // Copies the entries so callers can iterate without holding the lock.
    let snapshot () = lock messages (fun () -> messages.ToArray())

    /// Queues an entry for node `name`; `state` is the state label and
    /// `message` is formatted with %A. Returns without waiting for the entry
    /// to be stored.
    member this.Post(name, state, message) = agent.Post(name, state, message)

    /// Prints the stored entries that belong to node `name`.
    member this.Get(name) =
        snapshot ()
        |> Seq.filter (fun (n, _) -> name = n)
        |> Seq.map (fun (_, m) -> m)
        |> Seq.iter (printfn "%s")

    /// Prints every stored entry, prefixed with the name of its node.
    member this.Get() =
        snapshot ()
        |> Seq.iter (fun (name, message) -> printfn "NODE: %s %s" name message)
//#endregion
/// Contract shared by every node of the calculation graph.
type INode =
    /// Asks the node for its current state; the reply is delivered asynchronously.
    abstract member Eval : unit -> Async<Value>
    /// The name the node was created with.
    abstract member Name : string
    /// Registers a callback that receives (node name, new state) notifications.
    abstract member OnChanged : (string * Value -> unit) -> unit
    /// The value currently stored in the node, boxed.
    abstract member Value : obj
/// Holds the automatic-calculation switch and announces every assignment to it.
type CalculationHandler() =
    let modeChanged = Event<unit>()
    let mutable isAutomatic = false

    /// Whether automatic calculation is on. Initially false.
    /// Every assignment raises `Changed`, including one that stores the
    /// value the switch already has.
    member this.Automatic
        with get () = isAutomatic
        and set v =
            isAutomatic <- v
            modeChanged.Trigger()

    /// Raised after `Automatic` has been assigned.
    member this.Changed = modeChanged.Publish
/// Source node holding a value supplied by the caller.
///
/// An Input always answers Eval with `Valid`. Calling `SetValue` stores the
/// new value and has the agent notify every registered callback.
/// `calc` is accepted for symmetry with `Output` but is not used here.
type Input<'U>(name, log : Log, calc : CalculationHandler, initialValue : 'U) =
    let subscribers = List<_>()
    let mutable current = initialValue

    // Reacts to one message; the agent has a single state, "valid".
    let handle msg =
        match msg with
        | Eval reply -> reply.Reply(Valid (box current))
        | Processed ->
            for notify in subscribers do
                notify (name, Valid (box current))
        | Changed _
        | AutoCalculation _ -> ()

    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec valid () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "valid", msg)
                    handle msg
                    return! valid ()
                }
            valid ())

    // Shared by the class members and the INode implementation below.
    let requestState () = agent.PostAndAsyncReply(fun r -> Eval r)
    let subscribe action = subscribers.Add(action)

    /// Stores `v` and asks the agent to notify the registered callbacks.
    member this.SetValue v =
        current <- v
        agent.Post(Processed)

    /// Asks the agent for the current state.
    member this.Eval() = requestState ()
    /// The name given at construction.
    member this.Name = name
    /// Registers a callback invoked with (node name, state) after `SetValue`.
    member this.OnChanged(action) = subscribe action
    /// The stored value, boxed.
    member this.Value = box current

    interface INode with
        member this.Eval() = requestState ()
        member this.Name = name
        member this.OnChanged(action) = subscribe action
        member this.Value = box current
/// Calculated node: applies `nodeFunction` to the values of its upstream nodes.
///
/// `nodeInputs` is either a single INode or an F# tuple of INodes, and
/// `nodeFunction` receives the upstream values in the same shape.
///
/// The node is an agent with three waiting states:
///   * dirty      - inputs changed and nothing has been recalculated;
///   * processing - a calculation is running, or upstream nodes are not valid yet;
///   * valid      - the stored value is the result of the latest calculation.
/// Registered callbacks receive (node name, state) on each state change.
///
/// Each background calculation takes a ticket. Only the most recently
/// launched one may store its result and announce completion, so when a
/// `Changed` arrives mid-calculation the superseded run can neither
/// overwrite a newer value nor flip the node to valid with an outdated one.
///
/// Known limitation: if `nodeFunction` raises, no completion is announced and
/// the node stays in the processing state until the next `Changed`.
type Output<'N, 'T, 'U>(name, log : Log, calc : CalculationHandler, nodeInputs : 'N, nodeFunction : 'T -> 'U) =
    let inputsAreTuple = isTuple nodeInputs

    // A tuple of nodes is unpacked into its fields; any other argument is one node.
    let nodes =
        if inputsAreTuple then
            FSharpValue.GetTupleFields(nodeInputs) |> Array.map (fun x -> x :?> INode)
        else
            [| (box nodeInputs) :?> INode |]

    // Rebuilds the argument nodeFunction expects from the boxed upstream values.
    let func =
        if inputsAreTuple then
            fun (p : obj[]) -> (FSharpValue.MakeTuple(p, typeof<'T>) :?> 'T) |> nodeFunction
        else
            fun (p : obj[]) -> (p.[0] :?> 'T) |> nodeFunction

    let actions = List<string * Value -> unit>()
    let mutable value = Unchecked.defaultof<'U>

    // `gate` guards `launched`, `stored` and `value`, which are shared between
    // the agent and the background calculations.
    let gate = obj ()
    // Ticket of the most recently launched calculation.
    let mutable launched = 0
    // Ticket of the calculation whose result is currently held in `value`.
    let mutable stored = 0

    let notify (state : Value) =
        for action in actions do
            action (name, state)

    let agent =
        MailboxProcessor.Start(fun inbox ->
            // Runs nodeFunction away from the agent so that Eval requests are
            // still answered while the calculation is in progress.
            let launch (inputs : obj[]) =
                let ticket =
                    lock gate (fun () ->
                        launched <- launched + 1
                        launched)
                async {
                    let result = func inputs
                    // A run that has been superseded discards its result.
                    let accepted =
                        lock gate (fun () ->
                            if ticket = launched then
                                value <- result
                                stored <- ticket
                                true
                            else
                                false)
                    if accepted then inbox.Post(Processed)
                }
                |> Async.Start

            let rec calculate () =
                async {
                    let! nodeValues = Async.Parallel(nodes |> Seq.map (fun n -> n.Eval()))
                    let inputs = nodeValues |> Array.choose (function Valid v -> Some v | _ -> None)
                    // Calculate only when every upstream node is valid; otherwise
                    // wait in the processing state for an upstream Changed.
                    if inputs.Length = nodeValues.Length then launch inputs
                    return! processing ()
                }

            and valid () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "valid", msg)
                    match msg with
                    | Changed _ ->
                        if calc.Automatic then
                            notify Processing
                            return! calculate ()
                        else
                            notify Dirty
                            return! dirty ()
                    | Eval r ->
                        r.Reply(Valid (box value))
                        return! valid ()
                    | Processed
                    | AutoCalculation _ -> return! valid ()
                }

            and processing () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "processing", msg)
                    match msg with
                    | Changed _ -> return! calculate ()
                    | Eval r ->
                        r.Reply(Processing)
                        return! processing ()
                    | Processed ->
                        // Ignore a completion that was overtaken by a newer launch
                        // between being posted and being received.
                        let upToDate = lock gate (fun () -> stored = launched)
                        if upToDate then
                            notify (Valid (box value))
                            return! valid ()
                        else
                            return! processing ()
                    | AutoCalculation _ -> return! processing ()
                }

            and dirty () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "dirty", msg)
                    match msg with
                    | Eval r ->
                        // The caller is told a result is on its way, then it is calculated.
                        r.Reply(Processing)
                        return! calculate ()
                    | AutoCalculation true -> return! calculate ()
                    | Changed _
                    | Processed
                    | AutoCalculation false -> return! dirty ()
                }

            // initial state
            if calc.Automatic then calculate () else dirty ())

    do
        // Listen to every upstream node and to calculation-mode assignments.
        nodes |> Seq.iter (fun n -> n.OnChanged(fun args -> agent.Post(Changed args)))
        calc.Changed.Add(fun () -> agent.Post(AutoCalculation(calc.Automatic)))

    /// Asks the agent for the node's current state.
    member this.Eval() = agent.PostAndAsyncReply(fun r -> Eval r)
    /// The name given at construction.
    member this.Name = name
    /// Registers a callback invoked with (node name, state) on state changes.
    member this.OnChanged(action) = actions.Add(action)
    /// The most recently stored result, boxed (the type's default before any calculation).
    member this.Value = box value

    interface INode with
        member this.Eval() = agent.PostAndAsyncReply(fun r -> Eval r)
        member this.Name = name
        member this.OnChanged(action) = actions.Add(action)
        member this.Value = box value
// ---------------------------------------------------------------------------------
// example
// Logger shared by every node of the example graph.
let log = Log()
// Calculation-mode switch shared by every node of the example graph.
let calc = CalculationHandler()
// Start with automatic calculation off.
calc.Automatic <- false
/// Creates an Input node called `name` that holds `v` and is wired to the
/// shared `log` and `calc`.
let input name v = Input(name, log, calc, v)
/// Creates an Output node called `name` that adds the values of the two
/// upstream nodes given as the pair `nodes`. Each evaluation is logged
/// together with the id of the thread it runs on, and is slowed down by a
/// one-second sleep.
let addNode name nodes =
    let slowAdd (x, y) =
        log.Post(name, sprintf "eval %s, thread %i" name Thread.CurrentThread.ManagedThreadId, Processed)
        Thread.Sleep 1000
        x + y
    Output(name, log, calc, nodes, slowAdd)
/// Requests an evaluation of `node` from a background workflow without
/// waiting for the answer.
// NOTE(review): the Async<Value> returned by Eval is discarded rather than
// awaited - confirm that calling Eval is enough to deliver the request.
let evalAsync (node : INode) =
    let request = async { ignore (node.Eval()) }
    Async.Start request
// Three integer inputs.
let i1 = input "i1" 1
let i2 = input "i2" 3
let i3 = input "i3" 5
// n1 adds i1 and i2; n2 adds i2 and i3, so both depend on i2.
let n1 = addNode "n1" (i1,i2)
let n2 = addNode "n2" (i2,i3)
// n3 adds the results of n1 and n2.
let n3 = addNode "n3" (n1,n2)
// run the following a line at a time
// enable automatic calculations
// calc.Automatic <- true
// print the messages log
// log.Get()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment