Created July 8, 2022 09:47
Illustrates how to use the Log Analytics Query API to get the max CPU usage (in millicores) and the max memory RSS (in bytes) of workloads running on an AKS virtual node.
// getVirtualNodeUsageStats fetches the virtual node max CPU milli-cores and max memory used in bytes within a span of
// startDateTime to endDateTime. KQL requires the client to ensure that these times are in UTC and ISO 8061 format(RFC3339)
func getVirtualNodeUsageStats(ctx context.Context, tokenCredential azcore.TokenCredential, subscriptionId, clusterName, resourceGroup string, startDateTime, endDateTime time.Time) (map[string]float64, error) {
client, err := armoperationalinsights.NewWorkspacesClient(subscriptionId, tokenCredential, nil)
if err != nil {
return nil, err
workspace, err := client.Get(ctx, resourceGroup, workspaceName, nil)
if err != nil {
return nil, err
logAnalyticsQueryURL := "{workspace-id}/query"
logAnalyticsQueryURL = strings.ReplaceAll(logAnalyticsQueryURL, "{workspace-id}", *workspace.Workspace.Properties.CustomerID)
var body struct {
Query string `json:"query"`
body.Query = getUsageMetricQuery(subscriptionId, resourceGroup, clusterName, startDateTime, endDateTime)
bodyBytes, err := json.Marshal(body)
if err != nil {
return nil, err
token, err := requestTokenWithLogAnalyticsScope(ctx, tokenCredential)
if err != nil {
return nil, err
req, err := http.NewRequest(http.MethodPost, logAnalyticsQueryURL, bytes.NewBuffer(bodyBytes))
if err != nil {
return nil, err
req.Header.Add("content-type", "application/json")
req.Header.Add("authorization", fmt.Sprintf("Bearer %s", token.Token))
response, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
responseBody, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
defer response.Body.Close()
stats, err := deserializeAsMap(responseBody)
if err != nil {
if err == ErrNoData {
zap.S().Warnf("%s, %s, %s, %s", err.Error(), resourceGroup, clusterName, *workspace.ID)
return nil, err
return stats, nil
// getUsageMetricQuery builds the KQL query that reports the max CPU usage and max memory RSS of the workloads on
// the 'virtual-node-aci-linux' node of the given AKS cluster, within [startDate, endDate).
//
// Per container, the query takes the max of each Perf counter (cpuUsageNanoCores, memoryRssBytes). It sums those
// maxima per controller and then across all controllers. The result is a single row with the columns MaxCPUUsage
// (nanocores divided by 1e6, i.e. millicores) and MaxMemoryRSS (bytes).
//
// startDate and endDate are converted to UTC and formatted as RFC 3339 before being embedded. subscriptionId,
// resourceGroup and clusterName are escaped for use inside single-quoted KQL string literals.
func getUsageMetricQuery(subscriptionId, resourceGroup, clusterName string, startDate, endDate time.Time) string {
	const queryTemplate = `
let subscriptionId = '{subscription-id}';
let resourceGroup = '{resource-group}';
let clusterName = '{cluster-name}';
let startDateTime = datetime({start-date-time});
let endDateTime = datetime({end-date-time});
let clusterId = strcat('/subscriptions/', subscriptionId, '/resourceGroups/', resourceGroup, '/providers/Microsoft.ContainerService/managedClusters/', clusterName);
let memoryUsageCounterName = 'memoryRssBytes';
let primaryInventory = KubePodInventory |
where TimeGenerated >= startDateTime |
where TimeGenerated < endDateTime |
where isnotempty(ClusterName) |
where isnotempty(Namespace) |
extend Node = Computer |
where ClusterId == clusterId |
where Node == 'virtual-node-aci-linux' |
project TimeGenerated, ClusterId, ClusterName, Namespace, ServiceName, Node = Computer, ControllerName, Pod = Name, ContainerInstance = ContainerName, ContainerID, InstanceName, PerfJoinKey = strcat(ClusterId, '/', ContainerName), ReadySinceNow = format_timespan(endDateTime - ContainerCreationTimeStamp, 'ddd.hh:mm:ss.fff'), Restarts = ContainerRestartCount, Status = ContainerStatus, ContainerStatusReason = columnifexists('ContainerStatusReason', ''), ControllerKind = ControllerKind, PodStatus, ControllerId = strcat(ClusterId, '/', Namespace, '/', ControllerName);
let latestContainersByController = primaryInventory |
where isnotempty(Node) |
summarize arg_max(TimeGenerated, *) by PerfJoinKey |
project ControllerId, PerfJoinKey;
let filteredMemoryUsage = Perf |
where TimeGenerated >= startDateTime |
where TimeGenerated < endDateTime |
where ObjectName == 'K8SContainer' |
where InstanceName startswith clusterId |
project TimeGenerated, CounterName, CounterValue, InstanceName, Node = Computer |
where Node == 'virtual-node-aci-linux';
let memoryUsageByController = filteredMemoryUsage |
where CounterName =~ memoryUsageCounterName |
extend PerfJoinKey = InstanceName |
summarize Value = max(CounterValue) by PerfJoinKey, CounterName |
join (latestContainersByController) on PerfJoinKey |
summarize Value = sum(Value) by ControllerId, CounterName |
project ControllerId, CounterName, MemoryAggregationValue = Value;
let CPUUsageCounterName = 'cpuUsageNanoCores';
let filteredCPUUsage = Perf |
where TimeGenerated >= startDateTime |
where TimeGenerated < endDateTime |
where ObjectName == 'K8SContainer' |
where InstanceName startswith clusterId |
project TimeGenerated, CounterName, CounterValue, InstanceName, Node = Computer |
where Node == 'virtual-node-aci-linux';
let CPUUsageByController = filteredCPUUsage |
where CounterName =~ CPUUsageCounterName |
extend PerfJoinKey = InstanceName |
summarize Value = max(CounterValue) by PerfJoinKey, CounterName |
join (latestContainersByController) on PerfJoinKey |
summarize Value = sum(Value) by ControllerId, CounterName |
project ControllerId, CounterName, CPUAggregationValue = Value/1000000;
let maxMemoryUsage = primaryInventory |
distinct ControllerId, ControllerName, ControllerKind, Namespace |
join kind=leftouter (memoryUsageByController) on ControllerId |
project MaxMemoryRSS = MemoryAggregationValue, ControllerId;
let maxCPUUsage = primaryInventory |
distinct ControllerId, ControllerName, ControllerKind, Namespace |
join kind=leftouter (CPUUsageByController) on ControllerId |
project MaxCPUUsage = CPUAggregationValue, ControllerId;
maxMemoryUsage |
join(maxCPUUsage) on ControllerId |
project MaxCPUUsage, MaxMemoryRSS |
summarize val1 = sum(MaxCPUUsage), val2 = sum(MaxMemoryRSS)|
project MaxCPUUsage = val1, MaxMemoryRSS = val2;`

	// Escape backslashes and single quotes so that a value cannot terminate the '...' literal it is placed in.
	// A single-pass replacer does not re-escape the backslash it inserts for a quote.
	kqlString := strings.NewReplacer(`\`, `\\`, `'`, `\'`)

	// Substitute all placeholders in one pass. Chained ReplaceAll calls would also expand a placeholder that
	// happened to appear inside an earlier substituted value.
	placeholders := strings.NewReplacer(
		"{subscription-id}", kqlString.Replace(subscriptionId),
		"{resource-group}", kqlString.Replace(resourceGroup),
		"{cluster-name}", kqlString.Replace(clusterName),
		"{start-date-time}", startDate.UTC().Format(time.RFC3339),
		"{end-date-time}", endDate.UTC().Format(time.RFC3339),
	)
	return placeholders.Replace(queryTemplate)
}
// deserializeAsMap deserializes the table JSON response from the log analytics API into a map
func deserializeAsMap(body []byte) (map[string]float64, error) {
var tableAsMap = make(map[string]float64)
var baseTable map[string][]map[string]interface{}
if err := json.Unmarshal(body, &baseTable); err != nil {
return nil, err
columns, ok := baseTable["tables"][0]["columns"].([]interface{})
if !ok {
return nil, ErrTypeConvert
if len(columns) == 0 {
return nil, ErrNoData
rows, ok := baseTable["tables"][0]["rows"].([]interface{})
if !ok {
return nil, ErrTypeConvert
if len(rows) == 0 {
return nil, ErrNoData
singleRow, ok := rows[0].([]interface{})
if !ok {
return nil, ErrTypeConvert
for i, col := range columns {
header, ok := col.(map[string]interface{})
if !ok {
return nil, ErrTypeConvert
name, ok := header["name"].(string)
if !ok {
return nil, ErrTypeConvert
value, ok := singleRow[i].(float64)
if !ok {
if singleRow[i] == nil {
zap.S().Warnf("%v is nil, setting to 0", name)
tableAsMap[name] = 0
return nil, ErrTypeConvert
tableAsMap[name] = value
return tableAsMap, nil
// requestTokenWithLogAnalyticsScope manually acquires a token with required scopes for log analytics REST endpoints
func requestTokenWithLogAnalyticsScope(ctx context.Context, tokenCredential azcore.TokenCredential) (*azcore.AccessToken, error) {
const scope = ""
token, err := tokenCredential.GetToken(ctx, policy.TokenRequestOptions{
Scopes: []string{scope},
if err != nil {
return nil, err
return &token, nil
