Skip to content

Instantly share code, notes, and snippets.

@kevmal
Created April 2, 2019 22:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevmal/d5382e8d2b0059a179341805be2fc2d3 to your computer and use it in GitHub Desktop.
Save kevmal/d5382e8d2b0059a179341805be2fc2d3 to your computer and use it in GitHub Desktop.
// Required NuGet packages:
//   System.Reactive
//   Trill
open Microsoft.StreamProcessing
open System
open System.Reactive.Linq
/// One row of the sample input: four 64-bit integer columns.
/// NOTE(review): field meanings below are inferred from the field names only.
type ContextSwitch =
    { /// Timestamp of the sample; used as the event start time on ingress.
      Tick: int64
      /// Identifier of the process involved in the switch.
      ProcessId: int64
      /// Identifier of the CPU core.
      CpuId: int64
      /// Temperature reading that accompanies the sample.
      CpuTemp: int64 }
/// Raw sample input. Records are separated by '|'; each record holds four
/// whitespace-separated integers in the order Tick, ProcessId, CpuId, CpuTemp.
let dataStr = "0 1 1 120|0 3 2 121|0 5 3 124|120 2 1 123|300 1 1 122|1800 4 2 125|3540 2 1 119|3600 1 1 120"

/// `dataStr` parsed into an array of `ContextSwitch` records, in input order.
/// Blank records are skipped. A record with fewer than four fields raises
/// IndexOutOfRangeException and a non-numeric field raises FormatException.
let data =
    // Splitting on every common whitespace character with RemoveEmptyEntries
    // tolerates repeated spaces, tabs and stray line endings; a plain
    // `Split ' '` would emit empty tokens for them and break the int64 parse.
    let fieldSeparators = [| ' '; '\t'; '\r'; '\n' |]

    dataStr.Split([| '|' |])
    |> Array.map (fun (record: string) ->
        record.Trim().Split(fieldSeparators, StringSplitOptions.RemoveEmptyEntries))
    // A whitespace-only record yields no fields at all; drop it.
    |> Array.filter (fun fields -> fields.Length > 0)
    |> Array.map (fun fields ->
        { Tick = int64 fields.[0]
          ProcessId = int64 fields.[1]
          CpuId = int64 fields.[2]
          CpuTemp = int64 fields.[3] })
// Ingress: expose the parsed records as an Rx observable.
let contextSwitchObservable = data.ToObservable()

// Wrap every record in an interval stream event covering [Tick, Tick + 1).
let contextSwitchStreamEventObservable =
    contextSwitchObservable
    |> Observable.map (fun sample -> StreamEvent.CreateInterval(sample.Tick, sample.Tick + 1L, sample))

// Convert the observable into a streamable, using the Drop disorder policy.
let contextSwitchIngressStreamable =
    contextSwitchStreamEventObservable.ToStreamable(DisorderPolicy.Drop())

// Upcast to the general IStreamable interface that the queries below work with.
let contextSwitchStreamable =
    contextSwitchIngressStreamable :> IStreamable<Empty, ContextSwitch>

// Egress: turn the unmodified stream back into an observable of stream events.
let passthroughContextSwitchStreamEventObservable =
    contextSwitchStreamable.ToStreamEventObservable()

// Print every data event (skipping non-data events) and block until the
// observable completes.
passthroughContextSwitchStreamEventObservable
    .Where(fun ev -> ev.IsData)
    .ForEachAsync(fun ev -> Console.WriteLine(ev.ToString()))
    .Wait()
// Filter: keep only the events recorded on CPU 1 or CPU 2.
let contextSwitchTwoCores =
    contextSwitchStreamable.Where(fun sample -> sample.CpuId = 1L || sample.CpuId = 2L)

// Projection: drop the CpuTemp column by mapping to an anonymous record.
let contextSwitchTwoCoresNoTemp =
    contextSwitchTwoCores.Select(fun sample ->
        {| Tick = sample.Tick
           ProcessId = sample.ProcessId
           CpuId = sample.CpuId |})
/// Lookup row that pairs a process identifier with a display name.
type Process =
    { /// Identifier used as the join key against the context-switch stream.
      ProcessId: int64
      /// Display name of the process.
      Name: string }
/// The process-name lookup table, as a list of `Process` records.
let processes =
    [ 1L, "Word"
      2L, "Internet Explorer"
      3L, "Excel"
      4L, "Visual Studio"
      5L, "Outlook" ]
    |> List.map (fun (processId, name) -> { ProcessId = processId; Name = name })
// Ingest the lookup table as a stream in which every row is an interval event
// spanning [0, 10000), which covers every Tick in the sample data.
let namesStream =
    processes
        .ToObservable()
        .Select(fun p -> StreamEvent.CreateInterval(0L, 10000L, p))
        .ToStreamable()

// Join each context switch with the lookup row that has the same ProcessId and
// attach the process name to the payload.
let contextSwitchWithNames =
    contextSwitchTwoCoresNoTemp.Join(
        namesStream,
        (fun sw -> sw.ProcessId),
        (fun p -> p.ProcessId),
        (fun sw p ->
            {| Tick = sw.Tick
               ProcessId = sw.ProcessId
               CpuId = sw.CpuId
               Name = p.Name |}))
// Step 1: give every context-switch event an unbounded duration.
let infiniteContextSwitch =
    contextSwitchWithNames.AlterEventDuration(StreamEvent.InfinitySyncTime)

// Step 2: clip each event against the same stream, matching on CpuId
// (presumably ending an event at the next switch on that core — see the Trill
// ClipEventDuration docs). The stream is consumed twice, so it goes through
// Multicast, and the lambda parameter `shared` must be used for BOTH inputs.
// Passing the outer `infiniteContextSwitch` as the clip stream would bypass
// the multicast for that input.
let clippedContextSwitch =
    infiniteContextSwitch.Multicast(fun shared ->
        shared.ClipEventDuration(shared, (fun e -> e.CpuId), (fun e -> e.CpuId)))

// Step 3: shift every event lifetime forward by one tick.
let shiftedClippedContextSwitch = clippedContextSwitch.ShiftEventLifetime(1L)
// Timeslice computation as one expression: extend every event to infinity,
// clip it against the same stream on CpuId, shift it by one tick, then join it
// back with the original events on the same CPU. `Timeslice` is the tick
// difference between the joined (right) event and the clipped (left) event.
//
// Both inputs of ClipEventDuration come from the Multicast parameter `shared`.
// Referencing an outer stream for the clip input would bypass the multicast
// and tie this definition to a separately declared binding.
let timeslices =
    contextSwitchWithNames
        .AlterEventDuration(StreamEvent.InfinitySyncTime)
        .Multicast(fun shared ->
            shared
                .ClipEventDuration(shared, (fun e -> e.CpuId), (fun e -> e.CpuId))
                .ShiftEventLifetime(1L)
                .Join(
                    contextSwitchWithNames,
                    (fun e -> e.CpuId),
                    (fun e -> e.CpuId),
                    (fun l r ->
                        {| ProcessId = l.ProcessId
                           CpuId = l.CpuId
                           Name = l.Name
                           Timeslice = r.Tick - l.Tick |})))
// Restrict the timeslices to process 1 running on CPU 1.
let timeslicesForProcess1Cpu1 =
    timeslices.Where(fun slice -> slice.CpuId = 1L && slice.ProcessId = 1L)

// Hand-written windowing: move each start time to a multiple of 3600 using the
// selector below and give every event a duration of 3600.
let windowedTimeslices =
    timeslicesForProcess1Cpu1.AlterEventLifetime(
        (fun origStartTime -> (1L + ((origStartTime - 1L) / 3600L)) * 3600L),
        3600L)

// The same windowing via the built-in operator (window size 3600, hop 3600).
let windowedTimeslicesForProcess1Cpu1 =
    timeslicesForProcess1Cpu1.HoppingWindowLifetime(3600L, 3600L)

// Aggregate: sum of Timeslice over each window.
let totalConsumptionPerPeriodForProcess1Cpu1 =
    windowedTimeslicesForProcess1Cpu1.Sum(fun slice -> slice.Timeslice)
// Grouped aggregate: for every (CpuId, ProcessId, Name) group, sum Timeslice
// over hopping windows of size 3600 with hop 3600, then emit one payload per
// group carrying CpuId, Name and the total.
let totalConsumptionPerPeriod =
    timeslices.GroupApply(
        (fun slice ->
            {| CpuId = slice.CpuId
               ProcessId = slice.ProcessId
               Name = slice.Name |}),
        (fun grouped ->
            grouped
                .HoppingWindowLifetime(3600L, 3600L)
                .Sum(fun slice -> slice.Timeslice)),
        (fun g total ->
            {| CpuId = g.Key.CpuId
               Name = g.Key.Name
               TotalTime = total |}))
// Egress the per-window totals for process 1 on CPU 1 as an enumerable of
// stream events. The value is neither bound nor consumed here.
// NOTE(review): presumably the script is meant for F# Interactive, where a
// trailing expression is echoed as `it` — confirm before compiling this file.
totalConsumptionPerPeriodForProcess1Cpu1.ToStreamEventObservable().ToEnumerable()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment