Skip to content

Instantly share code, notes, and snippets.

@philcleveland
Last active January 7, 2016 22:48
Show Gist options
  • Save philcleveland/4fbfae0e3fc91939d6a2 to your computer and use it in GitHub Desktop.
Save philcleveland/4fbfae0e3fc91939d6a2 to your computer and use it in GitHub Desktop.
Akka Producer/Consumer with infinite polling loop
// Learn more about F# at http://fsharp.org
// See the 'F# Tutorial' project for more help.
open System
open System.IO
open Akka.FSharp
type ProducerCommand =
| Start
| Stop
[<EntryPoint>]
let main argv =
printfn "%A" argv
let system = System.create "system" <| Configuration.defaultConfig ()
let ProducerEndpoint consumer id interval (mailbox: Actor<_>) =
printfn "Created Producer %s" id
let mutable cancellationSource = new System.Threading.CancellationTokenSource()
let rec timedPollingLoop interval =
let timer = new System.Diagnostics.Stopwatch()
async{
timer.Start()
while timer.ElapsedMilliseconds < interval do
do! Async.Sleep (int(interval - timer.ElapsedMilliseconds))
consumer <! printfn "Hello from Producer %s Thread %d" id System.Threading.Thread.CurrentThread.ManagedThreadId
timer.Reset()
do!timedPollingLoop interval
}
let rec loop () = actor {
let! message = mailbox.Receive ()
match message with
| Start ->
cancellationSource <- new System.Threading.CancellationTokenSource()
printfn "Producer %s starting on Thread %d" id System.Threading.Thread.CurrentThread.ManagedThreadId
Async.Start(timedPollingLoop interval, cancellationSource.Token)
| Stop ->
printfn "Producer stopping on Thread %d" System.Threading.Thread.CurrentThread.ManagedThreadId
cancellationSource.Cancel()
return! loop ()
}
loop ()
let ConsumerEndpoint (mailbox: Actor<_>) =
let rec loop () = actor {
let! message = mailbox.Receive ()
printfn "Consumer received %A" message
}
loop()
let consumer1 = spawn system "ConsumerEndpoint" ConsumerEndpoint
let producer = spawn system "ProducerA" <| ProducerEndpoint consumer1 "A" 15L
let producer2 = spawn system "ProducerB" <| ProducerEndpoint consumer1 "B" 15L
let producer3 = spawn system "ProducerC" <| ProducerEndpoint consumer1 "C" 15L
let rec processKeyInput() =
let key = Console.ReadKey()
match key.Key with
| ConsoleKey.Spacebar ->
producer <! Start
producer2 <! Start
producer3 <! Start
processKeyInput()
| ConsoleKey.S ->
producer <! Stop
producer2 <! Stop
producer3 <! Stop
processKeyInput()
| ConsoleKey.Escape -> printfn "Leaving"
| _ ->
printfn "Escape to leave, Space to start, S to stop"
processKeyInput()
processKeyInput() |> ignore
0 // return an integer exit code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment