Last active
July 3, 2023 20:19
-
-
Save gustavomcarmo/ed3ddeb32374b9429a9fb410a3fb119b to your computer and use it in GitHub Desktop.
Databricks Pipeline Maintenance Jobs Clusters KQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Successful cluster initialisation events (create / start / restart) from the
// last 30 days.
// Output columns: ClusterId (string), InitializationTime, InitializationActionName.
let ClustersInitializations = DatabricksClusters
// Filter on the cheap columns first so `distinct` and the JSON parsing below
// only touch the rows that can contribute to the result.
| where TimeGenerated > ago(30d) and ActionName in ('create', 'start', 'restart')
// Collapse rows that are identical on these four columns.
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
| extend ResponseResultTxt = parse_json(tostring(ResponseTxt.result))
| extend RequestParamsTxt = parse_json(RequestParams)
// 'create' reports the new cluster id in the response result; 'start' and
// 'restart' carry it in the request parameters. Every branch is cast with
// tostring() so ClusterId is a plain string (same type as the '' fallback)
// and can be used as a join / sort key.
| project ClusterId = case(ActionName == 'create', tostring(ResponseResultTxt.cluster_id),
                           ActionName in ('start', 'restart'), tostring(RequestParamsTxt.cluster_id),
                           ''),
          InitializationTime = TimeGenerated,
          InitializationActionName = ActionName
// tostring() of a missing property yields '', so this also drops events
// whose payload has no cluster id.
| where ClusterId != ''
| order by ClusterId asc, InitializationTime asc;
// Successful cluster termination events (delete / deleteResult) from the last
// 30 days.
// Output columns: ClusterId (string), TerminationTime, TerminationActionName.
let ClustersTerminations = DatabricksClusters
// Filter first so `distinct` and the JSON parsing only see relevant rows.
| where TimeGenerated > ago(30d) and ActionName in ('delete', 'deleteResult')
// Collapse rows that are identical on these four columns.
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
| extend RequestParamsTxt = parse_json(RequestParams)
// The two actions spell the request property differently: 'delete' uses
// cluster_id, 'deleteResult' uses clusterId. Both branches are cast with
// tostring() so ClusterId is a plain string usable as a join / sort key.
| project ClusterId = case(ActionName == 'delete', tostring(RequestParamsTxt.cluster_id),
                           ActionName == 'deleteResult', tostring(RequestParamsTxt.clusterId),
                           ''),
          TerminationTime = TimeGenerated,
          TerminationActionName = ActionName
// tostring() of a missing property yields '', so events without an id are dropped.
| where ClusterId != ''
| order by ClusterId asc, TerminationTime asc;
// Worker counts reported by successful createResult / startResult /
// restartResult / resizeResult events from the last 30 days.
// Output columns: ClusterId (string), ClusterWorkers (int), WorkersSetTime,
// WorkersSetActionName.
let ClustersWorkersSet = DatabricksClusters
// Filter first so `distinct` and the JSON parsing only see relevant rows.
| where TimeGenerated > ago(30d)
    and ActionName in ('createResult', 'startResult', 'restartResult', 'resizeResult')
// Collapse rows that are identical on these four columns.
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
// Only the request parameters are read below; the response result is not
// needed for these actions, so it is no longer parsed.
| extend RequestParamsTxt = parse_json(RequestParams)
| project ClusterId = tostring(RequestParamsTxt.clusterId),
          ClusterWorkers = toint(RequestParamsTxt.clusterWorkers),
          WorkersSetTime = TimeGenerated,
          WorkersSetActionName = ActionName
| where ClusterId != ''
| order by ClusterId asc, WorkersSetTime asc;
// Pipeline maintenance jobs successfully created in the last 30 days, with the
// time of day at which each one is scheduled.
// Output columns: JobName (string), ScheduledTime (datetime, UTC).
let PipelineMaintenanceJobs = DatabricksJobs
| where TimeGenerated > ago(30d) and ActionName == 'create'
| extend ResponseTxt = parse_json(Response)
| extend RequestParamsTxt = parse_json(RequestParams)
| where ResponseTxt.statusCode == 200
    and tostring(RequestParamsTxt.job_type) == 'MAINTENANCE_PIPELINE'
| extend RequestParamsScheduleTxt = parse_json(tostring(RequestParamsTxt.schedule))
// split() takes a string source, so the dynamic property is cast explicitly.
| extend SplittedQuartzCronExpression = split(tostring(RequestParamsScheduleTxt.quartz_cron_expression), ' ')
// Quartz cron fields start with "seconds minutes hours", so indexes 2, 1, 0
// give HH:mm:ss. That time of day is attached to today's date (taken from
// now()) and converted from the job's time zone to UTC.
// NOTE(review): a cron field that is not a plain number (e.g. '*' or '0/5')
// presumably makes todatetime() return null - confirm this is acceptable.
| project JobName = tostring(RequestParamsTxt.name),
          ScheduledTime = datetime_local_to_utc(
              todatetime(strcat(format_datetime(now(), 'yyyy-MM-dd '),
                                strcat_delim(':',
                                             tostring(SplittedQuartzCronExpression[2]),
                                             tostring(SplittedQuartzCronExpression[1]),
                                             tostring(SplittedQuartzCronExpression[0])))),
              tostring(RequestParamsScheduleTxt.timezone_id))
| distinct JobName, ScheduledTime
| order by JobName asc;
// Clusters created by the maintenance launcher in the last 30 days, tagged
// with the job and pipeline they belong to and the job's scheduled time.
// Output columns: ClusterId, JobId, JobName, PipelineId, PipelineName,
// ScheduledTime (null when no matching job is found).
let PipelineMaintenanceJobsClusters = DatabricksClusters
| where TimeGenerated > ago(30d)
| where ActionName == 'create'
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend Resp = parse_json(Response),
         Req = parse_json(RequestParams)
| where Resp.statusCode == 200
| where Req.cluster_creator == 'MAINTENANCE_LAUNCHER'
| extend ResultJson = parse_json(tostring(Resp.result)),
         CustomTags = parse_json(tostring(Req.custom_tags))
// The RunName tag is split on ' - '; the second part is used as the pipeline
// name and the third as the pipeline id.
| extend RunNameParts = split(CustomTags.RunName, ' - ')
| project ClusterId = tostring(ResultJson.cluster_id),
          JobId = tostring(CustomTags.JobId),
          JobName = tostring(CustomTags.RunName),
          PipelineId = tostring(RunNameParts[2]),
          PipelineName = tostring(RunNameParts[1])
| distinct ClusterId, JobId, JobName, PipelineId, PipelineName
// Left outer join: clusters are kept even when their job has no schedule row.
| join kind=leftouter (PipelineMaintenanceJobs) on JobName
| project ClusterId, JobId, JobName, PipelineId, PipelineName, ScheduledTime
| sort by ClusterId asc;
// Distinct (ClusterId, ClusterName) pairs seen in successful cluster events
// from the last 30 days.
// Output columns: ClusterId (string), ClusterName (string).
let ClustersNames = DatabricksClusters
// Apply the time filter before `distinct` so less data is de-duplicated and parsed.
| where TimeGenerated > ago(30d)
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
| extend ResponseResultTxt = parse_json(tostring(ResponseTxt.result))
| extend RequestParamsTxt = parse_json(RequestParams)
// Each action family stores the id / name under a different property.
// Every branch is cast with tostring() so both columns are plain strings
// (same type as the '' fallback) and can be compared, de-duplicated and joined.
| project ClusterId = case(ActionName == 'create', tostring(ResponseResultTxt.cluster_id),
                           ActionName in ('delete', 'start', 'restart'), tostring(RequestParamsTxt.cluster_id),
                           ActionName in ('createResult', 'deleteResult', 'startResult', 'restartResult', 'resizeResult'), tostring(RequestParamsTxt.clusterId),
                           ''),
          ClusterName = case(ActionName == 'create', tostring(RequestParamsTxt.cluster_name),
                             ActionName in ('createResult', 'deleteResult', 'startResult', 'restartResult', 'resizeResult'), tostring(RequestParamsTxt.clusterName),
                             '')
// 'delete' / 'start' / 'restart' rows yield an empty ClusterName and are
// dropped here together with rows that have no id.
| where ClusterId != '' and ClusterName != ''
| distinct ClusterId, ClusterName
| order by ClusterId asc;
// Successful 'startUpdate' events per pipeline from the last 30 days.
// Output columns: PipelineId (string), UpdateTime.
let PipelinesUpdates = DatabricksDeltaPipelines
| where TimeGenerated > ago(30d)
| where ActionName == 'startUpdate'
| distinct ActionName, Response, RequestParams, TimeGenerated
// Keep only requests answered with HTTP status 200.
| where parse_json(Response).statusCode == 200
| project PipelineId = tostring(parse_json(RequestParams).pipeline_id),
          UpdateTime = TimeGenerated
| order by PipelineId asc, UpdateTime asc;
// Final report: one row per maintenance-cluster run with the pipeline name,
// the last pipeline update that preceded the run, the scheduled time, the run
// start / end, its duration and the peak node count.
ClustersInitializations
// Step 1 - pair every initialisation with the terminations that follow it.
| join kind=inner (ClustersTerminations) on ClusterId
| project ClusterId, InitializationTime, InitializationActionName, TerminationTime
| where InitializationTime < TerminationTime
// For each initialisation keep the earliest later termination ...
| summarize MinTerminationTime = min(TerminationTime) by ClusterId, InitializationTime, InitializationActionName
// ... and for each such termination keep the earliest initialisation, so
// several initialisation events before one termination collapse into one run.
| summarize MinInitializationTime = min(InitializationTime) by ClusterId, MinTerminationTime
| project ClusterId, InitializationTime = MinInitializationTime, TerminationTime = MinTerminationTime
// Step 2 - peak worker count reported while the run was active.
| join kind=inner (ClustersWorkersSet) on ClusterId
| project ClusterId, InitializationTime, TerminationTime, ClusterWorkers, WorkersSetTime
| where WorkersSetTime between (InitializationTime .. TerminationTime)
| summarize MaxClusterWorkers = max(ClusterWorkers) by ClusterId, InitializationTime, TerminationTime
// NOTE(review): ClusterName from this join is not used by any later step;
// the join could be dropped unless the name is meant to be reported.
| join kind=leftouter (ClustersNames) on ClusterId
// Step 3 - keep only clusters launched for pipeline maintenance jobs.
| join kind=inner (PipelineMaintenanceJobsClusters) on ClusterId
// Step 4 - last pipeline update that started before the run.
| join kind=leftouter (PipelinesUpdates) on PipelineId
// maxif() replaces the former `where UpdateTime < InitializationTime` filter:
// that filter removed the unmatched rows (null UpdateTime) and thereby turned
// the left outer join into an inner join. Runs without an earlier update are
// now kept, with a null LastUpdateTime / TimeAfterUpdate.
| summarize LastUpdateTime = maxif(UpdateTime, UpdateTime < InitializationTime)
    by PipelineName, ScheduledTime, InitializationTime, TerminationTime, MaxClusterWorkers
| project PipelineName,
          LastUpdateTime,
          ScheduledTime,
          InitializationTime,
          TimeAfterUpdate = InitializationTime - LastUpdateTime,
          TerminationTime,
          Duration = TerminationTime - InitializationTime,
          // presumably workers + 1 driver node - TODO confirm
          MaxClusterNodes = MaxClusterWorkers + 1
| order by PipelineName asc, InitializationTime asc
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment