// Fast projections
// GitHub Gist JefClaes/215d202fcdf9aa58968b92a129241292 (last active July 30, 2017).
// GitHub warned that this file contains bidirectional Unicode text that may be
// interpreted or compiled differently than it appears; review it in an editor
// that reveals hidden Unicode characters.
open System | |
open System.Collections.Generic | |
/// Switches the console foreground colour to `fc` and returns an IDisposable
/// whose Dispose puts back the colour that was active before this call.
let consoleColor (fc : ConsoleColor) =
    let previous = Console.ForegroundColor
    Console.ForegroundColor <- fc
    { new IDisposable with
        member this.Dispose() = Console.ForegroundColor <- previous }

/// printf-style output in the given colour, without a trailing newline.
let cprintf color str =
    let write (text : string) =
        use _restore = consoleColor color
        printf "%s" text
    Printf.kprintf write str

/// printfn-style output in the given colour, followed by a newline.
let cprintfn color str =
    let writeLine (text : string) =
        use _restore = consoleColor color
        printfn "%s" text
    Printf.kprintf writeLine str
/// A single write sent to storage by a projection handler.
type Statement =
    /// Sets the stored checkpoint (the handlers pass an envelope Id).
    | Checkpoint of Int64
    /// Adds the given count to the view total of the given viewer id.
    | IncrementViewCount of string * int

/// Statements submitted to storage together in one call.
type Batch = Statement seq

/// The storage operations available to projection handlers.
type Storage =
    { /// Executes one statement on its own.
      ExecuteStatement : Statement -> unit
      /// Executes a group of statements in one call.
      ExecuteBatch : Batch -> unit
      /// Returns the simulated cost of everything executed so far.
      CalculateCost : unit -> decimal }
/// Domain events consumed by the projections.
type Event =
    | WatchedVideo of WatchedVideo
    | BookmarkedVideo of BookmarkedVideo

/// Payload of a WatchedVideo event.
and WatchedVideo =
    { ViewerId : string
      VideoId : string
      At : DateTime }

/// Payload of a BookmarkedVideo event.
and BookmarkedVideo =
    { ViewerId : string
      VideoId : string
      At : DateTime }

/// An event together with its stream position.
type Envelope =
    { /// Position of the event; the handlers use it as the checkpoint value.
      Id : Int64
      Event : Event
      /// Marks historic (as opposed to live) events; the batching handlers use it
      /// to choose how many envelopes to buffer before flushing.
      Historic : bool }
/// Mutable in-memory state behind the fake storage: a log of every statement and
/// batch executed, plus the projected checkpoint (initially -1) and view counts.
type State () =
    member val ExecutedStatements = List<Statement>() with get, set
    member val ExecutedBatches = List<Batch>() with get, set
    member val Checkpoint = -1L with get, set
    member val ViewCount = Dictionary<string, int>()

/// A projection handler, invoked once per envelope.
type EventOccurred = Envelope -> unit
/// Builds a Storage whose operations write into `state`.
/// - ExecuteStatement logs the statement in ExecutedStatements, then applies it.
/// - ExecuteBatch logs the batch (as a list) in ExecutedBatches, then applies each
///   statement in order.
/// - CalculateCost = executed statements + executed batches + 0.1 per statement
///   contained in those batches.
let createStorage (state : State) : Storage =
    // Applies one statement to the projected state; shared by both execution paths.
    let apply statement =
        match statement with
        | Checkpoint value -> state.Checkpoint <- value
        | IncrementViewCount (viewerId, count) ->
            // Single dictionary lookup; a viewer not seen yet starts from 0.
            let current =
                match state.ViewCount.TryGetValue viewerId with
                | true, existing -> existing
                | false, _ -> 0
            state.ViewCount.[viewerId] <- current + count

    let executeStatement statement =
        state.ExecutedStatements.Add statement
        apply statement

    let executeBatch (batch : Batch) =
        // Enumerate the batch exactly once. Callers may pass a lazy view over
        // mutable data, so the logged copy and the applied statements must come
        // from the same snapshot.
        let statements = Seq.toList batch
        state.ExecutedBatches.Add statements
        List.iter apply statements

    let calculateCost () =
        let statementCost = decimal state.ExecutedStatements.Count
        let batchCost = decimal state.ExecutedBatches.Count
        let batchedStatements = state.ExecutedBatches |> Seq.sumBy Seq.length |> decimal
        statementCost + (batchCost + batchedStatements * 0.1M)

    { ExecuteStatement = executeStatement
      ExecuteBatch = executeBatch
      CalculateCost = calculateCost }
/// Envelopes held back by the batching handlers until they flush. It is a single
/// module-level list shared by both batching strategies, so `run` clears it
/// before each scenario.
let buffer = List<Envelope>()
/// Naive projection: one IncrementViewCount statement per watched video, and one
/// Checkpoint statement for every envelope whatever its event type.
let ``First try`` (storage : Storage) : EventOccurred =
    let handle envelope =
        match envelope.Event with
        | WatchedVideo watched ->
            storage.ExecuteStatement (IncrementViewCount (watched.ViewerId, 1))
        | BookmarkedVideo _ -> ()
        storage.ExecuteStatement (Checkpoint envelope.Id)
    handle
/// Writes an increment followed by a checkpoint only for watched-video envelopes.
/// Envelopes carrying any other event produce no statements at all.
/// NOTE(review): as a consequence, the stored checkpoint does not advance past a
/// trailing run of non-watched envelopes.
let ``Less checkpointing`` (storage : Storage) : EventOccurred =
    function
    | { Event = WatchedVideo watched; Id = envelopeId } ->
        IncrementViewCount (watched.ViewerId, 1) |> storage.ExecuteStatement
        Checkpoint envelopeId |> storage.ExecuteStatement
    | _ -> ()
/// Buffers watched-video envelopes in the shared `buffer` and writes them as one
/// batch: one IncrementViewCount per buffered envelope, followed by a Checkpoint
/// at the Id of the envelope that triggered the flush.
/// A flush happens once the buffer holds 100 envelopes while the incoming
/// envelope is historic, or on every watched envelope once the input is live.
/// NOTE(review): envelopes still buffered when the stream ends are never flushed,
/// because nothing signals end-of-stream to this handler.
let ``Batching`` (storage : Storage) : EventOccurred =
    fun envelope ->
        let flushOn = if envelope.Historic then 100 else 1
        let flush () =
            // Materialise now: `buffer` is cleared right after this call, so a lazy
            // view over it must not escape into storage.
            let increments =
                buffer
                |> Seq.choose (fun env ->
                    match env.Event with
                    | WatchedVideo x -> Some (IncrementViewCount (x.ViewerId, 1))
                    | BookmarkedVideo _ -> None)
                |> Seq.toList
            if not (List.isEmpty increments) then
                increments @ [ Checkpoint envelope.Id ]
                |> Seq.ofList
                |> storage.ExecuteBatch
            buffer.Clear()
        match envelope.Event with
        | WatchedVideo _ -> buffer.Add envelope
        | BookmarkedVideo _ -> ()
        if buffer.Count >= flushOn then
            flush ()
/// Same buffering and flush policy as the plain batching handler, but first
/// collapses the buffered watched-video events into one IncrementViewCount per
/// viewer (carrying that viewer's event count). The batch ends with a Checkpoint
/// at the Id of the envelope that triggered the flush.
/// NOTE(review): envelopes still buffered when the stream ends are never flushed,
/// because nothing signals end-of-stream to this handler.
let ``Batching with transformation`` (storage : Storage) : EventOccurred =
    fun envelope ->
        let flushOn = if envelope.Historic then 100 else 1
        let flush () =
            // Materialise now: `buffer` is cleared right after this call, so a lazy
            // view over it must not escape into storage.
            let increments =
                buffer
                |> Seq.choose (fun env ->
                    match env.Event with
                    | WatchedVideo x -> Some x
                    | BookmarkedVideo _ -> None)
                |> Seq.countBy (fun watched -> watched.ViewerId)
                |> Seq.map (fun (viewerId, views) -> IncrementViewCount (viewerId, views))
                |> Seq.toList
            if not (List.isEmpty increments) then
                increments @ [ Checkpoint envelope.Id ]
                |> Seq.ofList
                |> storage.ExecuteBatch
            buffer.Clear()
        match envelope.Event with
        | WatchedVideo _ -> buffer.Add envelope
        | BookmarkedVideo _ -> ()
        if buffer.Count >= flushOn then
            flush ()
/// Sample input, emitted in three runs:
///   ids 1-3500     historic WatchedVideo, viewer "1"
///   ids 3551-3650  live WatchedVideo, viewer "2"
///   ids 3501-3550  historic BookmarkedVideo, viewer "1" (emitted after the live run)
/// The sequence is lazy, so every enumeration re-reads DateTime.UtcNow.
let stream =
    let wrap historic id event = { Id = id; Event = event; Historic = historic }
    seq {
        for i in 1L .. 3500L do
            yield wrap true i (WatchedVideo { ViewerId = "1"; VideoId = "1"; At = DateTime.UtcNow })
        for i in 3551L .. 3650L do
            yield wrap false i (WatchedVideo { ViewerId = "2"; VideoId = "1"; At = DateTime.UtcNow })
        for i in 3501L .. 3550L do
            yield wrap true i (BookmarkedVideo { ViewerId = "1"; VideoId = "1"; At = DateTime.UtcNow })
    }
// Print a header describing the input before the scenarios run.
let () =
    let total = Seq.length stream
    printfn "## Constant input"
    printfn "-----------------------------------------"
    printfn "Stream length: %A" total
    printfn ""
/// Runs one projection strategy over `stream` against fresh state, then prints
/// the resulting view counts, checkpoint and simulated storage cost.
let run (name : string) (handle : Storage -> EventOccurred) =
    cprintfn ConsoleColor.Yellow "## %s ##" name
    cprintfn ConsoleColor.Yellow "-----------------------------------------"
    let state = State()
    let storage = createStorage state
    // The batching strategies share a module-level buffer; start each run empty.
    buffer.Clear()
    let project = handle storage
    for envelope in stream do
        project envelope
    printfn "- Viewcount: %A" state.ViewCount
    printfn "- Checkpoint: %A" state.Checkpoint
    printfn "-----------------------------------------"
    let cost = storage.CalculateCost()
    cprintfn ConsoleColor.Green "= Cost: %A" cost
    printfn ""
// Compare every projection strategy on the same input, in this order.
[ "First try", ``First try``
  "Less checkpointing", ``Less checkpointing``
  "Batching", ``Batching``
  "Batching with transformation", ``Batching with transformation`` ]
|> List.iter (fun (label, strategy) -> run label strategy)
// (End of gist. GitHub page footer: sign up for free or sign in to comment on this gist.)