Created
October 24, 2016 02:46
-
-
Save manofstick/dc6d95334d75029a4703f062ba8265cd to your computer and use it in GitHub Desktop.
Chunking vs Nessos.Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System.Diagnostics | |
open Nessos.Streams | |
/// A fixed-capacity batch of up to eight values that is handed from one
/// pipeline stage to the next by reference.
///
/// `_toProcess` is a bit mask that says which of the eight slots currently
/// hold a live value: bit 0 (0x01) guards `_1`, bit 1 (0x02) guards `_2`,
/// and so on up to bit 7 (0x80) guarding `_8`. Slots whose bit is clear may
/// contain stale data and must be ignored.
[<Struct>]
[<NoComparison>]
[<NoEquality>]
type Chunk<'T> =
    /// Bit mask of the slots that still need processing.
    [<DefaultValue(false)>] val mutable _toProcess : int
    /// Slot guarded by mask bit 0x01.
    [<DefaultValue(false)>] val mutable _1 : 'T
    /// Slot guarded by mask bit 0x02.
    [<DefaultValue(false)>] val mutable _2 : 'T
    /// Slot guarded by mask bit 0x04.
    [<DefaultValue(false)>] val mutable _3 : 'T
    /// Slot guarded by mask bit 0x08.
    [<DefaultValue(false)>] val mutable _4 : 'T
    /// Slot guarded by mask bit 0x10.
    [<DefaultValue(false)>] val mutable _5 : 'T
    /// Slot guarded by mask bit 0x20.
    [<DefaultValue(false)>] val mutable _6 : 'T
    /// Slot guarded by mask bit 0x40.
    [<DefaultValue(false)>] val mutable _7 : 'T
    /// Slot guarded by mask bit 0x80.
    [<DefaultValue(false)>] val mutable _8 : 'T
/// Base class for a pipeline stage that consumes chunks of 'T.
///
/// 'U does not appear in any member declared here; it only tags the stage
/// with a second type so that stages can be told apart at the type level.
[<AbstractClass>]
type Chunker<'T, 'U>() =
    /// Handles one chunk, passed by reference so the stage may read the slots
    /// and rewrite the mask in place without copying the struct.
    /// NOTE(review): the bool result presumably reports whether any element
    /// made it through the rest of the pipeline - confirm against callers.
    abstract member ProcessNext : byref<Chunk<'T>> -> bool
/// Base class for a mapping stage: it consumes chunks of 'T and owns a
/// reusable output chunk of 'U that concrete stages fill and forward.
[<AbstractClass>]
type Map<'T, 'U> =
    inherit Chunker<'T, 'U>

    /// Scratch chunk that receives the mapped values; kept as a field so it
    /// is reused across calls instead of being recreated per chunk.
    val mutable output : Chunk<'U>

    /// Creates the stage with a zero-initialised output chunk.
    new () =
        { inherit Chunker<'T, 'U>()
          output = Unchecked.defaultof<Chunk<'U>> }
/// Base class for a terminal stage that accumulates a running total of 'T.
[<AbstractClass>]
type Sum<'T> =
    inherit Chunker<'T, 'T>

    /// Running total; starts at the default value of 'T.
    val mutable total : 'T

    /// Creates the stage with the total set to the default value of 'T.
    new () =
        { inherit Chunker<'T, 'T>()
          total = Unchecked.defaultof<'T> }

    /// The total accumulated so far.
    member this.Total = this.total
/// Builds a mapping stage: every live slot of the incoming chunk is passed
/// through `f`, the result is stored in the matching slot of the stage's own
/// output chunk, and that output chunk is forwarded to `next`.
///
/// Slots whose mask bit is clear are skipped, so the corresponding output
/// slot keeps whatever value it held from an earlier call; the copied mask
/// tells downstream stages to ignore it.
/// Returns whatever `next.ProcessNext` returns.
let inline mapper (f : 'T -> 'U) (next : Chunker<'U, 'V>) =
    { new Map<'T, 'U>() with
        override this.ProcessNext (input) =
            let mask = input._toProcess
            // One guarded assignment per slot, in slot order.
            if (mask &&& 0x01) <> 0 then this.output._1 <- f input._1
            if (mask &&& 0x02) <> 0 then this.output._2 <- f input._2
            if (mask &&& 0x04) <> 0 then this.output._3 <- f input._3
            if (mask &&& 0x08) <> 0 then this.output._4 <- f input._4
            if (mask &&& 0x10) <> 0 then this.output._5 <- f input._5
            if (mask &&& 0x20) <> 0 then this.output._6 <- f input._6
            if (mask &&& 0x40) <> 0 then this.output._7 <- f input._7
            if (mask &&& 0x80) <> 0 then this.output._8 <- f input._8
            // Mapping never drops elements, so the mask carries over as is.
            this.output._toProcess <- mask
            next.ProcessNext (&this.output) }
/// Builds a filtering stage: `f` is evaluated on every live slot and the
/// chunk's mask is narrowed to the slots for which it returned true.
///
/// The chunk is modified in place (only its mask changes) and then forwarded
/// to `next`. If no slot survives, `next` is not called, the mask is left
/// untouched and the stage returns false.
let inline filter (f : 'T -> bool) (next : Chunker<'T, 'U>) =
    { new Chunker<'T, 'U>() with
        override this.ProcessNext (items) =
            let live = items._toProcess
            let mutable kept = 0
            // `&&` short-circuits, so `f` only runs on slots that are live.
            if (live &&& 0x01) <> 0 && f items._1 then kept <- kept ||| 0x01
            if (live &&& 0x02) <> 0 && f items._2 then kept <- kept ||| 0x02
            if (live &&& 0x04) <> 0 && f items._3 then kept <- kept ||| 0x04
            if (live &&& 0x08) <> 0 && f items._4 then kept <- kept ||| 0x08
            if (live &&& 0x10) <> 0 && f items._5 then kept <- kept ||| 0x10
            if (live &&& 0x20) <> 0 && f items._6 then kept <- kept ||| 0x20
            if (live &&& 0x40) <> 0 && f items._7 then kept <- kept ||| 0x40
            if (live &&& 0x80) <> 0 && f items._8 then kept <- kept ||| 0x80
            if kept <> 0 then
                items._toProcess <- kept
                next.ProcessNext (&items)
            else
                // Everything was rejected: nothing to hand downstream.
                false }
/// Builds the terminal summing stage: each live slot of an incoming chunk is
/// added to the stage's running total, in slot order. The result is read
/// back through `Total`.
///
/// Declared `inline` so that `+` is resolved for the concrete element type at
/// the call site. ProcessNext always returns true.
let inline sum () =
    { new Sum<'T>() with
        override this.ProcessNext (items) =
            let mask = items._toProcess
            if (mask &&& 0x01) <> 0 then this.total <- this.total + items._1
            if (mask &&& 0x02) <> 0 then this.total <- this.total + items._2
            if (mask &&& 0x04) <> 0 then this.total <- this.total + items._3
            if (mask &&& 0x08) <> 0 then this.total <- this.total + items._4
            if (mask &&& 0x10) <> 0 then this.total <- this.total + items._5
            if (mask &&& 0x20) <> 0 then this.total <- this.total + items._6
            if (mask &&& 0x40) <> 0 then this.total <- this.total + items._7
            if (mask &&& 0x80) <> 0 then this.total <- this.total + items._8
            true }
/// Benchmark entry point. Runs the same filter -> map -> filter -> sum
/// computation ten times over 40,000,000 consecutive int64 values and prints
/// the elapsed milliseconds and the resulting total for each run.
///
/// With the BASELINE symbol defined the computation is expressed with
/// Nessos.Streams; otherwise the hand-written chunked pipeline is used.
/// Always returns exit code 0.
[<EntryPoint>]
let main argv =
    // data.[i] = int64 i
    let data = Array.init 40000000 int64

    /// Pushes `data` through the chunked pipeline and returns the total.
    let dummy () =
        // Each stage receives its successor, so they are built last-to-first.
        let fourth = sum ()
        let third = filter (fun x -> x % 11L <> 0L) fourth
        let second = mapper (fun x -> x + 1L) third
        let first = filter (fun x -> x % 13L <> 0L) second

        let mutable idx = 0
        let mutable chunk = Unchecked.defaultof<Chunk<_>>

        // Full chunks: runs while at least eight elements remain. The bound is
        // inclusive so that a final, exactly-full group of eight is also
        // handled here rather than falling through to the slow tail loop.
        while idx + 8 <= data.Length do
            chunk._toProcess <- 255
            chunk._1 <- data.[idx + 0]
            chunk._2 <- data.[idx + 1]
            chunk._3 <- data.[idx + 2]
            chunk._4 <- data.[idx + 3]
            chunk._5 <- data.[idx + 4]
            chunk._6 <- data.[idx + 5]
            chunk._7 <- data.[idx + 6]
            chunk._8 <- data.[idx + 7]
            first.ProcessNext (&chunk) |> ignore
            idx <- idx + 8

        // Tail: fewer than eight elements left, sent one per chunk in slot 1.
        while idx < data.Length do
            chunk._toProcess <- 1
            chunk._1 <- data.[idx + 0]
            first.ProcessNext (&chunk) |> ignore
            idx <- idx + 1

        fourth.Total

    for i = 1 to 10 do
        let sw = Stopwatch.StartNew ()
        let run =
#if BASELINE
            data
            |> Stream.ofSeq
            |> Stream.filter (fun x -> x % 13L <> 0L)
            |> Stream.map (fun x -> x + 1L)
            |> Stream.filter (fun x -> x % 11L <> 0L)
            |> Stream.sum
#else
            dummy ()
#endif
        printfn "%d (%d)" sw.ElapsedMilliseconds run
    0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment