Skip to content

Instantly share code, notes, and snippets.

@hfleitas
Last active March 12, 2024 13:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hfleitas/060838e0e8eeb782dffead1de77ccd71 to your computer and use it in GitHub Desktop.
Save hfleitas/060838e0e8eeb782dffead1de77ccd71 to your computer and use it in GitHub Desktop.
1brc.kql
// 1️⃣🐝🏎️🤿 (1BRC in Kusto 🏁 10.38ms)
// ref: https://github.com/gunnarmorling/1brc
.execute database script <|
.create-merge table 1brc (station:string, measurement:real)
.create-or-alter table 1brc ingestion csv mapping '1brc_mapping' '[{"column":"station", "Properties":{"Ordinal":"0"}},{"column":"measurement", "Properties":{"Ordinal":"1"}}]'
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_aa') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ab') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ac') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ad') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ae') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_af') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ag') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ah') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ai') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_aj') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ak') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_al') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
.ingest async into table 1brc (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_am') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='1brc_mapping',ingestionMappingType='csv')
////////////////
// partioning //
////////////////
.execute database script <|
.create-merge table measurements_part (station:string, measurement:real)
.alter table measurements_part policy partitioning ```
{
"PartitionKeys": [
{
"ColumnName": "station",
"Kind": "Hash",
"Properties": {
"Function": "XxHash64",
"MaxPartitionCount": 128,
"Seed": 1,
"PartitionAssignmentMode": "Uniform"
}
}
]
}```
.create table measurements_part ingestion csv mapping 'measurements_part_mapping' '[{"column":"station", "Properties":{"Ordinal":"0"}},{"column":"measurement", "Properties":{"Ordinal":"1"}}]'
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_aa') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ab') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ac') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ad') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ae') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_af') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ag') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ah') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ai') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_aj') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_ak') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_al') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.ingest async into table measurements_part (@'https://iotmonitoringsa26915.blob.core.windows.net/1brc/splits/measurements_split_am') with (format='scsv',ignoreSizeLimit=true,ingestionMappingReference='measurements_part_mapping',ingestionMappingType='csv')
.show table measurements_part details
.show table 1brc details
.show database extents partitioning statistics
/////////////////
// now we wait //
/////////////////
.show ingestion failures
1brc
| count
1brc
| distinct station
| count
1brc
| summarize count() by station
| summarize avg(count_)
// my 1brc - run it 5 times //
set query_results_cache_max_age = time(5m);
1brc
| summarize min(measurement), avg(measurement), max(measurement) by station
// formatting //
// {Chongqing=4.7/18.9/33.0, Kampala=9.4/18.1/31.1, Lodwar=16.9/31.9/44.8, Stockholm=-0.1/8.1/18.1, Zürich=8.1/15.6/20.6}
1brc
| summarize min_measurement=min(measurement), mean_measurement=round(avg(measurement),1), max_measurement=max(measurement) by station_name=station
| order by station_name asc
| project 1brc=strcat(station_name,'=',min_measurement,'/', mean_measurement, '/', max_measurement)
| summarize 1brc=make_list(todynamic(1brc))
| project 1brc=replace_strings(tostring(1brc),dynamic(['[', ']', '"', ',']), dynamic(['{', '}', '', ', ']))
// result rules //
// discard slowest & fastest, get mean of remaining 3 out of 5.
.show commands-and-queries;
$command_results
| where Text has ```set query_results_cache_max_age = time(5m);
1brc
| summarize min(measurement), avg(measurement), max(measurement) by station```
| top 5 by Duration asc
| extend rn=row_number()
| project rn, Duration, Text, StartedOn
| where rn !in (1,5)
| summarize avg(Duration)
| extend Milliseconds=avg_Duration/totimespan('00:00:00.001')
//00:00:02.2571198, 4 core 8 GB ram (free-personal clus) 🤩
//00:00:00.5107739, 32 core 128 GB ram (Standard_D32d_v4)
//00:00:00.3125790, 16 vcpu, 128 GB ram (Ls_v3 - fabric kqldb 5 nodes) 🤯
//00:00:00.6666997, 16 vcpu, 128 GB ram (Ls_v3 - fabric kqldb 2 nodes)...what if we try partioning?
//00:00:00.6042353, 16 vcpu, 128 GB ram (Ls_v3 - fabric kqldb 2 nodes)...partioned
//00:00:00.0103881, 16 vcpu, 128 GB ram (Ls_v3 - fabric kqldb 2 nodes)...query result cache, not partioned. ⚡
.show cluster
// 1 node, 4 vcpu, 8 GB ram
// 2 nodes, 32 vcpu, 128 GB ram
// 5 nodes, 16 vcpu, 128 GB ram
// 2 nodes, 16 vcpu, 128 GB ram
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment