Skip to content

Instantly share code, notes, and snippets.

@a-h
Created January 18, 2016 18:21
Show Gist options
  • Save a-h/64dc3053840965880ab6 to your computer and use it in GitHub Desktop.
Save a-h/64dc3053840965880ab6 to your computer and use it in GitHub Desktop.
Aggregation Framework Grouping and Filtering
from pymongo.errors import AutoReconnect
from pymongo import MongoClient
import pprint
from itertools import islice
def get_chunks(iterable, max_chunk_size):
    """Split ``iterable`` into consecutive lists of at most ``max_chunk_size`` items.

    The input is consumed one item at a time, so any iterable works,
    including a one-shot generator. Each chunk is a fresh list; the final
    chunk may be shorter than ``max_chunk_size`` and nothing is yielded for
    an empty input.

    Args:
        iterable: The items to group, in order.
        max_chunk_size: Largest number of items per chunk. The size check
            runs after each append, so values below 1 behave like 1.

    Yields:
        Lists holding the items in their original order.
    """
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) >= max_chunk_size:
            yield chunk
            # Start a new list rather than clearing, so the chunk already
            # handed to the caller is not mutated.
            chunk = []
    # Flush the trailing, partially filled chunk (if any).
    if chunk:
        yield chunk
def _lowest_homework_pipeline():
    """Build the aggregation pipeline yielding one lowest homework grade id per student.

    Each result document has the shape
    ``{"_id": {"student_id": ...}, "grade_id": <_id of the grade document>}``.
    """
    return [
        # Filter to just homework types.
        {"$match": {"type": "homework"}},
        # Group by the student.
        {"$group": {
            "_id": {"student_id": "$student_id"},
            # Push only the id and the score; pushing "$$ROOT" instead would
            # embed the whole document in the grouping.
            "homework_scores": {"$push": {"_id": "$_id", "score": "$score"}},
            # Adds the minimum to the group parent.
            "min_score": {"$min": "$score"},
        }},
        # Pass the minimum score from the grouping parent to each of the
        # subdocuments.
        {"$project": {
            "min_score": 1,
            "homework_scores._id": 1,
            "homework_scores.score": 1,
            # Copy the parent min_score to the child.
            "homework_scores.lowest_score_in_group": "$min_score",
        }},
        # Remove any subdocuments that aren't the minimum score.
        {"$project": {
            "min_score": 1,
            "homework_scores": {
                "$filter": {
                    "input": "$homework_scores",
                    "as": "item",
                    "cond": {
                        "$eq": [
                            "$$item.lowest_score_in_group",
                            "$$item.score",
                        ],
                    },
                },
            },
        }},
        # Create a single document for each item left in homework_scores.
        {"$unwind": {"path": "$homework_scores"}},
        # A student whose minimum score occurs more than once still has
        # several documents at this point. Collapse them back to one per
        # student so that only a single grade is removed; which of the tied
        # grades is kept does not matter because their scores are equal.
        {"$group": {
            "_id": "$_id",
            "grade_id": {"$first": "$homework_scores._id"},
        }},
    ]


def main():
    """Delete each student's lowest homework grade from ``students.grades``.

    Connects to the MongoDB server at ``localhost:27017``, runs the
    aggregation pipeline to collect the ``_id`` of one lowest-scoring
    homework document per student, then deletes those documents in batches
    of 100 ids per ``delete_many`` call. Progress is printed to stdout.
    """
    client = MongoClient("mongodb://localhost:27017/students", j=True)
    try:
        db = client.students

        print("Aggregation Framework")
        # Keep a record of the ids to remove. The cursor is drained fully
        # before any delete is issued.
        ids_to_remove = [
            doc["grade_id"]
            for doc in db.grades.aggregate(_lowest_homework_pipeline())
        ]

        # Delete the records in batches.
        batch_size = 100
        for idx, chunk in enumerate(get_chunks(ids_to_remove, batch_size)):
            print("Removing chunk {0}.".format(idx + 1))
            db.grades.delete_many({"_id": {"$in": chunk}})
        print("Complete")
    finally:
        # Release the client's connections even if a query or delete fails.
        client.close()
# Run the clean-up only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment