Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Fast projections
open System
open System.Collections.Generic
/// Switches the console foreground colour to `fc` and returns a handle that
/// puts the previously active colour back when it is disposed.
let consoleColor (fc : ConsoleColor) =
    // Remember the colour in effect before it is overwritten.
    let previous = Console.ForegroundColor
    let restore () = Console.ForegroundColor <- previous
    Console.ForegroundColor <- fc
    { new IDisposable with
        member __.Dispose() = restore () }
/// `printf`-style output written in the given console colour, without a
/// trailing newline. The colour scope is disposed once the text is written.
let cprintf color str =
    let writeColoured (text : string) =
        use _scope = consoleColor color
        printf "%s" text
    Printf.kprintf writeColoured str
/// `printfn`-style output written in the given console colour, followed by a
/// newline. The colour scope is disposed once the line is written.
let cprintfn color str =
    let writeColouredLine (text : string) =
        use _scope = consoleColor color
        printfn "%s" text
    Printf.kprintf writeColouredLine str
/// A single write instruction that can be handed to a `Storage`.
type Statement =
    /// Carries a 64-bit checkpoint value.
    | Checkpoint of int64
    /// Carries a string key and the amount to add to its view count.
    | IncrementViewCount of string * int
/// A sequence of statements submitted to storage in one call.
type Batch = seq<Statement>
/// Storage operations, modelled as a record of functions.
type Storage =
    {
        /// Executes one statement.
        ExecuteStatement : Statement -> unit
        /// Executes a whole batch of statements in a single call.
        ExecuteBatch : Batch -> unit
        /// Returns a decimal cost figure for the work executed so far.
        CalculateCost : unit -> decimal
    }
/// Events carried by an envelope. Each case wraps the record type of the
/// same name.
type Event =
    | WatchedVideo of WatchedVideo
    | BookmarkedVideo of BookmarkedVideo

/// Payload of a `WatchedVideo` event.
and WatchedVideo =
    {
        ViewerId : string
        VideoId : string
        At : DateTime
    }

/// Payload of a `BookmarkedVideo` event; it has the same fields as
/// `WatchedVideo`.
and BookmarkedVideo =
    {
        ViewerId : string
        VideoId : string
        At : DateTime
    }
/// An event together with its stream metadata.
type Envelope =
    {
        /// 64-bit identifier of the event; handlers use it as the checkpoint value.
        Id : int64
        /// The wrapped event.
        Event : Event
        /// Flag read by the batching handlers to choose their flush threshold.
        Historic : bool
    }
/// Mutable in-memory state that backs a storage instance.
type State () =
    /// Statements recorded one at a time.
    member val ExecutedStatements = List<Statement>() with get, set
    /// Batches recorded as a unit.
    member val ExecutedBatches = List<Batch>() with get, set
    /// Last checkpoint value written; starts at -1.
    member val Checkpoint = -1L with get, set
    /// View counts keyed by string.
    member val ViewCount = Dictionary<string, int>()
/// Handler invoked with one envelope at a time. It returns unit, so whatever
/// it does happens through side effects.
type EventOccurred = Envelope -> unit
/// Builds a `Storage` whose operations record their work in, and apply their
/// effects to, the supplied `state`.
///
/// - `ExecuteStatement` appends the statement to `state.ExecutedStatements`
///   and applies it.
/// - `ExecuteBatch` appends a snapshot of the batch to `state.ExecutedBatches`
///   and applies each statement in order.
/// - `CalculateCost` returns: statements executed individually
///   + batches executed + 0.1 per statement contained in those batches.
let createStorage (state : State) : Storage =
    // Single place where a statement's effect is applied, shared by both
    // execution paths so they cannot drift apart.
    let apply stmt =
        match stmt with
        | Checkpoint value -> state.Checkpoint <- value
        | IncrementViewCount (viewerId, count) ->
            // One lookup instead of ContainsKey followed by a second read.
            match state.ViewCount.TryGetValue(viewerId) with
            | true, current -> state.ViewCount.[viewerId] <- current + count
            | false, _ -> state.ViewCount.Add(viewerId, count)

    {
        ExecuteStatement =
            fun stmt ->
                state.ExecutedStatements.Add stmt
                apply stmt
        ExecuteBatch =
            fun batch ->
                // Enumerate the (possibly lazy) batch exactly once, so the
                // recorded batch and the applied statements are the same.
                let statements = batch |> Seq.toList
                state.ExecutedBatches.Add (Seq.ofList statements)
                statements |> List.iter apply
        CalculateCost =
            fun () ->
                let singleStatements = decimal state.ExecutedStatements.Count
                let batchCount = decimal state.ExecutedBatches.Count
                let batchedStatements =
                    state.ExecutedBatches |> Seq.sumBy Seq.length |> decimal
                singleStatements + (batchCount + (batchedStatements * 0.1M))
    }
/// Module-level list of envelopes that the batching handlers add to and clear.
let buffer = List<Envelope>()
/// Handler that executes one increment statement per watched video and then
/// one checkpoint statement for every envelope, whatever its event.
let ``First try`` (storage : Storage) : EventOccurred =
    fun envelope ->
        match envelope.Event with
        | WatchedVideo watched ->
            storage.ExecuteStatement (IncrementViewCount (watched.ViewerId, 1))
        | BookmarkedVideo _ -> ()

        // The checkpoint is written for every envelope.
        storage.ExecuteStatement (Checkpoint envelope.Id)
/// Handler that only touches storage for watched videos: it executes the
/// increment, then the checkpoint. Other events execute nothing.
let ``Less checkpointing`` (storage : Storage) : EventOccurred =
    fun envelope ->
        match envelope.Event with
        | BookmarkedVideo _ -> ()
        | WatchedVideo watched ->
            [ IncrementViewCount (watched.ViewerId, 1)
              Checkpoint envelope.Id ]
            |> List.iter storage.ExecuteStatement
/// Handler that buffers watched-video envelopes in the shared `buffer` and
/// submits them to storage as one batch: one increment per buffered event,
/// followed by a checkpoint at the current envelope's Id.
///
/// The buffer is flushed once it holds at least 100 envelopes when the
/// current envelope is historic, or at least 1 otherwise. Envelopes that are
/// buffered but have not reached the threshold stay in the buffer.
let ``Batching`` (storage : Storage) : EventOccurred =
    fun envelope ->
        let flushOn = if envelope.Historic then 100 else 1

        let flush () =
            // Snapshot the statements into a list before touching storage.
            // A lazy view over `buffer` would be re-evaluated on every
            // enumeration and would be empty after the Clear() below.
            let increments =
                buffer
                |> Seq.choose (fun env ->
                    match env.Event with
                    | WatchedVideo watched -> Some (IncrementViewCount (watched.ViewerId, 1))
                    | BookmarkedVideo _ -> None)
                |> Seq.toList

            if not (List.isEmpty increments) then
                increments @ [ Checkpoint envelope.Id ]
                |> Seq.ofList
                |> storage.ExecuteBatch

            buffer.Clear()

        match envelope.Event with
        | WatchedVideo _ -> buffer.Add envelope
        | BookmarkedVideo _ -> ()

        if buffer.Count >= flushOn then
            flush ()
/// Handler that buffers watched-video envelopes in the shared `buffer` and,
/// on flush, collapses them into one increment per viewer (carrying the
/// number of buffered events for that viewer), followed by a checkpoint at
/// the current envelope's Id.
///
/// The buffer is flushed once it holds at least 100 envelopes when the
/// current envelope is historic, or at least 1 otherwise. Envelopes that are
/// buffered but have not reached the threshold stay in the buffer.
let ``Batching with transformation`` (storage : Storage) : EventOccurred =
    fun envelope ->
        let flushOn = if envelope.Historic then 100 else 1

        let flush () =
            // Snapshot into a list so the counting runs once and the batch
            // no longer depends on `buffer`, which is cleared below.
            let increments =
                buffer
                |> Seq.choose (fun env ->
                    match env.Event with
                    | WatchedVideo watched -> Some watched.ViewerId
                    | BookmarkedVideo _ -> None)
                // countBy yields (key, occurrences) in first-seen key order,
                // the same result as groupBy followed by Seq.length.
                |> Seq.countBy id
                |> Seq.map (fun (viewerId, views) -> IncrementViewCount (viewerId, views))
                |> Seq.toList

            if not (List.isEmpty increments) then
                increments @ [ Checkpoint envelope.Id ]
                |> Seq.ofList
                |> storage.ExecuteBatch

            buffer.Clear()

        match envelope.Event with
        | WatchedVideo _ -> buffer.Add envelope
        | BookmarkedVideo _ -> ()

        if buffer.Count >= flushOn then
            flush ()
/// Fixed input used by every run, yielded in ascending Id order:
/// 3500 historic watched-video events (Ids 1–3500, viewer "1"),
/// 50 historic bookmarked-video events (Ids 3501–3550, viewer "1"),
/// 100 non-historic watched-video events (Ids 3551–3650, viewer "2").
///
/// The sequence is lazy, so `DateTime.UtcNow` is read again each time the
/// stream is enumerated.
let stream =
    seq {
        for i in 1L .. 3500L do
            yield { Id = i; Event = WatchedVideo { ViewerId = "1"; VideoId = "1"; At = DateTime.UtcNow }; Historic = true }

        // Historic bookmarks come before the live events so Ids never go
        // backwards and the last checkpoint written is the highest Id seen.
        for i in 3501L .. 3550L do
            yield { Id = i; Event = BookmarkedVideo { ViewerId = "1"; VideoId = "1"; At = DateTime.UtcNow }; Historic = true }

        for i in 3551L .. 3650L do
            yield { Id = i; Event = WatchedVideo { ViewerId = "2"; VideoId = "1"; At = DateTime.UtcNow }; Historic = false }
    }
// Report header: describes the shared input before any handler is run.
printfn "## Constant input"
printfn "-----------------------------------------"
printfn "Stream length: %A" (Seq.length stream)
printfn ""
/// Runs one handler over the whole `stream` against a fresh state and
/// storage, then prints the resulting view counts, checkpoint and cost
/// under a heading built from `name`.
let run (name : string) (handle : Storage -> EventOccurred) =
    cprintfn ConsoleColor.Yellow "## %s ##" name
    cprintfn ConsoleColor.Yellow "-----------------------------------------"

    // Each run starts from new state and an empty shared buffer.
    let state = State()
    let storage = createStorage state
    buffer.Clear()

    // Bind the handler to the storage once, then feed it every envelope.
    let onEvent = handle storage
    for envelope in stream do
        onEvent envelope

    printfn "- Viewcount: %A" state.ViewCount
    printfn "- Checkpoint: %A" state.Checkpoint
    printfn "-----------------------------------------"
    cprintfn ConsoleColor.Green "= Cost: %A" (storage.CalculateCost())
    printfn ""
// Run every handler in turn, in the listed order.
[ ("First try", ``First try``)
  ("Less checkpointing", ``Less checkpointing``)
  ("Batching", ``Batching``)
  ("Batching with transformation", ``Batching with transformation``) ]
|> List.iter (fun (name, handle) -> run name handle)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment