Skip to content

Instantly share code, notes, and snippets.

@vjrantal
Last active May 16, 2019 05:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vjrantal/29cd4881a9f65ad803daadabd56c1f68 to your computer and use it in GitHub Desktop.
Save vjrantal/29cd4881a9f65ad803daadabd56c1f68 to your computer and use it in GitHub Desktop.
Update policy sample to expand JSON arrays (note that below payload sample could also be ingested directly using multijson format - see multijson.csl)
// Create staging table that holds the JSON data as it comes via the Event Hubs.
// If the staging table is not needed, a zero retention policy can be set using:
// https://docs.microsoft.com/en-us/azure/kusto/management/retention-policy#alter-retention-policy
.create table Staging (Data: dynamic)
// Create the table for the expanded messages. See here for an example what kind of input
// could be mapped to a table like this:
// https://gist.github.com/vjrantal/29cd4881a9f65ad803daadabd56c1f68#file-payload-json
.create table Messages (ApplicationUri: string, DisplayName: string, Value: real, NodeId: string, SourceTimestamp: datetime)
// Create a data connection between the Event Hubs and the Staging table using TXT format
// and leaving the column mapping empty.
// Create a function that gets called for data incoming to the Staging table.
// This expands the array to individual rows which in the case of above-linked
// sample input would have columns ApplicationUri, DisplayName, Value, NodeId and SourceTimestamp.
// The last project line is to ensure the columns are always in same order and can be used for additional
// type conversion such as Value=toreal(Value) to convert to the real type.
.create-or-alter function ExpandFunction()
{
Staging
| mv-expand ['Data']
| evaluate bag_unpack(Data)
| evaluate bag_unpack(Value)
| project ApplicationUri=ApplicationUri, DisplayName=DisplayName, Value=toreal(Value), NodeId=NodeId, SourceTimestamp=SourceTimestamp
}
// Create a transactional update policy. Transactional is used, because in this case, the wanted result is that
// if the update policy fails, the entire ingestion of that particular data input should fail.
.alter table Messages policy update @'[{"IsEnabled": true, "Source": "Staging", "Query": "ExpandFunction()", "IsTransactional": true}]'
// Create the table for the messages. See here for an example what kind of input
// could be mapped to a table like this:
// https://gist.github.com/vjrantal/29cd4881a9f65ad803daadabd56c1f68#file-payload-json
.create table Messages (ApplicationUri: string, DisplayName: string, Value: real, NodeId: string, SourceTimestamp: datetime)
// Create mapping from JSON to the table columns
.create table Messages ingestion json mapping "MultiJSONMapping" '[{"column":"ApplicationUri","path":"$.ApplicationUri"},{"column":"DisplayName","path":"$.DisplayName"},{"column":"NodeId","path":"$.NodeId"},{"column":"Value","path":"$.Value.Value"},{"column":"SourceTimestamp","path":"$.Value.SourceTimestamp"}]'
// Create a data connection between the Event Hubs and the Messages table using Multiline JSON format
// and the column mapping above named "MultiJSONMapping".
// If you want to test the ingestion inline before setting up the Event Hubs connection, you can run:
.ingest inline into table Messages
with (format="multijson", jsonMapping='[{"column":"ApplicationUri","path":"$.ApplicationUri"},{"column":"DisplayName","path":"$.DisplayName"},{"column":"NodeId","path":"$.NodeId"},{"column":"Value","path":"$.Value.Value"},{"column":"SourceTimestamp","path":"$.Value.SourceTimestamp"}]') <|
[
{"ApplicationUri":"urn:beijing004:ua:beijing:prodline3:assembly","DisplayName":"NumberOfManufacturedProducts","NodeId":"ns=2;i=385","Value":{"SourceTimestamp":"2019-05-12T11:46:36.300284Z","Value":24484}}
]
Messages | take 1
[
{
"ApplicationUri": "urn:beijing004:ua:beijing:prodline3:assembly",
"DisplayName": "NumberOfManufacturedProducts",
"NodeId": "ns=2;i=385",
"Value": {
"SourceTimestamp": "2019-05-12T11:46:36.300284Z",
"Value": 24484
}
},
{
"ApplicationUri": "urn:beijing004:ua:beijing:prodline3:assembly",
"DisplayName": "EnergyConsumption",
"NodeId": "ns=2;i=406",
"Value": {
"SourceTimestamp": "2019-05-12T11:46:36.300284Z",
"Value": 0.2540555555555556
}
}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment