Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
The actor is defined entirely in system1 and shares no code with system2 except the generic FsAkka.fs. The Message type is shared for now, but that can be changed. FsPickler is used for serialization. All messages are handled in the system2 process!
module FsAkka
open Akka.Actor
open Microsoft.FSharp.Quotations
open Microsoft.FSharp.Linq.QuotationEvaluation
/// Token bound with `let!` inside an `actor { ... }` workflow to wait for the
/// next message. `Input` is the only value; the type parameter carries the
/// message type so the builder can type the continuation.
type IO<'msg> =
    | Input
/// A workflow step over messages of type 'm that eventually yields a 'v.
type Cont<'m,'v> =
    /// Waiting for a message; applying the function to it gives the next step.
    | Func of ('m -> Cont<'m,'v>)
    /// Finished with a result.
    | Return of 'v
/// Computation-expression builder behind the `actor { ... }` syntax.
/// A workflow is a `Cont<'m,'v>`: either `Func`, which waits for the next
/// message, or `Return`, which is finished.
type ActorBuilder() =

    /// `let! msg = recv`: produces a step that waits for one message and then
    /// continues with `f`. The `IO` token itself carries no data and is ignored.
    member this.Bind(m : IO<'msg>, f :'msg -> _) =
        Func (fun msg -> f msg)

    /// `let! v = other`: runs the workflow `x` to completion, forwarding each
    /// incoming message to it, then continues with `f` applied to its result.
    member this.Bind(x : Cont<'m,'a>, f :'a -> Cont<'m,'b>) : Cont<'m,'b> =
        match x with
        | Func fx -> Func(fun m -> this.Bind(fx m, f))
        | Return v -> f v

    /// `return! x`: the workflow `x` is used as is.
    member this.ReturnFrom(x) = x

    /// `return x`: a finished workflow yielding `x`.
    member this.Return x = Return x

    /// Implicit result of a branch without a value: finished with unit.
    member this.Zero() = Return ()

    /// `try ... with`: every message-handling step of `f` runs under a
    /// `try`; if a step raises, the workflow continues with `c ex` (which is
    /// itself no longer protected).
    ///
    /// A workflow that is already finished is returned unchanged. Wrapping it
    /// in another `Func` would consume and discard one extra message after
    /// the `try` body had completed.
    ///
    /// Note: `Delay` below runs the body eagerly, so an exception raised
    /// before the first `let!` of a `try` body is thrown while the workflow
    /// is being built and does not reach this handler.
    member this.TryWith(f:Cont<'m,'a>,c: exn -> Cont<'m,'a>): Cont<'m,'a> =
        match f with
        | Return _ -> f
        | Func fn ->
            Func(fun m ->
                // `guarded` is true when the step ran without raising, in
                // which case the remainder must stay under the handler.
                let guarded, next =
                    try
                        true, fn m
                    with
                    | ex -> false, c ex
                if guarded then this.TryWith(next, c) else next)

    /// Evaluates the delayed body immediately.
    member this.Delay(f: unit -> Cont<_,_>) =
        f()

    /// Sequencing of two workflow fragments: runs `f` to completion, drops its
    /// result and continues with `g`.
    member this.Combine(f,g) =
        match f with
        | Func fx -> Func(fun m -> this.Combine(fx m, g))
        | Return _ -> g

/// Builder instance enabling `actor { ... }`.
let actor = ActorBuilder()
/// Akka actor driven by a quoted workflow. The quotation is compiled when the
/// actor is constructed and applied to `Input` to obtain the first step.
type FunActor<'m,'v>(actor: Expr<IO<'m> -> Cont<'m,'v>>) =
    inherit ActorBase()

    // Current step of the workflow; replaced after every handled message.
    let mutable current =
        let build = actor.Compile()
        build () Input

    /// Casts the incoming message to 'm (the cast raises for any other type)
    /// and feeds it to the current step. Once the workflow has finished,
    /// each further message calls `PostStop`.
    override this.OnReceive(msg) =
        let typed = msg :?> 'm
        match current with
        | Return _ ->
            // NOTE(review): this only invokes the PostStop hook; whether it
            // actually stops the actor is not visible here - confirm.
            this.PostStop()
        | Func next -> current <- next typed
/// Active patterns over LINQ expression trees, plus the helper that rewrites
/// the actor factory expression handed to `Props.Create`.
module Linq =
    open System.Linq.Expressions

    /// Matches a lambda expression, yielding its parameters and its body.
    let (|Lambda|_|) (e:Expression) =
        match e with
        | :? LambdaExpression as lambda -> Some(lambda.Parameters, lambda.Body)
        | _ -> None

    /// Matches a method call, yielding its target object, method and arguments.
    let (|Call|_|) (e:Expression) =
        match e with
        | :? MethodCallExpression as call -> Some(call.Object, call.Method, call.Arguments)
        | _ -> None

    /// Projects a method to its name.
    let (|Method|) (e:System.Reflection.MethodInfo) = e.Name

    /// Matches a call to a method named "Invoke", yielding the object it is
    /// invoked on.
    let (|Invoke|_|) (e:Expression) =
        match e with
        | Call(target, Method "Invoke", _) -> Some target
        | _ -> None

    /// Views an argument collection as an array so it can be matched with an
    /// array pattern.
    let (|Ar|) (p:System.Collections.ObjectModel.ReadOnlyCollection<Expression>) = Array.ofSeq p

    type Expression =
        /// Expects a lambda whose body invokes the result of a call named
        /// "ToFSharpFunc" that has no target object and exactly one lambda
        /// argument. Returns a new parameterless lambda built from that inner
        /// lambda's body, cast to the input's expression type.
        /// Fails with "Doesn't match" for any other shape.
        static member ToExpression (f:System.Linq.Expressions.Expression<System.Func<FunActor<'m,'v>>>) =
            match f with
            | Lambda(_,Invoke(Call(null, Method "ToFSharpFunc", Ar [|Lambda(_,body)|]))) ->
                let rebuilt = Expression.Lambda(body,[||])
                rebuilt :?> System.Linq.Expressions.Expression<System.Func<FunActor<'m,'v>>>
            | _ -> failwith "Doesn't match"
open Nessos.FsPickler
open Akka.Serialization
open Quotations.Patterns
/// Akka serializer that delegates to FsPickler.
type ExprSerializer(system) =
    inherit Serializer(system)

    let pickler = new FsPickler()

    /// Serializer identifier reported to Akka.
    override this.Identifier = 9

    /// Reports that a manifest is included, so `FromBinary` is given the
    /// target type.
    override this.IncludeManifest = true

    /// Pickles `o` under its runtime type into a fresh byte array.
    override this.ToBinary(o) =
        use buffer = new System.IO.MemoryStream()
        pickler.Serialize(o.GetType(), buffer, o)
        buffer.ToArray()

    /// Unpickles `bytes` as a value of type `t`.
    override this.FromBinary(bytes, t) =
        use input = new System.IO.MemoryStream(bytes)
        pickler.Deserialize(t, input)
/// Messages understood by the sample actor spawned from system1's `main`.
type Message =
    /// Add the given amount to the counter.
    | Inc of int
    /// Subtract the given amount from the counter.
    | Dec of int
    /// Leave the start-up phase with the given initial counter value.
    | Start of int
    /// Switch the actor into its stopped state.
    | Stop
    /// Finish the workflow while in the stopped state.
    | Shutdown
/// Entry points for creating an actor system and spawning quoted actors.
module Actor =

    /// Creates an actor system called `name` from the HOCON text `configStr`
    /// and registers an `ExprSerializer` for `Expr` and `Message` values.
    let system name configStr =
        let actorSystem =
            ActorSystem.Create(name, Akka.Configuration.ConfigurationFactory.ParseString(configStr))
        let exprSerializer = new ExprSerializer(actorSystem)
        actorSystem.Serialization.AddSerializer(exprSerializer)
        for handled in [ typeof<Expr>; typeof<Message> ] do
            actorSystem.Serialization.AddSerializationMap(handled, exprSerializer)
        actorSystem

    /// Spawns a `FunActor` called `name` in `system`, running the quoted
    /// workflow `f`. The factory lambda goes through
    /// `Linq.Expression.ToExpression` before being handed to `Props.Create`.
    let spawn (system:ActorSystem) name (f: (Expr<IO<'m> -> Cont<'m,'v>>)) =
        let factory = Linq.Expression.ToExpression(fun () -> new FunActor<'m,'v>(f))
        let props = Props.Create(factory)
        system.ActorOf(props, name)
// Learn more about F# at http://fsharp.net
// See the 'F# Tutorial' project for more help.
open Akka.Actor
open FsAkka
// Entry point of the system1 process: creates the local actor system, spawns
// the sample actor under the name "remote", waits for Enter, sends it a fixed
// script of messages, waits for Enter again and returns exit code 0.
[<EntryPoint>]
let main argv =
// The configuration below makes system1 listen on localhost:8090 and maps the
// deployment path /remote to akka.tcp://system2@localhost:8080.
use system =
Actor.system "system1" @"
akka {
log-config-on-start = on
stdout-loglevel = DEBUG
loglevel = ERROR
actor {
provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
deployment {
/local {
router = round-robin-pool
nr-of-instances = 10
}
/remote {
remote = ""akka.tcp://system2@localhost:8080""
}
}
}
remote {
tcp-transport {
transport-class = ""Akka.Remote.Transport.TcpTransport, Akka.Remote""
applied-adapters = []
transport-protocol = tcp
port = 8090
hostname = localhost
}
}
}
"
// The actor name "remote" matches the /remote deployment entry above;
// presumably this is what makes the actor run inside system2 - confirm.
let a =
Actor.spawn system "remote"
<| <@ fun recv ->
// Main counting state: prints the current value on every message, applies
// Inc/Dec, moves to `stop` on Stop and ignores anything else.
let rec loop s =
actor {
let! msg = recv
printfn "%d" s
match msg with
| Inc n ->
return! loop (s + n)
| Dec n ->
return! loop (s - n)
| Stop -> return! stop ()
| _ -> return! loop s
}
// Stopped state: only Shutdown ends the workflow; every other message just
// prints a notice.
and stop () = actor {
let! m = recv
match m with
| Shutdown ->
printfn "Shutdown"
return ()
| _ ->
printfn "I'm stopped"
return! stop()
}
// Start-up phase: discards messages until a Start arrives and yields its value.
let rec handleFirstMessages() =
actor {
let! m = recv
match m with
| Start n ->
printfn "We can now start with %d" n
return n
| _ ->
printfn "Skip while its not Start (%A)!" m
return! handleFirstMessages() }
// Demonstrates try/with inside a workflow: a Stop received here raises and
// the handler prints "catched".
let trySomething() =
actor {
try
let! m = recv
printfn "in try %A" m
match m with
| Stop -> failwith "Stop should not follow start"
| _ -> printfn "skip"
printfn "Should not appear"
let! m = recv
printfn "Should not appear"
with
| _ -> printfn "catched"
}
// Overall workflow: wait for Start, run the try/with demo, then count from
// the start value.
actor {
let! v = handleFirstMessages()
printfn "value from fn %d" v
do! trySomething()
return! loop v
} @>
// Wait for Enter before sending anything.
System.Console.ReadLine() |> ignore
// Infix shorthand for ActorRef.Tell.
let (<!) (x:ActorRef) m = x.Tell m
// The first two messages arrive before Start, so the start-up phase skips them.
a <! Stop
a <! Inc 1
a <! Start 7
// This Stop is the first message after Start, i.e. the one read inside
// trySomething's try block.
a <! Stop
// [0..10] has 11 elements, so each of these lines sends 11 messages.
[0..10] |> List.iter(fun _ -> a <! Inc 2)
[0..10] |> List.iter (fun _ -> a <! Dec 1)
a <! Stop
[0..10] |> List.iter (fun _ -> a <! Inc 1)
a <! Shutdown
// Sent after Shutdown, i.e. after the workflow has returned.
a <! Stop
System.Console.ReadLine() |> ignore
0 // return an integer exit code
// Learn more about F# at http://fsharp.net
// See the 'F# Tutorial' project for more help.
open Akka.Actor
open FsAkka
/// Entry point of the system2 process: starts an actor system named "system2"
/// configured to listen on localhost:8080, keeps it alive until a line is read
/// from the console, then returns exit code 0. The `use` binding disposes the
/// system when `main` returns.
[<EntryPoint>]
let main argv =
    // Verbatim string: its lines stay at column 0 so the configuration text
    // is passed on unchanged.
    let config = @"
akka {
log-config-on-start = on
stdout-loglevel = DEBUG
loglevel = ERROR
actor {
provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
remote {
tcp-transport {
transport-class = ""Akka.Remote.Transport.TcpTransport, Akka.Remote""
applied-adapters = []
transport-protocol = tcp
port = 8080
hostname = localhost
}
}
}
"
    use host = Actor.system "system2" config
    // Block until Enter is pressed.
    ignore (System.Console.ReadLine())
    0 // return an integer exit code
@thinkbeforecoding

This comment has been minimized.

Copy link
Owner Author

thinkbeforecoding commented Mar 17, 2014

The sample also uses FsPowerTools.Linq.Community to compile Expr trees.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.