Skip to content

Instantly share code, notes, and snippets.

@avneesh91
Created May 14, 2020 06:25
Show Gist options
  • Save avneesh91/d9f5bf13d0a1265e5cded0082f9f0c17 to your computer and use it in GitHub Desktop.
Save avneesh91/d9f5bf13d0a1265e5cded0082f9f0c17 to your computer and use it in GitHub Desktop.
'''
Script that watches the expiry_event change stream and processes the
notifications attached to each deleted (expired) event.
'''
import os
import json
import pymongo
from bson.json_util import dumps
from bson import ObjectId
# Connect to the MongoDB server on localhost and select the `test` database
# that the rest of this script reads from.
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client.get_database('test')
def handle_event_expiry(id):
    '''Print every notification that belongs to an expired event.

    Looks up all documents in ``db.notifications`` whose ``event_id`` equals
    the given id and prints each one.

    Args:
        id: Identifier of the deleted ``expiry_event`` document, in any form
            ``ObjectId`` accepts (the caller in this file passes the hex
            string). ``None`` is ignored. The name shadows the builtin but
            is kept so existing callers continue to work.
    '''
    print(id)
    if id is None:
        # ObjectId(None) generates a brand-new id instead of failing, so the
        # query below would silently search for a document that cannot exist.
        return
    for notification in db.notifications.find({'event_id': ObjectId(id)}):
        print(f'processing events {notification}')
# Block on the expiry_event change stream and hand every deletion over to
# handle_event_expiry. The context manager closes the underlying cursor when
# the loop is left (exception or Ctrl-C) instead of leaking it.
with db.expiry_event.watch() as change_stream:
    for change in change_stream:
        # Round-trip through Extended JSON so the ObjectId in documentKey
        # becomes a plain {'$oid': '<hex>'} mapping.
        event = json.loads(dumps(change))
        if event.get('operationType') != 'delete':
            continue
        expired_event = event.get('documentKey', {}).get('_id', {}).get('$oid')
        handle_event_expiry(expired_event)
'''
Script for enqueuing a notification.

Before running it, create a TTL index on the expiry_event collection from the
mongo shell:

    db.expiry_event.createIndex( { "created_at": 1 }, { expireAfterSeconds: 60 } )
'''
import os
import pymongo
import datetime
from bson.json_util import dumps
# Connect to the MongoDB server on localhost and select the `test` database
# that create_notification writes to.
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client.get_database('test')
def create_notification(user_id, message):
    '''Store a notification and (re)start the expiry timer for its user.

    Each user has at most one document in ``db.expiry_event``. If one exists
    its ``created_at`` is pushed forward; otherwise a new one is inserted.
    The notification itself is then written to ``db.notifications`` with the
    expiry event's ``_id`` as ``event_id``.

    Args:
        user_id: Identifier of the user the notification is for.
        message: Notification payload to store.
    '''
    # Stamped one minute ahead of now (UTC). Timezone-aware replacement for
    # the deprecated datetime.utcnow().
    timestamp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=1)

    existing = db.expiry_event.find_one({'user_id': user_id})
    if existing:
        # Re-arm the existing timer rather than creating a second event.
        event_id = existing['_id']
        db.expiry_event.update_one(
            {'_id': event_id},
            {'$set': {'created_at': timestamp}},
        )
    else:
        # insert_one/update_one replace the legacy insert()/update() methods,
        # which no longer exist in PyMongo 4.
        event_id = db.expiry_event.insert_one(
            {'user_id': user_id, 'created_at': timestamp}
        ).inserted_id

    db.notifications.insert_one(
        {'event_id': event_id, 'user_id': user_id, 'message': message}
    )
# Example usage: store one notification for a sample user and enqueue it.
create_notification(user_id='test_user_1', message='random')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment