Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
An example of the MongoDB Aggregation pipelines and commands used for some general filters. View
// -----------------------------------------------------------------
// Applying aggregation pipeline #1 Customer - HAVE
// -----------------------------------------------------------------
// Pipeline #1 seeds the customer group's result collection: it writes the _id
// of every customer passing ALL customer-level HAVE filters (active + token,
// age range, state) into "customerGroup-<customerGroupId>".
db.command({
    "aggregate": "app.customer",
    "pipeline": [
        {
            "$match": {
                "$and": [
                    {
                        // -- Ensure the customers we are selecting are active and have a
                        //    Firebase token so we can send them a push notification
                        "status": "active",
                        "firebaseToken": {"$exists": true, "$ne": null} // We likely wouldn't add the firebaseToken check if sending coupons
                    },
                    {
                        // -- Checking for the Age ranges 0-20, 31-40, 41-50, 51-60
                        //
                        // The bounds are Unix timestamps in seconds; they decode to 10 May of
                        // the relevant year, i.e. the reference "today" is 2020-05-10 (UTC).
                        //
                        // Upper bound ($lte): latest birth date that has already reached the
                        // range's minimum age.
                        // Lower bound ($gt):  the birth date of someone turning (maxAge + 1)
                        // today, exclusive. Using "today minus maxAge years" inclusive would
                        // only admit people whose maxAge-th birthday is today and would leave
                        // a one-year hole between adjacent ranges such as 31-40 and 41-50.
                        "$or": [
                            // 0-20:  born after 1999-05-10, up to 2020-05-10
                            {"dateOfBirth": {"$gt": 926294400, "$lte": 1589068800}},
                            // 31-40: born after 1979-05-10, up to 1989-05-10
                            {"dateOfBirth": {"$gt": 295142400, "$lte": 610761600}},
                            // 41-50: born after 1969-05-10, up to 1979-05-10
                            {"dateOfBirth": {"$gt": -20390400, "$lte": 295142400}},
                            // 51-60: born after 1959-05-10, up to 1969-05-10
                            {"dateOfBirth": {"$gt": -336009600, "$lte": -20390400}}
                        ]
                    },
                    {
                        // Check for the state
                        "state": {"$in": ["SA", "Vic", "QLD", "NSW", "WA", "Tas"]}
                    }
                ]
            }
        },
        {
            // Only get the Customer's _id field
            "$project": {"_id": 1}
        },
        {
            "$merge": {
                "into": "customerGroup-5eb78328af2f5c079a1e9352",
                "on": "_id",
                "whenMatched": "keepExisting",
                "whenNotMatched": "insert" // This is the first aggregation run, so we just insert all entries, don't need to merge
            }
        }
    ],
});
// -----------------------------------------------------------------
// Applying aggregation pipeline #2 Transaction - HAVE
// -----------------------------------------------------------------
// Pipeline #2 marks (selected: true) the customers already in the group who
// satisfy EVERY transaction-level HAVE filter.
//
// Each HAVE filter is a per-customer condition ("has at least one transaction
// that ..."), so different filters may be satisfied by DIFFERENT transactions.
// AND-ing the filters inside a single $match would require one transaction to
// satisfy all of them at once (and would silently shrink the 100-day window to
// the 60-day one). Instead we pre-filter with $or, record per customer which
// filters were hit, and only then require all of them.

// Filter A: shopped at one of the listed stores in the last 60 days.
var haveStoreSince = 1583884800;
var haveStoreIds = [
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88f2)",
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88f3)",
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88f4)",
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88f5)",
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88f6)",
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88f7)",
    "MongoDB\\BSON\\ObjectId(5e6111bcaf2f5c1d9f7f88fb)"
];

// Filter B: the Exclude of "Haven't Purchased With Product Categories X Within
// Y Days" is converted to an include (select) of those who HAVE purchased the
// listed Product Categories in the last 100 days, as we are then removing
// those who haven't made such a purchase.
var haveCategorySince = 1580428800;
var haveCategoryIds = [
    "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f95415062)",
    "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f95415084)",
    "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f95415087)",
    "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f9541508b)",
    "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f9541508f)",
    "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f9541508d)"
];

db.command({
    "aggregate": "app.customerTransaction",
    "pipeline": [
        {
            // Cheap pre-filter: keep only transactions that can satisfy at
            // least one of the HAVE filters.
            "$match": {
                "$or": [
                    {
                        "transactionDate": {"$gte": haveStoreSince},
                        "storeId": {"$in": haveStoreIds}
                    },
                    {
                        "transactionDate": {"$gte": haveCategorySince},
                        "productCategories": {"$in": haveCategoryIds}
                    }
                ]
            }
        },
        {
            // One document per customer, with a 1/0 flag per filter telling
            // whether ANY of the customer's transactions satisfied it.
            "$group": {
                "_id": "$customerId",
                "shoppedAtStores": {
                    "$max": {
                        "$cond": [
                            {
                                "$and": [
                                    {"$gte": ["$transactionDate", haveStoreSince]},
                                    // NOTE(review): assumes storeId is a single value, not an array - confirm
                                    {"$in": ["$storeId", haveStoreIds]}
                                ]
                            },
                            1,
                            0
                        ]
                    }
                },
                "boughtCategories": {
                    "$max": {
                        "$cond": [
                            {
                                "$and": [
                                    {"$gte": ["$transactionDate", haveCategorySince]},
                                    {
                                        // The query-language $in above matches both a scalar and
                                        // an array field; normalise to an array here so the
                                        // expression form accepts the same documents.
                                        "$gt": [
                                            {
                                                "$size": {
                                                    "$setIntersection": [
                                                        {
                                                            "$cond": [
                                                                {"$isArray": ["$productCategories"]},
                                                                "$productCategories",
                                                                ["$productCategories"]
                                                            ]
                                                        },
                                                        haveCategoryIds
                                                    ]
                                                }
                                            },
                                            0
                                        ]
                                    }
                                ]
                            },
                            1,
                            0
                        ]
                    }
                }
            }
        },
        {
            // Keep only the customers that satisfied every HAVE filter
            "$match": {"shoppedAtStores": 1, "boughtCategories": 1}
        },
        {
            // Drop the helper flags so they are not merged into the group collection
            "$project": {"_id": 1}
        },
        {
            // Add a selected: true to the customers we match on
            "$addFields": {"selected": true}
        },
        {
            "$merge": {
                "into": "customerGroup-5eb78328af2f5c079a1e9352",
                "on": "_id",
                "whenMatched": "merge", // Merge the results, setting selected:true to these
                "whenNotMatched": "discard" // We aren't adding any new customers, we are filtering down the existing set
            }
        }
    ],
});
// Delete all entries we didn't just mark as selected
db['customerGroup-5eb78328af2f5c079a1e9352'].bulkWrite([
    {
        // Every group entry that the preceding merge did not tag with a
        // "selected" field is removed.
        "type": "delete",
        "condition": {
            "selected": {"$exists": false}
        },
        "options": {"multi": true}
    }
]);
// Remove the selected field, ready for the next pipelines
db['customerGroup-5eb78328af2f5c079a1e9352'].bulkWrite([
    {
        // Strip the temporary "selected" marker from every entry that still
        // carries it; never create documents (upsert is off).
        "type": "update",
        "condition": {
            "selected": {"$exists": true}
        },
        "document": {
            "$unset": {"selected": ""}
        },
        "options": {"multi": true, "upsert": false}
    }
]);
// End of pipeline 2
// -----------------------------------------------------------------
// Applying aggregation pipeline #3 Customer - Have Not
// -----------------------------------------------------------------
// Pipeline #3 tags (excluded: true) the group entries whose customer matches
// a customer-level HAVE NOT filter; customers not already in the group are
// discarded by the $merge.
db.command({
    "aggregate": "app.customer",
    "pipeline": [
        {
            "$match": {
                "$or": [
                    {
                        // Males are excluded by first selecting them here
                        // and then removing the tagged entries afterwards
                        "gender": {"$in": ["male"]}
                    }
                ]
            }
        },
        {
            // Only the customer._id field is needed
            "$project": {"_id": 1}
        },
        {
            // Tag every matched customer with excluded: true
            "$addFields": {"excluded": true}
        },
        {
            "$merge": {
                "into": "customerGroup-5eb78328af2f5c079a1e9352",
                "on": "_id",
                "whenMatched": "merge", // This is where the excluded field gets merged into the existing entry
                "whenNotMatched": "discard"
            }
        }
    ],
});
// Delete the excluded customers
db['customerGroup-5eb78328af2f5c079a1e9352'].bulkWrite([
    {
        // Remove every group entry tagged with excluded: true
        "type": "delete",
        "condition": {"excluded": true},
        "options": {"multi": true}
    }
]);
// End of pipeline 3
// -----------------------------------------------------------------
// Applying aggregation pipeline #4 Transaction - Have Not
// -----------------------------------------------------------------
// Pipeline #4 tags (excluded: true) the group entries whose customer has at
// least one transaction matching ANY of the transaction-level HAVE NOT
// filters; customers not already in the group are discarded by the $merge.
db.command({
    "aggregate": "app.customerTransaction",
    "pipeline": [
        {
            "$match": {
                // The HAVE NOT filters are $or'ed together
                "$or": [
                    {
                        // The Include filter of "Haven't Purchased With Product Categories X Within Y Days,
                        // productCategories: SEAFOOD - SPECIAL, days: 2" becomes an exclude of those who
                        // have purchased: select (and later remove) anyone who bought from the Special
                        // Seafood category in the last 2 days.
                        "transactionDate": {"$gte": 1588896000},
                        "productCategories": {"$in": ["MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f95415067)"]}
                    },
                    {
                        // The Exclude filter of "Have Purchased Products X Within Y Days,
                        // products: 180 PROT BAR VEGAN CHOC MINT 40G, days: 90" becomes selecting
                        // (and later removing) those who bought the Vegan protein bar in the last 90 days.
                        "transactionDate": {"$gte": 1581292800},
                        "products": {"$in": ["MongoDB\\BSON\\ObjectId(5e916292af2f5c0896207e37)"]}
                    }
                ]
            }
        },
        {
            // Collapse the matching transactions to one document per customer
            "$group": {"_id": "$customerId"}
        },
        {
            "$addFields": {"excluded": true}
        },
        {
            "$merge": {
                "into": "customerGroup-5eb78328af2f5c079a1e9352",
                "on": "_id",
                "whenMatched": "merge",
                "whenNotMatched": "discard"
            }
        }
    ],
});
// Remove the excluded customers
db['customerGroup-5eb78328af2f5c079a1e9352'].bulkWrite([
    {
        // Remove every group entry tagged with excluded: true
        "type": "delete",
        "condition": {"excluded": true},
        "options": {"multi": true}
    }
]);
// End of pipeline 4
// -----------------------------
// We are using "customerGroup-<customerGroupId>" for the merged collection results name. So each customerGroup has its own merged collection with results.
// The collection customerGroup-5eb78328af2f5c079a1e9352 should now have a set of _id's which are the customerId's which match all the filters.
// We can now load up the actual customer models in batches based on those _id's and process them, sending push notifications, coupons, surveys, etc.
// We save a customerGroup.lastRan unixtime field so we know if the merged results are stale and need re-creating.
// Note: Whilst these are nearly valid MongoDB commands, they have been modified and are based on the PHP output of the run commands.
// You'll want to change "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f95415062)" to something like ObjectId("5e79a458af2f5c1f95415062")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.