//-----------------------------------------------------------------------------------------
// helpers
function getMillis() {
    const d = new Date();
    return d.getTime();
}
function runtime(f) {
    var start = getMillis();
    f();
    print("elapsed time: " + (getMillis() - start) + " ms");
}
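// example usage of the runtime() helper (hypothetical pipeline, just to show the calling
// convention; call .toArray() so the cursor is fully drained inside the timed closure):
//   runtime(() => db.telemetry.aggregate([{$count: "n"}]).toArray());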
//-----------------------------------------------------------------------------------------
// Generate sequential data. Buckets have 1000 values with one or two consecutive timestamps
function initDb(SAMPLES, isSharded) {
    db.telemetry.drop();
    db.createCollection(
        "telemetry",
        {
            timeseries: {
                timeField: "timestamp",
                metaField: "metadata",
                granularity: "seconds"
            }
        }
    );
    if (isSharded) {
        sh.enableSharding("test");
        sh.shardCollection("test.telemetry", {"metadata.sensorId": "hashed"});
    }
    function getRandomInt(max) {
        return Math.floor(Math.random() * max);
    }
    var NUM_SENSORS = 100;
    var sensors = Array.from(Array(NUM_SENSORS).keys());
    var BATCH_SIZE = 1000;
    for (let i = 0; i < SAMPLES / BATCH_SIZE; i++) {
        var batch = [];
        for (let j = 0; j < BATCH_SIZE; j++) {
            var r = getRandomInt(10000000);
            var d = new Date(((i * BATCH_SIZE) + j) * 1000);
            batch.push({
                "metadata": {
                    "sensorId": sensors[r % sensors.length],
                    "type": "temperature"
                },
                "timestamp": d,
                "temp": (r % 987) / 10.0
            });
        }
        db.telemetry.insertMany(batch);
    }
    db.telemetry.createIndex({"metadata.sensorId": 1, "timestamp": -1});
}
//-----------------------------------------------------------------------------------------
// do some ad-hoc testing
initDb(10000000);
// inspect the underlying bucket format
db.system.buckets.telemetry.findOne();
// USE CASE 1
// fails with:
// MongoServerError: PlanExecutor error during aggregation :: caused by :: Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting.
db.telemetry.aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
]);
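// the error goes away once we opt in to external sorting; a minimal sketch (same pipeline,
// letting the blocking $sort spill to disk):
//   db.telemetry.aggregate([ /* same $sort + $group pipeline as above */ ], {allowDiskUse: true});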
// USE CASE 1 again, this time with explain and allowDiskUse
// note that the plan has an _internalUnpackBucket stage first. That's the bottleneck.
db.telemetry.explain("executionStats").aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
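// to list the plan stages in order (assumption: an unsharded explain that exposes a top-level
// "stages" array; sharded explain output nests per-shard plans instead):
//   var ex = db.telemetry.explain().aggregate([ /* pipeline above */ ], {allowDiskUse: true});
//   ex.stages.map(s => Object.keys(s)[0]);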
// USE CASE 2 (reverse):
// looks like it should be able to use the index, but does not, due to the bucket unpack stage
db.telemetry.explain("executionStats").aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": -1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$first: "$timestamp"},
        temp: {$first: "$temp"}
    }}
], {allowDiskUse: true});
// USE CASE 3 (topN):
db.telemetry.explain("executionStats").aggregate([
    {$group: {
        _id: "$metadata.sensorId",
        mostRecent: {$topN: {
            n: 1,
            sortBy: {"timestamp": -1},
            output: {
                ts: "$timestamp",
                temp: "$temp"
            }
        }}
    }}
], {allowDiskUse: true});
// USE CASE 4: $match
// execution time is ~100x faster because only matching buckets are scanned.
db.telemetry.explain("executionStats").aggregate([
    {$match: {"metadata.sensorId": 11}},
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// USE CASE 5: sort by the second key of the compound index
// it's actually about 15% faster than the original query (USE CASE 1)
db.telemetry.explain("executionStats").aggregate([
    {$sort: {"timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// confirmed that all of the above give consistent answers (though use case 4 covers just one sensor).
// If we know the exact timestamp of the last value, we can use the index
// as follows, which gives an idea of the desired execution time.
db.telemetry.explain("executionStats").aggregate([
    {$match: {"timestamp": ISODate("1970-01-01T00:00:00Z")}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
]);
// looking at just one sensor now; this one uses an index scan (IXSCAN)
db.telemetry.explain("executionStats").aggregate([
    {$match: {
        "metadata.sensorId": 11,
        "timestamp": ISODate("1970-01-01T00:00:00Z")
    }},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
]);
// now try to get the last points without knowing the timestamp.
// Use the planned design for PM-2330, which relies on the SERVER-9507 $sort+$group+$first
// optimization and therefore on a DISTINCT_SCAN.
db.system.buckets.telemetry.explain("executionStats").aggregate([
    {$sort: {"meta.sensorId": 1, "control.max.timestamp": -1}},
    {$group: {
        _id: "$meta.sensorId",
        bucket: {$first: "$_id"},
    }},
    {$lookup: {
        from: "system.buckets.telemetry",
        foreignField: "_id",
        localField: "bucket",
        as: "bucket_data",
        pipeline: [
            {$_internalUnpackBucket: {
                timeField: "timestamp",
                metaField: "tags",
                bucketMaxSpanSeconds: NumberInt("60")
            }},
            {$sort: {"timestamp": -1}},
            {$limit: 1}
        ]
    }},
    {$unwind: "$bucket_data"},
    {$replaceWith: {
        _id: "$_id",
        timestamp: "$bucket_data.timestamp",
        temp: "$bucket_data.temp"
    }}
]);
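// quick sanity check that the plan really gets a DISTINCT_SCAN (a sketch that greps the whole
// explain output rather than assuming a particular nesting of the plan tree):
//   var ex = db.system.buckets.telemetry.explain().aggregate([ /* pipeline above */ ]);
//   JSON.stringify(ex).includes("DISTINCT_SCAN");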
// compare to the results from the direct, slow version of the query:
db.telemetry.aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// results are the same! (as long as each timestamp has a unique value)
// Now try without using the index. Performance is still pretty good compared to the original,
// because we're scanning buckets, not documents.
db.system.buckets.telemetry.explain("executionStats").aggregate([
    {$sort: {"meta.sensorId": 1, "control.max.timestamp": 1}}, // sorting against the index direction, so this cannot use a DISTINCT_SCAN
    {$group: {
        _id: "$meta.sensorId",
        bucket: {$first: "$_id"},
    }},
    {$lookup: {
        from: "system.buckets.telemetry",
        foreignField: "_id",
        localField: "bucket",
        as: "bucket_data",
        pipeline: [
            {$_internalUnpackBucket: {
                timeField: "timestamp",
                metaField: "tags",
                bucketMaxSpanSeconds: NumberInt("60")
            }},
            {$sort: {"timestamp": -1}},
            {$limit: 1}
        ]
    }},
    {$unwind: "$bucket_data"},
    {$replaceWith: {
        _id: "$_id",
        timestamp: "$bucket_data.timestamp",
        temp: "$bucket_data.temp"
    }}
], {allowDiskUse: true});
// here's the final rewrite after SERVER-65005, replacing $lookup with another $group
db.system.buckets.telemetry.aggregate([
    {$sort: {"meta.sensorId": 1, "control.max.timestamp": -1}},
    {$group: {
        _id: "$meta.sensorId",
        bucket: {$first: "$_id"},
        control: {$first: "$control"},
        meta: {$first: "$meta"},
        data: {$first: "$data"}
    }},
    {$_internalUnpackBucket: {
        timeField: "timestamp",
        metaField: "meta",
        bucketMaxSpanSeconds: NumberInt("60")
    }},
    {$sort: {"meta.sensorId": 1, "timestamp": -1}},
    {$group: {
        _id: "$meta.sensorId",
        ts: {$first: "$timestamp"},
        temp: {$first: "$temp"}
    }}
]);
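// timing comparison using the runtime() helper defined above (a sketch; .toArray() forces the
// full result set to be materialized inside the timed closure):
//   runtime(() => db.system.buckets.telemetry.aggregate([ /* rewritten pipeline above */ ]).toArray());
//   runtime(() => db.telemetry.aggregate([ /* naive $sort + $group pipeline */ ], {allowDiskUse: true}).toArray());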
//-----------------------------------------------------------------------------------------
// test on a sharded collection
// run on a sharded cluster, also with 10M datapoints:
initDb(10000000, true);
// verify that the collection is sharded
db.adminCommand({ listShards: 1 });
db.system.buckets.telemetry.getShardDistribution();
// repeat queries above...
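// for example, re-run USE CASE 1 against the sharded collection (a sketch, wrapped in the
// runtime() helper so the timing can be compared with the unsharded run):
runtime(() => db.telemetry.aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true}).toArray());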