Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save takyon12/5b090a081e7a86745b34f0583f22ca62 to your computer and use it in GitHub Desktop.
Save takyon12/5b090a081e7a86745b34f0583f22ca62 to your computer and use it in GitHub Desktop.
MongoDB aggregation pipeline Python example
import pymongo, json
def db_connection(db_name, collection_name, uri='mongodb://localhost:27017/'):
    """Open a MongoDB collection and print how many documents it holds.

    Args:
        db_name: Name of the database to open.
        collection_name: Name of the collection inside ``db_name``.
        uri: MongoDB connection string. The default is the local server on
            the standard port, which is what this helper always used before.

    Returns:
        The collection handle obtained from ``client[db_name][collection_name]``.
    """
    client = pymongo.MongoClient(uri)
    collection = client[db_name][collection_name]
    # Collection.count() was deprecated in PyMongo 3.7 and removed in 4.0;
    # count_documents({}) is the supported way to count every document.
    print('db {} connected, collection {} current data: {}'.format(
        db_name, collection_name, collection.count_documents({})))
    return collection

def db_tmp():
    """Return the scratch ``_tmp_`` collection of the ``_tmp_`` database, emptied.

    The collection is dropped right after connecting, so every call starts
    from a clean slate. The document count printed while connecting reflects
    the state *before* the drop.
    """
    scratch = db_connection("_tmp_", "_tmp_")
    scratch.drop()
    return scratch

Preparing the data

tmp = db_tmp()

# Load the ZIP-code sample data set: one JSON document per line.
# Source: http://media.mongodb.org/zips.json
# The ``with`` block closes the file itself, so no explicit close() is needed.
# Iterating the file object streams lines instead of materializing readlines().
# Blank lines (e.g. a trailing newline) are skipped rather than crashing json.loads.
with open("zips.json", encoding="utf-8") as fp:
    zip_docs = [json.loads(line) for line in fp if line.strip()]

# A single bulk insert_many call instead of one insert_one round trip per
# document. insert_many rejects an empty list, so guard against an empty file.
if zip_docs:
    tmp.insert_many(zip_docs)
db _tmp_ connected, collection _tmp_ current data: 0
# Peek at one imported document to check its shape (_id, city, loc, pop, state).
tmp.find_one()
{'_id': '01001',
 'city': 'AGAWAM',
 'loc': [-72.622739, 42.070206],
 'pop': 15338,
 'state': 'MA'}

aggregate() Method

All of the following examples use PyMongo's Collection.aggregate() method, the Python counterpart of the aggregate() helper in the mongo shell.

The aggregate() method uses the aggregation pipeline to process documents into aggregated results. An aggregation pipeline consists of stages, with each stage processing the documents as they pass along the pipeline. Documents pass through the stages in sequence.

The aggregate() method in the mongo shell provides a wrapper around the aggregate database command. See the documentation for your driver for a more idiomatic interface for data aggregation operations.

# Return States with Populations above 10 Million
#
# Stage 1 sums every ZIP code's "pop" into one total per state.
# Stage 2 keeps only states whose total is at least 10 million
# (the equivalent of SQL's HAVING clause).
pipeline = [
    {"$group": {"_id": "$state", "totalPop": {"$sum": "$pop"}}},
    {"$match": {"totalPop": {"$gte": 1e7}}},
]

cursor = tmp.aggregate(pipeline)
list(cursor)
[{'_id': 'IL', 'totalPop': 11427576},
 {'_id': 'OH', 'totalPop': 10846517},
 {'_id': 'FL', 'totalPop': 12686644},
 {'_id': 'NY', 'totalPop': 17990402},
 {'_id': 'PA', 'totalPop': 11881643},
 {'_id': 'TX', 'totalPop': 16984601},
 {'_id': 'CA', 'totalPop': 29754890}]

The equivalent SQL for this aggregation operation is:

SELECT state, SUM(pop) AS totalPop
FROM zipcodes
GROUP BY state
HAVING totalPop >= (10*1000*1000)
# Return Average City Population by State
# (only the 10 states with the highest average, largest first)
#
# Stage 1 sums ZIP-code populations into one document per (state, city).
# Stage 2 averages those city totals per state.
# Stages 3-4 rank the states by that average and keep the top ten.
pipeline = [
    {"$group": {"_id": {"state": "$state", "city": "$city"},
                "city_pop": {"$sum": "$pop"}}},
    {"$group": {"_id": "$_id.state",
                "avgCityPop": {"$avg": "$city_pop"}}},
    {"$sort": {"avgCityPop": -1}},
    {"$limit": 10},
]

cursor = tmp.aggregate(pipeline)
list(cursor)
[{'_id': 'DC', 'avgCityPop': 303450.0},
 {'_id': 'CA', 'avgCityPop': 27756.42723880597},
 {'_id': 'FL', 'avgCityPop': 27400.958963282937},
 {'_id': 'AZ', 'avgCityPop': 20591.16853932584},
 {'_id': 'RI', 'avgCityPop': 19292.653846153848},
 {'_id': 'NV', 'avgCityPop': 18209.590909090908},
 {'_id': 'HI', 'avgCityPop': 15831.842857142858},
 {'_id': 'NJ', 'avgCityPop': 15775.89387755102},
 {'_id': 'MA', 'avgCityPop': 14855.37037037037},
 {'_id': 'CT', 'avgCityPop': 14674.625}]
# Return Largest and Smallest Cities by State
#
# Built up one stage at a time. First step: total population per
# (state, city), sorted ascending so the least populated cities come first.
pipeline = [
    {"$group": {"_id": {"state": "$state", "city": "$city"},
                "city_pop": {"$sum": "$pop"}}},
    {"$sort": {"city_pop": 1}},
]
cursor = tmp.aggregate(pipeline)
list(cursor)[:10]
[{'_id': {'city': 'SELAWIK', 'state': 'AK'}, 'city_pop': 0},
 {'_id': {'city': 'RUSSIAN MISSION', 'state': 'AK'}, 'city_pop': 0},
 {'_id': {'city': 'NAKNEK', 'state': 'AK'}, 'city_pop': 0},
 {'_id': {'city': 'GRAYLING', 'state': 'AK'}, 'city_pop': 0},
 {'_id': {'city': 'CHEVAK', 'state': 'AK'}, 'city_pop': 0},
 {'_id': {'city': 'ODELL', 'state': 'OR'}, 'city_pop': 0},
 {'_id': {'city': 'NINOLE', 'state': 'HI'}, 'city_pop': 0},
 {'_id': {'city': 'OREGON HOUSE', 'state': 'CA'}, 'city_pop': 0},
 {'_id': {'city': 'ALLEGHANY', 'state': 'CA'}, 'city_pop': 0},
 {'_id': {'city': 'TWIN BRIDGES', 'state': 'CA'}, 'city_pop': 0}]
# Regroup the sorted city totals by state. The incoming documents are sorted
# by ascending city_pop, so within each state $first yields the smallest city
# and $last yields the biggest one.
pipeline.extend([
    {"$group": {
        "_id": "$_id.state",
        "biggestCity": {"$last": "$_id.city"},
        "biggestPop": {"$last": "$city_pop"},
        "smallestCity": {"$first": "$_id.city"},
        "smallestPop": {"$first": "$city_pop"},
    }},
])
cursor = tmp.aggregate(pipeline)
list(cursor)[:3]
[{'_id': 'DE',
  'biggestCity': 'NEWARK',
  'biggestPop': 111674,
  'smallestCity': 'BETHEL',
  'smallestPop': 108},
 {'_id': 'MO',
  'biggestCity': 'SAINT LOUIS',
  'biggestPop': 397802,
  'smallestCity': 'BENDAVIS',
  'smallestPop': 44},
 {'_id': 'IL',
  'biggestCity': 'CHICAGO',
  'biggestPop': 2452177,
  'smallestCity': 'ANCONA',
  'smallestPop': 38}]
# Reshape each per-state document with $project:
# - hide _id and expose its value as "state";
# - nest each city's name and population into one sub-document.
pipeline.extend([
    {"$project": {
        "_id": 0,
        "state": "$_id",
        "biggestCity": {"name": "$biggestCity", "pop": "$biggestPop"},
        "smallestCity": {"name": "$smallestCity", "pop": "$smallestPop"},
    }},
])
cursor = tmp.aggregate(pipeline)
list(cursor)[:3]
[{'biggestCity': {'name': 'NEWARK', 'pop': 111674},
  'smallestCity': {'name': 'BETHEL', 'pop': 108},
  'state': 'DE'},
 {'biggestCity': {'name': 'SAINT LOUIS', 'pop': 397802},
  'smallestCity': {'name': 'BENDAVIS', 'pop': 44},
  'state': 'MO'},
 {'biggestCity': {'name': 'CHICAGO', 'pop': 2452177},
  'smallestCity': {'name': 'ANCONA', 'pop': 38},
  'state': 'IL'}]
# Show the complete pipeline built up over the previous steps:
# $group -> $sort -> $group -> $project.
pipeline
[{'$group': {'_id': {'city': '$city', 'state': '$state'},
   'city_pop': {'$sum': '$pop'}}},
 {'$sort': {'city_pop': 1}},
 {'$group': {'_id': '$_id.state',
   'biggestCity': {'$last': '$_id.city'},
   'biggestPop': {'$last': '$city_pop'},
   'smallestCity': {'$first': '$_id.city'},
   'smallestPop': {'$first': '$city_pop'}}},
 {'$project': {'_id': 0,
   'biggestCity': {'name': '$biggestCity', 'pop': '$biggestPop'},
   'smallestCity': {'name': '$smallestCity', 'pop': '$smallestPop'},
   'state': '$_id'}}]

Reference:

Aggregation Pipeline

Aggregation Pipeline Quick Reference

mongoDB Reference Card

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment