Skip to content

Instantly share code, notes, and snippets.

@vasily-kirichenko
Created June 30, 2018 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vasily-kirichenko/34c91c16de422b1e8eb4abfd888adb4e to your computer and use it in GitHub Desktop.
Save vasily-kirichenko/34c91c16de422b1e8eb4abfd888adb4e to your computer and use it in GitHub Desktop.
open Akka.Actor
open Akka.Streams
open Akkling.Streams
open System
/// An image, represented as a single-case union wrapping the image's name
/// (for example "Foo.png").
type Image =
    | Image of string
/// Simulates one image-processing stage as an asynchronous workflow.
///
/// `delay` is handed to `Async.Sleep` to stand in for the actual work.
/// `jobName` labels the console output and is appended, in square brackets,
/// to the image's name.
///
/// When run, the workflow prints a "Started" line, sleeps, prints a
/// "Completed" line, and yields the renamed image.
let imageJob delay jobName (Image image) =
    // The new name depends only on the arguments, so it is built up front.
    let renamed = sprintf "%s [%s]" image jobName
    async {
        printfn "%s Started: %s" jobName image
        do! Async.Sleep delay
        printfn "%s Completed: %s" jobName renamed
        return Image renamed
    }
/// "Scaling" stage: an `imageJob` with a delay of 2000.
let scaleImage image = imageJob 2000 "Scaling" image

/// "Filtering" stage: an `imageJob` with a delay of 1500.
let filterImage image = imageJob 1500 "Filtering" image

/// "Displaying" stage: an `imageJob` with a delay of 500.
let displayImage image = imageJob 500 "Displaying" image
/// Program entry point.
///
/// Creates an actor system named "image-processing" and a stream materializer,
/// then pushes three images through the scale -> filter -> display stages and
/// blocks until the stream has finished. Always returns exit code 0.
[<EntryPoint>]
let main argv =
    // Capacity of the buffer placed in front of every stage.
    let queueLength = 3

    // Both the actor system and the materializer are IDisposable. Binding them
    // with `use` shuts them down when `main` returns (materializer first, then
    // the system) instead of leaving them running.
    use system = ActorSystem.Create "image-processing"
    use mat = ActorMaterializer.Create system

    // One pipeline stage: a back-pressured buffer followed by an asynchronous,
    // unordered map that is given Environment.ProcessorCount as its
    // parallelism argument.
    let asyncMap f =
        Source.buffer OverflowStrategy.Backpressure queueLength
        >> Source.asyncMapUnordered Environment.ProcessorCount f

    [ Image "Foo.png"; Image "Bar.png"; Image "Baz.png" ]
    |> Source.ofList
    |> asyncMap scaleImage
    |> asyncMap filterImage
    |> asyncMap displayImage
    |> Source.runWith mat Sink.ignore
    // Block the main thread until the whole stream has completed.
    |> Async.RunSynchronously

    0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment