Skip to content

Instantly share code, notes, and snippets.

@cloudRoutine
Last active November 15, 2015 22:27
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cloudRoutine/18adec2e210799c1d4b1 to your computer and use it in GitHub Desktop.
Save cloudRoutine/18adec2e210799c1d4b1 to your computer and use it in GitHub Desktop.
Heterogeneous Parallel Async Composition Operator
(*
Heterogeneous Parallel Async Composition Operator
- an evolution of http://jackfoxy.com/transparent-heterogeneous-parallel-async-with-fsharp/
*)
type 'T Parallel =
private {
Compute : obj Async []
Unpack : obj [] -> int -> 'T
}
static member inline ComposeParallel (x: 'a Parallel) (f: ('a -> 'b) Parallel) : 'b Parallel = {
Compute = Array.append f.Compute x.Compute
Unpack = fun xs pos ->
let fv = f.Unpack xs pos
let xv = x.Unpack xs (pos + f.Compute.Length)
fv xv
}
static member inline ComposeAsync (x: 'a Async) (f: ('a ->'b) Parallel) : 'b Parallel = Parallel.ComposeParallel (Parallel.Await x) f
static member inline Concat (p1: 'a Parallel) (p2: 'b Parallel) : ('a * 'b) Parallel =
Parallel.Pure (fun a b -> a,b)
|> Parallel.ComposeParallel p1
|> Parallel.ComposeParallel p2
static member inline ConcatFlat (p1:('a*'b) Parallel,x:'c Async) : ('a*'b*'c) Parallel =
let p2 = Parallel.Await x
{ Compute = Array.append p1.Compute p2.Compute
Unpack = fun xs pos ->
let a,b = p1.Unpack xs pos
let c = p2.Unpack xs (pos + p1.Compute.Length)
a,b,c
}
static member inline ConcatFlat (p1:('a*'b*'c) Parallel,x:'d Async) : ('a*'b*'c*'d) Parallel =
let p2 = Parallel.Await x
{ Compute = Array.append p1.Compute p2.Compute
Unpack = fun xs pos ->
let a,b,c = p1.Unpack xs pos
let d = p2.Unpack xs (pos + p1.Compute.Length)
a,b,c,d
}
static member inline ConcatFlat (p1: ('a*'b*'c*'d) Parallel,x: 'e Async) : ('a*'b*'c*'d*'e) Parallel =
let p2 = Parallel.Await x
{ Compute = Array.append p1.Compute p2.Compute
Unpack = fun xs pos ->
let a,b,c,d = p1.Unpack xs pos
let e = p2.Unpack xs (pos + p1.Compute.Length)
a,b,c,d,e
}
static member inline ConcatFlat (p1: ('a*'b*'c*'d*'e) Parallel,x: 'f Async) : ('a*'b*'c*'d*'e*'f) Parallel =
let p2 = Parallel.Await x
{ Compute = Array.append p1.Compute p2.Compute
Unpack = fun xs pos ->
let a,b,c,d,e = p1.Unpack xs pos
let f = p2.Unpack xs (pos + p1.Compute.Length)
a,b,c,d,e,f
}
static member inline ConcatFlat (p1: ('a*'b*'c*'d*'e*'f) Parallel,x: 'g Async) : ('a*'b*'c*'d*'e*'f*'g) Parallel =
let p2 = Parallel.Await x
{ Compute = Array.append p1.Compute p2.Compute
Unpack = fun xs pos ->
let a,b,c,d,e,f = p1.Unpack xs pos
let g = p2.Unpack xs (pos + p1.Compute.Length)
a,b,c,d,e,f,g
}
static member inline ConcatFlat (p1: ('a*'b*'c*'d*'e*'f*'g) Parallel,x: 'h Async) : ('a*'b*'c*'d*'e*'f*'g*'h) Parallel =
let p2 = Parallel.Await x
{ Compute = Array.append p1.Compute p2.Compute
Unpack = fun xs pos ->
let a,b,c,d,e,f,g = p1.Unpack xs pos
let h = p2.Unpack xs (pos + p1.Compute.Length)
a,b,c,d,e,f,g,h
}
and Parallel =
static member Run<'T> (p: 'T Parallel) : 'T Async = async {
let! results =
match p.Compute.Length with
| 0 -> async.Return [|box ()|]
| 1 -> async {
let! r = p.Compute.[0]
return [| r |]
}
| _ -> Async.Parallel p.Compute
return p.Unpack results 0
}
static member Await<'T> (x: 'T Async) : 'T Parallel = {
Compute = [| async {
let! v = x
return box v
}|]
Unpack = fun xs pos -> unbox xs.[pos]
}
static member Pure<'T>(x: 'T) : 'T Parallel = {
Compute = [||]
Unpack = fun _ _ -> x
}
type Parallelizer = Parallelizer with
static member inline (?<-) (_,async1:'a Async,async2:'b Async) =
Parallel.Pure (fun x y -> x,y)
|> Parallel.ComposeAsync async1
|> Parallel.ComposeAsync async2
static member inline (?<-) (_,f:('a ->'b) Parallel,x:'a Async) = Parallel.ComposeAsync x f
static member inline (?<-) (_,p1:'a Parallel,p2:'b Parallel) = Parallel.Concat p1 p2
static member inline (?<-) (_,p:('a*'b) Parallel,x) = Parallel.ConcatFlat (p,x)
static member inline (?<-) (_,p:('a*'b*'c) Parallel,x) = Parallel.ConcatFlat (p,x)
static member inline (?<-) (_,p:('a*'b*'c*'d) Parallel,x) = Parallel.ConcatFlat (p,x)
static member inline (?<-) (_,p:('a*'b*'c*'d*'e) Parallel,x) = Parallel.ConcatFlat (p,x)
static member inline (?<-) (_,p:('a*'b*'c*'d*'e*'f) Parallel,x) = Parallel.ConcatFlat (p,x)
static member inline (?<-) (_,p:('a*'b*'c*'d*'e*'f*'g) Parallel,x) = Parallel.ConcatFlat (p,x)
let inline ( |*| ) (async1:'a) (async2:'b) = (?<-) Parallelizer async1 async2
let test0 () =
async { return 1 }
|*| async { return 'a' }
|*| async { return true }
|> Parallel.Run
|> Async.RunSynchronously
let test1 () =
async { return 1 }
|*| async { return 'a' }
|*| async { return true }
|*| async { return 'b' }
|*| async { return false }
|*| async { return 5 }
|*| async { return true }
|> Parallel.Run
|> Async.RunSynchronously
;;
let result0 = test0 ()
let result1 = test1 ()
(*
val result0 : int * char * bool = (1, 'a', true)
val result1 : int * char * bool * char * bool * int * bool =
(1, 'a', true, 'b', false, 5, true)
*)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment