Skip to content

Instantly share code, notes, and snippets.

Last active February 15, 2024 01:30
Show Gist options
  • Save hfleitas/0916b2412a4f1a538df1735e51b7d6ca to your computer and use it in GitHub Desktop.
Save hfleitas/0916b2412a4f1a538df1735e51b7d6ca to your computer and use it in GitHub Desktop.
// Connection targets: the first one is commented out, the second one is the active cluster/database.
// #connect cluster('igniteadxsource.eastus2').database('Occupancy')
#connect cluster('adxpm10774.eastus').database('IoTAnalytics')
// Prints a single banner value.
print("😍 ML 🤖")
// Python unsupervised learning
// Create a custom UDF that runs K-Means clustering through the python() plugin.
//   tbl         - input rows; the output schema is the same as the input (typeof(*))
//   k           - number of clusters passed to KMeans(n_clusters=...)
//   features    - dynamic array of column names used as clustering features
//   cluster_col - name of the column that receives the cluster label; because the
//                 output schema equals the input schema, it must already exist in tbl
// The model has to be fitted (km.fit) before km.labels_ is read; labels_ is only
// populated by fitting.
.create-or-alter function with (folder = "Python") kmeans_sf_OccupDetc(tbl:(*),k:int,features:dynamic,cluster_col:string) {
    let kwargs = pack('k', k, 'features', features, 'cluster_col', cluster_col);
    let code =
        'from sklearn.cluster import KMeans\n'
        'k = kargs["k"]\n'
        'features = kargs["features"]\n'
        'cluster_col = kargs["cluster_col"]\n'
        'km = KMeans(n_clusters=k)\n'
        'df1 = df[features]\n'
        'km.fit(df1)\n'
        'result = df\n'
        'result[cluster_col] = km.labels_\n';
    tbl
    | evaluate python(typeof(*), code, kwargs)
}
// Run the K-Means UDF (3 clusters on Temp/Humidity) over the last 7 days and show 10 sample rows.
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where EnqueuedTimeUTC > ago(7d)
// cluster_id is added as a null placeholder column, named as the UDF's target column below.
| project EnqueuedTimeUTC, DeviceId, Temp, Humidity, cluster_id=double(null)
| invoke kmeans_sf_OccupDetc(3, dynamic(["Temp", "Humidity"]), "cluster_id")
| sample 10
// Python supervised model
// Custom UDF that scores rows with a pre-trained, pickled model.
//   samples       - rows to score; the output schema is the same as the input (typeof(*))
//   models_tbl    - model registry (name, timestamp, model as a hex string of the pickled bytes)
//   model_name    - registry entry to load; the most recent row by timestamp is used
//   features_cols - dynamic array of feature column names passed to predict()
//   pred_col      - name of the column that receives the predictions; because the
//                   output schema equals the input schema, it must already exist in samples
// SECURITY NOTE(review): pickle.loads can execute code embedded in the payload, so the
// models table must only ever contain trusted models.
.create-or-alter function with (folder = "Python", skipvalidation = "true") classify_sf_OccupDetc(samples:(*), models_tbl:(name:string,timestamp:datetime,model:string), model_name:string, features_cols:dynamic, pred_col:string) {
    // Read the model from the models_tbl argument (not a hard-coded table) so the parameter is honored.
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
        'import pickle\n'
        'import binascii\n'
        'smodel = kargs["smodel"]\n'
        'features_cols = kargs["features_cols"]\n'
        'pred_col = kargs["pred_col"]\n'
        'bmodel = binascii.unhexlify(smodel)\n'
        'clf1 = pickle.loads(bmodel)\n'
        'df1 = df[features_cols]\n'
        'predictions = clf1.predict(df1)\n'
        'result = df\n'
        'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])';
    samples
    | evaluate python(typeof(*), code, kwargs)
}
// Appends rows to ML_Models from an inline datatable with columns (name, timestamp, model).
// NOTE(review): the datatable rows and its closing bracket are not present here, and the
// `| take 10` that follows has no visible tabular source — content appears to be missing; restore before running.
.append ML_Models <| datatable (name: string, timestamp: datetime, model: string) [
| take 10
// Based on Temp and Humidity: is the room occupied?
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where EnqueuedTimeUTC > ago(15m)
// Placeholder prediction column, plus zero-valued CO2 / HumidityRatio columns that the feature list below references.
| extend pred_Occupancy=false, CO2=0, HumidityRatio=0
| invoke classify_sf_OccupDetc(ML_Models, 'Occupancy', dynamic(['Temp', 'Humidity', 'BatteryLevel', 'CO2', 'HumidityRatio']), 'pred_Occupancy')
// Built-in forecasting & anomaly detection
// Let's forecast out 12 hours for a single device, based on the last day of readings.
let start = now()-1d;
let end = now();
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '34a08293-348f-47b3-ad6d-2aa1ae7039d6'
// The series runs 12h past `end`, which leaves empty future bins for the forecast.
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end+12h step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
// 720 points at a 1m step = the 12h horizon appended above.
| extend forecast = series_decompose_forecast(NoGapsTemp, 720)
| render timechart with(title='Forecasting the next 12 hours by Time Series Decomposition')
// Are there any anomalies for this device over the last 24 hours?
let windowStart = ago(24h);
let windowEnd = now();
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where DeviceId == '34a08293-348f-47b3-ad6d-2aa1ae7039d6'
| where EnqueuedTimeUTC between (windowStart .. windowEnd)
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from windowStart to windowEnd step 1m
// Fill empty bins by linear interpolation and keep only the time axis and the filled series.
| project EnqueuedTimeUTC, NoGapsTemp=series_fill_linear(AvgTemp)
| extend anomalies = series_decompose_anomalies(NoGapsTemp,1)
| render anomalychart with(anomalycolumns=anomalies)
// Let's make it less sensitive: 3-day window with an anomaly threshold of 1.2.
let lookback = 3d;
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where EnqueuedTimeUTC > ago(lookback)
| where DeviceId == '34a08293-348f-47b3-ad6d-2aa1ae7039d6'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from ago(lookback) to now() step 1m
// Fill empty bins by linear interpolation and keep only the time axis and the filled series.
| project EnqueuedTimeUTC, NoGapsTemp=series_fill_linear(AvgTemp)
| extend anomalies = series_decompose_anomalies(NoGapsTemp,1.2)
| render anomalychart with(anomalycolumns=anomalies)
// Split the anomaly-detection outputs into separate chart panels.
let windowStart = ago(3d);
let windowEnd = now();
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where EnqueuedTimeUTC between (windowStart .. windowEnd)
| where DeviceId == '34a08293-348f-47b3-ad6d-2aa1ae7039d6'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from windowStart to windowEnd step 1m
| project EnqueuedTimeUTC, NoGapsTemp=series_fill_linear(AvgTemp)
// The three output series of the function are bound to anomaly / deviation / seasonal, in that order.
| extend (anomaly, deviation, seasonal) = series_decompose_anomalies(NoGapsTemp,1.2)
| render timechart with(ysplit=panels)
// Which anomalies should I focus on across all devices?
let windowStart = ago(3d);
let windowEnd = now();
// NOTE(review): no tabular source precedes the first pipe — the table reference appears to be missing; restore it before running.
| where EnqueuedTimeUTC between (windowStart .. windowEnd)
// One series per device.
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from windowStart to windowEnd step 1m by DeviceId
| project EnqueuedTimeUTC, DeviceId, NoGapsTemp=series_fill_linear(AvgTemp)
| extend anomalies = series_decompose_anomalies(NoGapsTemp, 1.5)
// Expand the parallel arrays to one row per time bin, then keep only bins whose flag equals 1.
| mv-expand EnqueuedTimeUTC, anomalies, NoGapsTemp
| where anomalies == 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment