Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save gustavomcarmo/ed3ddeb32374b9429a9fb410a3fb119b to your computer and use it in GitHub Desktop.
Save gustavomcarmo/ed3ddeb32374b9429a9fb410a3fb119b to your computer and use it in GitHub Desktop.
Databricks Pipeline Maintenance Jobs Clusters KQL
// ClustersInitializations: one row per successful (statusCode 200) cluster
// 'create', 'start' or 'restart' event found in DatabricksClusters over the
// last 30 days.
// Output columns: ClusterId (string), InitializationTime, InitializationActionName.
let ClustersInitializations = DatabricksClusters
// Filter before de-duplicating: both filtered columns are part of the distinct
// key, so the resulting row set is the same but far fewer rows are compared.
| where TimeGenerated > ago(30d) and ActionName in ('create', 'start', 'restart')
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
| extend ResponseResultTxt = parse_json(tostring(ResponseTxt.result))
| extend RequestParamsTxt = parse_json(RequestParams)
// 'create' reads the id from the response result, 'start'/'restart' read it from
// the request parameters. Every branch is cast to string so that case() has a
// single result type and ClusterId is usable as a join key.
| project ClusterId = case(ActionName == 'create', tostring(ResponseResultTxt.cluster_id),
    ActionName in ('start', 'restart'), tostring(RequestParamsTxt.cluster_id),
    ''),
    InitializationTime = TimeGenerated,
    InitializationActionName = ActionName
// Drop events for which no cluster id could be extracted.
| where ClusterId != ''
| order by ClusterId asc, InitializationTime asc;
// ClustersTerminations: one row per successful (statusCode 200) cluster
// 'delete' or 'deleteResult' event found in DatabricksClusters over the last
// 30 days.
// Output columns: ClusterId (string), TerminationTime, TerminationActionName.
let ClustersTerminations = DatabricksClusters
// Filter before de-duplicating: both filtered columns are part of the distinct
// key, so the resulting row set is the same but far fewer rows are compared.
| where TimeGenerated > ago(30d) and ActionName in ('delete', 'deleteResult')
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
| extend RequestParamsTxt = parse_json(RequestParams)
// The two actions spell the request field differently (cluster_id vs clusterId).
// Every branch is cast to string so that case() has a single result type and
// ClusterId is usable as a join key.
| project ClusterId = case(ActionName == 'delete', tostring(RequestParamsTxt.cluster_id),
    ActionName == 'deleteResult', tostring(RequestParamsTxt.clusterId),
    ''),
    TerminationTime = TimeGenerated,
    TerminationActionName = ActionName
// Drop events for which no cluster id could be extracted.
| where ClusterId != ''
| order by ClusterId asc, TerminationTime asc;
// ClustersWorkersSet: one row per successful (statusCode 200) 'createResult',
// 'startResult', 'restartResult' or 'resizeResult' event found in
// DatabricksClusters over the last 30 days, carrying the clusterWorkers value
// reported in the request parameters of that event.
// Output columns: ClusterId (string), ClusterWorkers (int), WorkersSetTime,
// WorkersSetActionName.
let ClustersWorkersSet = DatabricksClusters
// Filter before de-duplicating: both filtered columns are part of the distinct
// key, so the resulting row set is the same but far fewer rows are compared.
| where TimeGenerated > ago(30d)
    and ActionName in ('createResult', 'startResult', 'restartResult', 'resizeResult')
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
// Only the request parameters are read below; the response result is not
// needed, so it is no longer parsed.
| extend RequestParamsTxt = parse_json(RequestParams)
| project ClusterId = tostring(RequestParamsTxt.clusterId),
    ClusterWorkers = toint(RequestParamsTxt.clusterWorkers),
    WorkersSetTime = TimeGenerated,
    WorkersSetActionName = ActionName
// Drop events for which no cluster id could be extracted.
| where ClusterId != ''
| order by ClusterId asc, WorkersSetTime asc;
// PipelineMaintenanceJobs: distinct (JobName, ScheduledTime) pairs for jobs of
// type 'MAINTENANCE_PIPELINE' successfully created (statusCode 200) in
// DatabricksJobs over the last 30 days.
// ScheduledTime is today's date combined with the time of day taken from the
// job's Quartz cron expression, converted from the schedule's timezone to UTC.
let PipelineMaintenanceJobs = DatabricksJobs
| where TimeGenerated > ago(30d)
| where ActionName == 'create'
| extend JobResponse = parse_json(Response),
    JobRequest = parse_json(RequestParams)
| where JobResponse.statusCode == 200
| where JobRequest.job_type == 'MAINTENANCE_PIPELINE'
| extend JobSchedule = parse_json(tostring(JobRequest.schedule))
| extend CronFields = split(JobSchedule.quartz_cron_expression, ' ')
// The first three cron fields are read in reverse order ([2]:[1]:[0]) to form
// the time of day; Quartz orders them as seconds, minutes, hours.
| extend LocalTimeOfDay = strcat_delim(':', CronFields[2], CronFields[1], CronFields[0])
| extend LocalDateTimeText = strcat(format_datetime(now(), 'yyyy-MM-dd '), LocalTimeOfDay)
| project JobName = tostring(JobRequest.name),
    ScheduledTime = datetime_local_to_utc(todatetime(LocalDateTimeText), JobSchedule.timezone_id)
| distinct JobName, ScheduledTime
| order by JobName asc;
// PipelineMaintenanceJobsClusters: clusters successfully created (statusCode
// 200) in the last 30 days whose cluster_creator is 'MAINTENANCE_LAUNCHER',
// enriched with the job/pipeline identifiers found in the cluster's custom tags
// and with the ScheduledTime of the matching row in PipelineMaintenanceJobs.
// Output columns: ClusterId, JobId, JobName, PipelineId, PipelineName,
// ScheduledTime.
let PipelineMaintenanceJobsClusters = DatabricksClusters
| where TimeGenerated > ago(30d) and ActionName == 'create'
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend CreateResponse = parse_json(Response),
    CreateRequest = parse_json(RequestParams)
| where CreateResponse.statusCode == 200
| where CreateRequest.cluster_creator == 'MAINTENANCE_LAUNCHER'
| extend CreateResult = parse_json(tostring(CreateResponse.result)),
    CustomTags = parse_json(tostring(CreateRequest.custom_tags))
// RunName is split on ' - '; element [1] is used as the pipeline name and
// element [2] as the pipeline id.
| extend RunNameParts = split(CustomTags.RunName, ' - ')
| project ClusterId = tostring(CreateResult.cluster_id),
    JobId = tostring(CustomTags.JobId),
    JobName = tostring(CustomTags.RunName),
    PipelineId = tostring(RunNameParts[2]),
    PipelineName = tostring(RunNameParts[1])
| distinct ClusterId, JobId, JobName, PipelineId, PipelineName
// Left outer join: clusters without a matching job keep an empty ScheduledTime.
| join kind=leftouter (PipelineMaintenanceJobs) on JobName
| project ClusterId, JobId, JobName, PipelineId, PipelineName, ScheduledTime
| order by ClusterId asc;
// ClustersNames: distinct (ClusterId, ClusterName) pairs extracted from
// successful (statusCode 200) DatabricksClusters events of the last 30 days.
// Output columns: ClusterId (string), ClusterName (string).
let ClustersNames = DatabricksClusters
// Only the actions handled by the case() expressions below can yield a
// non-empty ClusterId, so restricting to them up front (and before the
// distinct) does not change the output.
| where TimeGenerated > ago(30d)
    and ActionName in ('create', 'delete', 'start', 'restart',
        'createResult', 'deleteResult', 'startResult', 'restartResult', 'resizeResult')
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend ResponseTxt = parse_json(Response)
| where ResponseTxt.statusCode == 200
| extend ResponseResultTxt = parse_json(tostring(ResponseTxt.result))
| extend RequestParamsTxt = parse_json(RequestParams)
// The id and name live in different fields depending on the action. Every
// branch is cast to string so that case() has a single result type and
// ClusterId is usable as a join key.
| project ClusterId = case(ActionName == 'create', tostring(ResponseResultTxt.cluster_id),
    ActionName in ('delete', 'start', 'restart'), tostring(RequestParamsTxt.cluster_id),
    ActionName in ('createResult', 'deleteResult', 'startResult', 'restartResult', 'resizeResult'), tostring(RequestParamsTxt.clusterId),
    ''),
    ClusterName = case(ActionName == 'create', tostring(RequestParamsTxt.cluster_name),
    ActionName in ('createResult', 'deleteResult', 'startResult', 'restartResult', 'resizeResult'), tostring(RequestParamsTxt.clusterName),
    '')
// Keep only events from which both an id and a name could be extracted.
| where ClusterId != '' and ClusterName != ''
| distinct ClusterId, ClusterName
| order by ClusterId asc;
// PipelinesUpdates: one row per successful (statusCode 200) 'startUpdate'
// event found in DatabricksDeltaPipelines over the last 30 days.
// Output columns: PipelineId (string), UpdateTime.
let PipelinesUpdates = DatabricksDeltaPipelines
| where TimeGenerated > ago(30d) and ActionName == 'startUpdate'
| distinct ActionName, Response, RequestParams, TimeGenerated
| extend UpdateResponse = parse_json(Response)
| where UpdateResponse.statusCode == 200
| project PipelineId = tostring(parse_json(RequestParams).pipeline_id),
    UpdateTime = TimeGenerated
| order by PipelineId asc, UpdateTime asc;
// Main query: for each run of a pipeline-maintenance cluster, report when the
// pipeline was last updated before the run, when the run was scheduled, when
// the cluster started and terminated, how long it ran, and its peak size.
//
// Step 1 - pair every initialization with the terminations of the same cluster
// that happened after it, and keep the earliest such termination.
ClustersInitializations
| join kind=inner (ClustersTerminations)
on ClusterId
| project ClusterId, InitializationTime, InitializationActionName, TerminationTime
| where InitializationTime < TerminationTime
| summarize MinTerminationTime = min(TerminationTime) by ClusterId, InitializationTime, InitializationActionName
// Several initializations can resolve to the same termination; keep only the
// earliest initialization per (cluster, termination) so each run appears once.
| summarize MinInitializationTime = min(InitializationTime) by ClusterId, MinTerminationTime
| project ClusterId, InitializationTime = MinInitializationTime, TerminationTime = MinTerminationTime
// Step 2 - attach the worker-count events that fall inside the run interval
// and keep the largest worker count seen during the run.
| join kind=inner (ClustersWorkersSet)
on ClusterId
| project ClusterId, InitializationTime, TerminationTime, ClusterWorkers, WorkersSetTime
| where WorkersSetTime between (InitializationTime .. TerminationTime)
| summarize MaxClusterWorkers = max(ClusterWorkers) by ClusterId, InitializationTime, TerminationTime
// Step 3 - enrich with cluster names.
// NOTE(review): no column contributed by ClustersNames is referenced in the
// summarize/project below, so this join looks unused - confirm before removing.
| join kind=leftouter (ClustersNames)
on ClusterId
// Step 4 - keep only clusters present in PipelineMaintenanceJobsClusters and
// pick up their PipelineId, PipelineName and ScheduledTime.
| join kind=inner (PipelineMaintenanceJobsClusters)
on ClusterId
// Step 5 - attach the pipeline updates that started before the run began.
// NOTE(review): the where clause discards rows whose UpdateTime is null, so the
// leftouter join effectively behaves like an inner join - confirm this is the
// intended handling of pipelines without a prior update.
| join kind=leftouter (PipelinesUpdates)
on PipelineId
| where UpdateTime < InitializationTime
// Step 6 - latest update preceding each run, then derive the reported columns.
| summarize LastUpdateTime = max(UpdateTime) by PipelineName, ScheduledTime, InitializationTime, TerminationTime, MaxClusterWorkers
// TimeAfterUpdate: gap between the last update and the cluster start.
// Duration: cluster start to termination.
// MaxClusterNodes: peak worker count plus one (presumably the extra node is the
// driver - confirm).
| project PipelineName, LastUpdateTime, ScheduledTime, InitializationTime, TimeAfterUpdate = InitializationTime - LastUpdateTime, TerminationTime, Duration = TerminationTime - InitializationTime, MaxClusterNodes = MaxClusterWorkers + 1
| order by PipelineName asc, InitializationTime asc
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment