Last active
May 16, 2019 05:42
-
-
Save vjrantal/29cd4881a9f65ad803daadabd56c1f68 to your computer and use it in GitHub Desktop.
Update policy sample to expand JSON arrays (note that below payload sample could also be ingested directly using multijson format - see multijson.csl)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create the staging table that holds the raw JSON data as it arrives via the Event Hubs.
// If the staged data does not need to be kept, a zero retention policy can be set, see:
// https://docs.microsoft.com/en-us/azure/kusto/management/retention-policy#alter-retention-policy
.create table Staging (Data: dynamic)

// Create the table for the expanded messages. For an example of the kind of input
// that can be mapped to a table like this, see:
// https://gist.github.com/vjrantal/29cd4881a9f65ad803daadabd56c1f68#file-payload-json
.create table Messages (ApplicationUri: string, DisplayName: string, Value: real, NodeId: string, SourceTimestamp: datetime)

// Next, create a data connection between the Event Hubs and the Staging table using the TXT format
// and leave the column mapping empty.
// Create the function that is run for data arriving in the Staging table.
// Each Staging row holds a JSON array in Data; mv-expand turns every array element into its own row.
// The project step then reads the individual properties of the element and converts each one
// explicitly to the type of the matching Messages column. The explicit, typed projection keeps the
// column order and types of the function's output fixed, independent of the rows currently in Staging.
.create-or-alter function with (docstring = "Expands the JSON arrays in Staging into one typed row per message") ExpandFunction()
{
    Staging
    | mv-expand Data
    | project
        ApplicationUri = tostring(Data.ApplicationUri),
        DisplayName = tostring(Data.DisplayName),
        // The measurement and its timestamp are nested one level down, under the element's "Value" property.
        Value = toreal(Data.Value.Value),
        NodeId = tostring(Data.NodeId),
        SourceTimestamp = todatetime(Data.Value.SourceTimestamp)
}
// Create a transactional update policy on Messages that runs ExpandFunction() with Staging as its source.
// Transactional is used because, in this case, the wanted result is that a failure of the
// update policy fails the entire ingestion of that particular data input.
.alter table Messages policy update @'[{"IsEnabled": true, "Source": "Staging", "Query": "ExpandFunction()", "IsTransactional": true}]'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create the table for the messages. For an example of the kind of input
// that can be mapped to a table like this, see:
// https://gist.github.com/vjrantal/29cd4881a9f65ad803daadabd56c1f68#file-payload-json
.create table Messages (ApplicationUri: string, DisplayName: string, Value: real, NodeId: string, SourceTimestamp: datetime)

// Create (or update) the mapping from the JSON properties to the table columns.
// create-or-alter is used so that the script can be re-run when the mapping already exists.
.create-or-alter table Messages ingestion json mapping "MultiJSONMapping" '[{"column":"ApplicationUri","path":"$.ApplicationUri"},{"column":"DisplayName","path":"$.DisplayName"},{"column":"NodeId","path":"$.NodeId"},{"column":"Value","path":"$.Value.Value"},{"column":"SourceTimestamp","path":"$.Value.SourceTimestamp"}]'
// Create a data connection between the Event Hubs and the Messages table using the Multiline JSON format
// and the column mapping named "MultiJSONMapping".
// To test the ingestion inline before setting up the Event Hubs connection, run the command below.
// It references the table's "MultiJSONMapping" by name, so the test exercises the same mapping
// that the data connection is configured with.
.ingest inline into table Messages
  with (format="multijson", ingestionMappingReference="MultiJSONMapping") <|
[
{"ApplicationUri":"urn:beijing004:ua:beijing:prodline3:assembly","DisplayName":"NumberOfManufacturedProducts","NodeId":"ns=2;i=385","Value":{"SourceTimestamp":"2019-05-12T11:46:36.300284Z","Value":24484}}
]

// Separate query (run after the ingestion command above): check that a row arrived.
Messages | take 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[
  {
    "ApplicationUri": "urn:beijing004:ua:beijing:prodline3:assembly",
    "DisplayName": "NumberOfManufacturedProducts",
    "NodeId": "ns=2;i=385",
    "Value": {
      "SourceTimestamp": "2019-05-12T11:46:36.300284Z",
      "Value": 24484
    }
  },
  {
    "ApplicationUri": "urn:beijing004:ua:beijing:prodline3:assembly",
    "DisplayName": "EnergyConsumption",
    "NodeId": "ns=2;i=406",
    "Value": {
      "SourceTimestamp": "2019-05-12T11:46:36.300284Z",
      "Value": 0.2540555555555556
    }
  }
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment