Last active
May 6, 2022 15:04
-
-
Save starzia/9d1f8a25a2e2e2124b78e2da71159602 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//----------------------------------------------------------------------------------------- | |
// helpers | |
// Return the current wall-clock time in milliseconds since the Unix epoch.
// Uses Date.now() directly instead of allocating a throwaway Date object.
function getMillis() {
    return Date.now();
}
// Invoke f() and print its wall-clock duration in milliseconds.
// Relies on the mongosh global print() for output.
function runtime(f) {
    const start = getMillis();
    f();
    print("elapsed time: " + (getMillis() - start) + " ms");
}
//----------------------------------------------------------------------------------------- | |
// Generate sequential data. Buckets have 1000 values with one or two consecutive timestamps | |
// (Re)create and populate the "telemetry" time-series collection with SAMPLES
// datapoints spread pseudo-randomly over 100 sensors. Timestamps advance by
// exactly one second per document, so buckets hold runs of consecutive
// timestamps. When isSharded is true, also enables sharding on a hashed
// metadata.sensorId. Relies on the mongosh globals db and sh.
function initDb(SAMPLES, isSharded) {
    db.telemetry.drop();
    db.createCollection(
        "telemetry",
        {
            timeseries: {
                timeField: "timestamp",
                metaField: "metadata",
                granularity: "seconds"
            }
        }
    );
    if (isSharded) {
        sh.enableSharding("test");
        sh.shardCollection("test.telemetry", {"metadata.sensorId": "hashed"});
    }
    // Uniform random integer in [0, max).
    function getRandomInt(max) {
        return Math.floor(Math.random() * max);
    }
    const NUM_SENSORS = 100;
    const sensors = Array.from(Array(NUM_SENSORS).keys()); // [0, 1, ..., 99]
    const BATCH_SIZE = 1000;
    for (let i = 0; i < SAMPLES / BATCH_SIZE; i++) {
        const batch = [];
        for (let j = 0; j < BATCH_SIZE; j++) {
            // One random draw decides both the sensor and the temperature.
            const r = getRandomInt(10000000);
            const d = new Date(((i * BATCH_SIZE) + j) * 1000); // seconds since epoch
            batch.push({
                "metadata": {
                    "sensorId": sensors[r % sensors.length],
                    "type": "temperature"
                },
                "timestamp": d,
                "temp": (r % 987) / 10.0
            });
        }
        db.telemetry.insertMany(batch);
    }
    // Compound index supporting the latest-point-per-sensor queries below.
    db.telemetry.createIndex({"metadata.sensorId": 1, "timestamp": -1});
}
//----------------------------------------------------------------------------------------- | |
// do some ad-hoc testing | |
// Populate with 10M datapoints (unsharded) and peek at one raw bucket document
// to see the internal system.buckets storage format.
initDb(10000000);
db.system.buckets.telemetry.findOne();
// USE CASE 1: latest (timestamp, temp) per sensor via $sort + $group/$last.
// fails with:
// MongoServerError: PlanExecutor error during aggregation :: caused by :: Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting.
db.telemetry.aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
]);
// USE CASE 1 again, with allowDiskUse so the sort can spill to disk.
// note that the plan has an _internalUnpackBucket stage first. That's the bottleneck:
// every bucket is unpacked into individual documents before sorting.
db.telemetry.explain("executionStats").aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// USE CASE 2 (reverse): sort descending and take $first instead of $last.
// looks like it should use the {sensorId: 1, timestamp: -1} index, but does not,
// due to bucket unpack happening before the sort.
db.telemetry.explain("executionStats").aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": -1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$first: "$timestamp"},
        temp: {$first: "$temp"}
    }}
], {allowDiskUse: true});
// USE CASE 3 (topN): same answer expressed with the $topN accumulator,
// avoiding the blocking $sort stage entirely.
db.telemetry.explain("executionStats").aggregate([
    {$group: {
        _id: "$metadata.sensorId",
        mostRecent: {$topN: {
            n: 1,
            sortBy: {"timestamp": -1},
            output: {
                ts: "$timestamp",
                temp: "$temp"
            }
        }}
    }}
], {allowDiskUse: true});
// USE CASE 4: $match on a single sensor first.
// execution time is ~100x faster because only matching buckets are scanned.
db.telemetry.explain("executionStats").aggregate([
    {$match: {"metadata.sensorId": 11}},
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// USE CASE 5: sort by the second key of the compound index only.
// it's actually about 15% faster than the original.
db.telemetry.explain("executionStats").aggregate([
    {$sort: {"timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// confirmed that all of the above give consistent answers (though variation 4 is for just one sensor).
// If we know the exact last timeseries value we can use the index
// as follows, giving an idea of the desired (best-case) execution time.
db.telemetry.explain("executionStats").aggregate([
    {$match: {"timestamp": ISODate("1970-01-01T00:00:00Z")}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
]);
// looking at just one sensor now, with both sensorId and timestamp known.
// using IDX scan now (both match predicates are covered by the compound index).
db.telemetry.explain("executionStats").aggregate([
    {$match: {
        "metadata.sensorId": 11,
        "timestamp": ISODate("1970-01-01T00:00:00Z")
    }},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
]);
// now try to get last points without knowing the timestamp.
// Use the planned design for PM-2330 to that make use of SERVER-9507 $sort+$group+$first optimization.
// Strategy: query the raw system.buckets collection to find the newest bucket
// per sensor (cheap), then $lookup back into that one bucket and unpack it.
// makes use of DISTINCT_SCAN.
db.system.buckets.telemetry.explain("executionStats").aggregate([
    {$sort: {"meta.sensorId": 1, "control.max.timestamp": -1}},
    {$group: {
        _id: "$meta.sensorId",
        bucket: {$first: "$_id"},
    }},
    {$lookup: {
        from: "system.buckets.telemetry",
        foreignField: "_id",
        localField: "bucket",
        as: "bucket_data",
        pipeline:[
            // NOTE(review): metaField here is "tags" but the collection was created
            // with metaField "metadata" (stored as "meta" in buckets) — confirm this
            // is intentional; the final rewrite below uses "meta" instead.
            // NOTE(review): bucketMaxSpanSeconds is 60, though granularity "seconds"
            // normally implies a 3600s bucket span — verify against the server version.
            {$_internalUnpackBucket: {
                timeField:"timestamp",
                metaField:"tags",
                bucketMaxSpanSeconds:NumberInt("60")
            }},
            {$sort: {"timestamp": -1}},
            {$limit:1}
        ]
    }},
    {$unwind: "$bucket_data"},
    {$replaceWith:{
        _id: "$_id",
        timestamp: "$bucket_data.timestamp",
        temp: "$bucket_data.temp"
    }}
]);
// compare to results from direct, slow version of the query:
db.telemetry.aggregate([
    {$sort: {"metadata.sensorId": 1, "timestamp": 1}},
    {$group: {
        _id: "$metadata.sensorId",
        ts: {$last: "$timestamp"},
        temp: {$last: "$temp"}
    }}
], {allowDiskUse: true});
// results are the same! (if we have a unique value per timestamp)
// Now try without index. Performance is still pretty good compared to original,
// because we're scanning buckets, not documents.
db.system.buckets.telemetry.explain("executionStats").aggregate([
    {$sort: {"meta.sensorId": 1, "control.max.timestamp": 1}}, // sorting against index, so will not be able to do DISTINCT_SCAN
    {$group: {
        _id: "$meta.sensorId",
        bucket: {$first: "$_id"},
    }},
    {$lookup: {
        from: "system.buckets.telemetry",
        foreignField: "_id",
        localField: "bucket",
        as: "bucket_data",
        pipeline:[
            // NOTE(review): metaField "tags" does not match the collection's
            // metaField ("metadata" / bucket-level "meta") — confirm intentional.
            {$_internalUnpackBucket: {
                timeField:"timestamp",
                metaField:"tags",
                bucketMaxSpanSeconds:NumberInt("60")
            }},
            {$sort: {"timestamp": -1}},
            {$limit:1}
        ]
    }},
    {$unwind: "$bucket_data"},
    {$replaceWith:{
        _id: "$_id",
        timestamp: "$bucket_data.timestamp",
        temp: "$bucket_data.temp"
    }}
], {allowDiskUse: true});
// here's the final rewrite after SERVER-65005, replacing $lookup with another $group:
// keep each sensor's newest whole bucket in the first $group, then unpack just
// those buckets and pick the newest document per sensor.
db.system.buckets.telemetry.aggregate([
    {$sort: {"meta.sensorId": 1, "control.max.timestamp": -1}},
    {$group: {
        _id: "$meta.sensorId",
        bucket: {$first: "$_id"},
        control: {$first: "$control"},
        meta: {$first: "$meta"},
        data: {$first: "$data"}
    }},
    {$_internalUnpackBucket: {
        timeField:"timestamp",
        metaField:"meta",
        bucketMaxSpanSeconds:NumberInt("60")
    }},
    {$sort: {"meta.sensorId": 1, "timestamp": -1}},
    {$group: {
        _id: "$meta.sensorId",
        ts: {$first: "$timestamp"},
        temp: {$first: "$temp"}
    }}
]);
//-----------------------------------------------------------------------------------------
// test on a sharded collection
// Run on sharded cluster also with 10M datapoints:
initDb(10000000, true)
// verify that collection is sharded
db.adminCommand({ listShards: 1 })
db.system.buckets.telemetry.getShardDistribution()
// repeat queries above...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment