Skip to content

Instantly share code, notes, and snippets.

@ane
Last active August 29, 2015 14:14
Show Gist options
  • Save ane/e8768bdab8fd98da64ab to your computer and use it in GitHub Desktop.
Save ane/e8768bdab8fd98da64ab to your computer and use it in GitHub Desktop.
Simple Producer&Consumer program using Redis lists and Pub/Sub.
module WorkerProducer.Producer
open Nessos.FsPickler
open StackExchange.Redis
open System
open System.Text
open System.Threading
open WorkerProducer.Worker
let pickler = FsPickler.CreateBinary()
let random = new Random()
let genRandomTask() =
let rnd = random.Next(0, 3)
match rnd with
| 0 -> Double(random.Next(64))
| 1 -> Half(random.Next(64))
| _ -> Snore
let processResult (chan : RedisChannel) (msg : RedisValue) =
let res = pickler.UnPickle(ResultPickler, ~~msg)
match res with
| Value((cmd, i), result) -> printfn "%A Job #%d with command %A finished with result %A." DateTime.Now i cmd result
| Zzz(i) -> printfn "%A Worker fell asleep doing job #%d." DateTime.Now i
[<EntryPoint>]
let main argv =
let redis = ConnectionMultiplexer.Connect("localhost")
let subscriber = redis.GetSubscriber()
let db = redis.GetDatabase()
let cts = new CancellationTokenSource()
let handler (evt : ConsoleCancelEventArgs) =
printf "Shutting down... "
cts.Cancel()
subscriber.Publish(~~CommandChannel, ~~"stop") |> ignore
evt.Cancel <- true
printfn "done."
Console.ReadKey() |> ignore
exit (0)
let produce i =
let job = pickler.Pickle(JobPickler, (genRandomTask(), i))
db.ListRightPush(~~WorkQueue, ~~job) |> ignore
Thread.Sleep(500)
Console.CancelKeyPress.Add handler
subscriber.Subscribe(~~ResultChannel, Action<RedisChannel, RedisValue>(processResult))
// infinite loop
Seq.unfold (fun i -> Some(produce i, i + 1)) 0
// "unthunk" the seq
|> Seq.length
|> ignore
0
module WorkerProducer.Worker
open Nessos.FsPickler
open Nessos.FsPickler.Combinators
open StackExchange.Redis
open System
open System.Diagnostics
open System.Text
open System.Threading
let inline (~~) (x : ^a) : ^b = ((^a or ^b) : (static member op_Implicit : ^a -> ^b) x)
let WorkQueue = "work_queue"
let CommandChannel = "cmd_chan"
let ResultChannel = "work_result"
let pickler = FsPickler.CreateBinary()
type Task =
| Double of x : int
| Half of x : int
| Snore
type Job = Task * int
type Result =
| Value of Job * result : int
| Zzz of int
let TaskPickler =
Pickler.sum (fun v c1 c2 c3 ->
match v with
| Double(x) -> c1 x
| Half(x) -> c2 x
| Snore -> c3())
^+ Pickler.case Double Pickler.int ^+ Pickler.case Half Pickler.int ^. Pickler.variant Snore
let JobPickler = Pickler.pair TaskPickler Pickler.int
let ResultPickler =
Pickler.sum (fun v c1 c2 ->
match v with
| Value(job, result) -> c1 (job, result)
| Zzz(x) -> c2 x)
^+ Pickler.case Value (Pickler.pair JobPickler Pickler.int) ^. Pickler.case Zzz Pickler.int
type TaskType =
| Delay
| Quit
let work (db : IDatabase, subscriber : ISubscriber, interval) =
let cts = new CancellationTokenSource()
let rec loop() =
async {
let task = db.ListLeftPop ~~WorkQueue
let publish (result : Result) =
let pickled = pickler.Pickle(ResultPickler, result)
subscriber.Publish(~~ResultChannel, ~~pickled) |> ignore
if task.HasValue then
let job = pickler.UnPickle(JobPickler, ~~task)
let cmd, id = job
printfn "%A Got job %A." DateTime.Now job
match fst job with
| Double(x) -> Value(job, x * 2)
| Half(x) -> Value(job, int (x / 2))
| Snore -> Zzz(snd job)
|> publish
do! Async.Sleep interval
return! loop()
else
do! Async.Sleep interval
return! loop()
}
subscriber.Subscribe(~~CommandChannel, fun _ _ -> cts.Cancel())
try
Async.RunSynchronously(loop(), cancellationToken = cts.Token)
with :? OperationCanceledException -> printfn "Shut down."
[<EntryPoint>]
let main argv =
let redis = ConnectionMultiplexer.Connect("localhost")
let db = redis.GetDatabase()
work (db, redis.GetSubscriber(), 1000)
Console.WriteLine("Work completed.")
0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment