Skip to content

Instantly share code, notes, and snippets.

@Rohitrajak1807
Created July 8, 2022 09:47
Show Gist options
  • Save Rohitrajak1807/62220c7a9287e6818f83be13aab92df4 to your computer and use it in GitHub Desktop.
Save Rohitrajak1807/62220c7a9287e6818f83be13aab92df4 to your computer and use it in GitHub Desktop.
Illustrates how to use the Log Analytics Query API to get the maximum CPU usage (in millicores) and the maximum memory RSS (in bytes)
// getVirtualNodeUsageStats fetches the virtual node's max CPU usage (milli-cores) and max memory RSS (bytes)
// observed from startDateTime (inclusive) to endDateTime (exclusive) by running a KQL query against the
// cluster's Log Analytics workspace.
//
// The workspace is looked up in resourceGroup by the package-level workspaceName (declared outside this
// function). The times may be in any location: getUsageMetricQuery converts them to UTC and renders them as
// RFC 3339 (ISO 8601) datetime literals, which is what KQL expects.
//
// On success the map is keyed by the query's output column names. ErrNoData (unwrapped, so callers may
// compare it directly) is returned when the query yields no data. A non-200 HTTP response is returned as an
// error that carries the status code and the response body.
func getVirtualNodeUsageStats(ctx context.Context, tokenCredential azcore.TokenCredential, subscriptionId, clusterName, resourceGroup string, startDateTime, endDateTime time.Time) (map[string]float64, error) {
	client, err := armoperationalinsights.NewWorkspacesClient(subscriptionId, tokenCredential, nil)
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	workspace, err := client.Get(ctx, resourceGroup, workspaceName, nil)
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	// The query endpoint is addressed by the workspace's customer ID; guard against a sparse ARM response
	// instead of panicking on a nil pointer.
	if workspace.Workspace.Properties == nil || workspace.Workspace.Properties.CustomerID == nil {
		missingIDErr := fmt.Errorf("workspace %q in resource group %q has no customer ID", workspaceName, resourceGroup)
		zap.S().Error(missingIDErr)
		return nil, missingIDErr
	}
	logAnalyticsQueryURL := "https://api.loganalytics.io/v1/workspaces/" + *workspace.Workspace.Properties.CustomerID + "/query"

	body := struct {
		Query string `json:"query"`
	}{
		Query: getUsageMetricQuery(subscriptionId, resourceGroup, clusterName, startDateTime, endDateTime),
	}
	bodyBytes, err := json.Marshal(body)
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	token, err := requestTokenWithLogAnalyticsScope(ctx, tokenCredential)
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	// Bind the request to ctx so cancellation and deadlines also apply to the query call itself.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, logAnalyticsQueryURL, bytes.NewReader(bodyBytes))
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token.Token)
	response, err := http.DefaultClient.Do(req)
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	// Close the body on every path, including a failed read below.
	defer response.Body.Close()
	responseBody, err := ioutil.ReadAll(response.Body)
	if err != nil {
		zap.S().Error(err)
		return nil, err
	}
	// Error replies are not in the tabular format deserializeAsMap expects; report them explicitly.
	if response.StatusCode != http.StatusOK {
		statusErr := fmt.Errorf("log analytics query returned status %d: %s", response.StatusCode, responseBody)
		zap.S().Error(statusErr)
		return nil, statusErr
	}
	stats, err := deserializeAsMap(responseBody)
	if err != nil {
		if err == ErrNoData {
			workspaceID := "<unknown>"
			if workspace.ID != nil {
				workspaceID = *workspace.ID
			}
			zap.S().Warnf("%s, %s, %s, %s", err.Error(), resourceGroup, clusterName, workspaceID)
		}
		return nil, err
	}
	return stats, nil
}
// getUsageMetricQuery builds the KQL query that reports, for pods on the node 'virtual-node-aci-linux' of the
// given AKS cluster, the aggregated maximum CPU usage (MaxCPUUsage, cpuUsageNanoCores / 1e6, i.e. milli-cores)
// and memory RSS (MaxMemoryRSS, from the memoryRssBytes counter) observed in [startDate, endDate).
//
// startDate and endDate are converted to UTC and rendered as RFC 3339 datetime literals. subscriptionId,
// resourceGroup and clusterName are placed inside single-quoted KQL string literals and are escaped so that a
// quote or backslash in them cannot terminate the literal early.
func getUsageMetricQuery(subscriptionId, resourceGroup, clusterName string, startDate, endDate time.Time) string {
	const queryTemplate = `
let subscriptionId = '{subscription-id}';
let resourceGroup = '{resource-group}';
let clusterName = '{cluster-name}';
let startDateTime = datetime({start-date-time});
let endDateTime = datetime({end-date-time});
let clusterId = strcat('/subscriptions/', subscriptionId, '/resourceGroups/', resourceGroup, '/providers/Microsoft.ContainerService/managedClusters/', clusterName);
let memoryUsageCounterName = 'memoryRssBytes';
let primaryInventory = KubePodInventory |
where TimeGenerated >= startDateTime |
where TimeGenerated < endDateTime |
where isnotempty(ClusterName) |
where isnotempty(Namespace) |
extend Node = Computer |
where ClusterId == clusterId |
where Node == 'virtual-node-aci-linux' |
project TimeGenerated, ClusterId, ClusterName, Namespace, ServiceName, Node = Computer, ControllerName, Pod = Name, ContainerInstance = ContainerName, ContainerID, InstanceName, PerfJoinKey = strcat(ClusterId, '/', ContainerName), ReadySinceNow = format_timespan(endDateTime - ContainerCreationTimeStamp, 'ddd.hh:mm:ss.fff'), Restarts = ContainerRestartCount, Status = ContainerStatus, ContainerStatusReason = columnifexists('ContainerStatusReason', ''), ControllerKind = ControllerKind, PodStatus, ControllerId = strcat(ClusterId, '/', Namespace, '/', ControllerName);
let latestContainersByController = primaryInventory |
where isnotempty(Node) |
summarize arg_max(TimeGenerated, *) by PerfJoinKey |
project ControllerId, PerfJoinKey;
let filteredMemoryUsage = Perf |
where TimeGenerated >= startDateTime |
where TimeGenerated < endDateTime |
where ObjectName == 'K8SContainer' |
where InstanceName startswith clusterId |
project TimeGenerated, CounterName, CounterValue, InstanceName, Node = Computer |
where Node == 'virtual-node-aci-linux';
let memoryUsageByController = filteredMemoryUsage |
where CounterName =~ memoryUsageCounterName |
extend PerfJoinKey = InstanceName |
summarize Value = max(CounterValue) by PerfJoinKey, CounterName |
join (latestContainersByController) on PerfJoinKey |
summarize Value = sum(Value) by ControllerId, CounterName |
project ControllerId, CounterName, MemoryAggregationValue = Value;
let CPUUsageCounterName = 'cpuUsageNanoCores';
let filteredCPUUsage = Perf |
where TimeGenerated >= startDateTime |
where TimeGenerated < endDateTime |
where ObjectName == 'K8SContainer' |
where InstanceName startswith clusterId |
project TimeGenerated, CounterName, CounterValue, InstanceName, Node = Computer |
where Node == 'virtual-node-aci-linux';
let CPUUsageByController = filteredCPUUsage |
where CounterName =~ CPUUsageCounterName |
extend PerfJoinKey = InstanceName |
summarize Value = max(CounterValue) by PerfJoinKey, CounterName |
join (latestContainersByController) on PerfJoinKey |
summarize Value = sum(Value) by ControllerId, CounterName |
project ControllerId, CounterName, CPUAggregationValue = Value/1000000;
let maxMemoryUsage = primaryInventory |
distinct ControllerId, ControllerName, ControllerKind, Namespace |
join kind=leftouter (memoryUsageByController) on ControllerId |
project MaxMemoryRSS = MemoryAggregationValue, ControllerId;
let maxCPUUsage = primaryInventory |
distinct ControllerId, ControllerName, ControllerKind, Namespace |
join kind=leftouter (CPUUsageByController) on ControllerId |
project MaxCPUUsage = CPUAggregationValue, ControllerId;
maxMemoryUsage |
join(maxCPUUsage) on ControllerId |
project MaxCPUUsage, MaxMemoryRSS |
summarize val1 = sum(MaxCPUUsage), val2 = sum(MaxMemoryRSS)|
project MaxCPUUsage = val1, MaxMemoryRSS = val2;`

	// KQL single-quoted literals use backslash escapes; escape the backslash first so it is not doubled twice.
	kqlEscaper := strings.NewReplacer(`\`, `\\`, `'`, `\'`)

	// A single Replacer pass never re-scans substituted text, so a value that happens to contain another
	// placeholder is inserted verbatim instead of being expanded again (as chained ReplaceAll calls would).
	placeholders := strings.NewReplacer(
		"{subscription-id}", kqlEscaper.Replace(subscriptionId),
		"{resource-group}", kqlEscaper.Replace(resourceGroup),
		"{cluster-name}", kqlEscaper.Replace(clusterName),
		"{start-date-time}", startDate.UTC().Format(time.RFC3339),
		"{end-date-time}", endDate.UTC().Format(time.RFC3339),
	)
	return placeholders.Replace(queryTemplate)
}
// deserializeAsMap converts the JSON body returned by the Log Analytics query API into a map from column name
// to value, using only the first row of the first table.
//
// The body is expected to look like {"tables": [{"columns": [{"name": ...}, ...], "rows": [[...], ...]}]}.
// ErrNoData is returned when there is no table, no column, or no row. ErrTypeConvert is returned when part
// of that structure has an unexpected type, or when the first row has fewer cells than there are columns.
// A JSON null cell is stored as 0 (with a warning); any other non-number cell is an error. JSON decoding
// errors are returned as-is.
func deserializeAsMap(body []byte) (map[string]float64, error) {
	var baseTable map[string][]map[string]interface{}
	if err := json.Unmarshal(body, &baseTable); err != nil {
		zap.S().Error(err)
		return nil, err
	}
	tables := baseTable["tables"]
	// Guard the index: a body such as "{}" or {"tables": []} would otherwise panic on tables[0].
	if len(tables) == 0 {
		return nil, ErrNoData
	}
	firstTable := tables[0]
	columns, ok := firstTable["columns"].([]interface{})
	if !ok {
		zap.S().Error(ErrTypeConvert)
		return nil, ErrTypeConvert
	}
	if len(columns) == 0 {
		return nil, ErrNoData
	}
	rows, ok := firstTable["rows"].([]interface{})
	if !ok {
		zap.S().Error(ErrTypeConvert)
		return nil, ErrTypeConvert
	}
	if len(rows) == 0 {
		return nil, ErrNoData
	}
	singleRow, ok := rows[0].([]interface{})
	// A row shorter than the column list would otherwise panic on singleRow[i] below.
	if !ok || len(singleRow) < len(columns) {
		zap.S().Error(ErrTypeConvert)
		return nil, ErrTypeConvert
	}
	tableAsMap := make(map[string]float64, len(columns))
	for i, col := range columns {
		header, ok := col.(map[string]interface{})
		if !ok {
			zap.S().Error(ErrTypeConvert)
			return nil, ErrTypeConvert
		}
		name, ok := header["name"].(string)
		if !ok {
			zap.S().Error(ErrTypeConvert)
			return nil, ErrTypeConvert
		}
		switch cell := singleRow[i].(type) {
		case float64:
			tableAsMap[name] = cell
		case nil:
			// The leftouter joins in the query can yield null aggregates; treat them as zero usage.
			zap.S().Warnf("%v is nil, setting to 0", name)
			tableAsMap[name] = 0
		default:
			zap.S().Error(ErrTypeConvert)
			return nil, ErrTypeConvert
		}
	}
	return tableAsMap, nil
}
// requestTokenWithLogAnalyticsScope asks tokenCredential for an access token scoped to the Log Analytics REST
// API ("https://api.loganalytics.io/.default"). It returns a pointer to the token on success. On failure the
// credential's error is logged and returned unchanged.
func requestTokenWithLogAnalyticsScope(ctx context.Context, tokenCredential azcore.TokenCredential) (*azcore.AccessToken, error) {
	options := policy.TokenRequestOptions{
		Scopes: []string{"https://api.loganalytics.io/.default"},
	}
	accessToken, tokenErr := tokenCredential.GetToken(ctx, options)
	if tokenErr == nil {
		return &accessToken, nil
	}
	zap.S().Error(tokenErr)
	return nil, tokenErr
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment