@adamsvoboda
Last active April 16, 2023 15:12
asynchronous rethinkdb change feeds with django channels
import asyncio

from rethinkdb import r, RqlRuntimeError
from channels import Group

r.set_loop_type('asyncio')  # RethinkDB's driver must use asyncio for the loop type.


def create_changefeed_monitor(feeds):
    """
    Creates a new asyncio event loop and runs changefeed_attach concurrently
    for each table we want to monitor. Easily extended to accept whatever arguments you need.

    Once your routing is set up, this function could be called like:
        Channel('create-changefeed-monitor').send({'tables': ['table1', 'table2']})

    Resources:
        https://www.wordfugue.com/using-django-channels-email-sending-queue
        https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future
        https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.run_until_complete
    """
    # This consumer runs outside any existing loop, so create one and make it current.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Schedule one changefeed task per table on the new loop.
    tasks = []
    for table in feeds['tables']:
        tasks.append(asyncio.ensure_future(changefeed_attach(table)))

    # Block until every changefeed task has finished, then release the loop.
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()


# RethinkDB Changefeeds
# https://rethinkdb.com/docs/changefeeds/python/
async def changefeed_attach(table):
    """
    Creates a new RethinkDB connection for each instance of the function, attaches to
    the changefeed for the table, and waits for any changes to be emitted from the
    AsyncioCursor returned by RethinkDB.

    The python RethinkDB driver is not thread-safe, but I don't think creating new
    connections for each changefeed monitor is a big deal. "RethinkDB doesn't create a
    thread per connection, so once established, you can have tens of thousands of
    connections with very little overhead."

    It may be worth investigating a queue.Queue() object and doing something like a
    RethinkDB connection pool for projects that use a lot of RethinkDB.
    """
    try:
        conn = await r.connect('127.0.0.1', 28015, db='your_database')
        feed = await r.table(table).changes().run(conn)
        while await feed.fetch_next():
            change = await feed.next()
            # Do something with the change. I'm broadcasting it out to a group for testing.
            Group('changefeed-' + table).send({
                "type": "changefeed",
                "table": table,
                "old": change['old_val'],
                "new": change['new_val'],
            })
    except RqlRuntimeError as e:
        # TODO: Implement changefeed error recovery or task monitoring
        pass
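
One possible shape for the error recovery mentioned in the TODO above is a small retry wrapper with exponential backoff. This is only a sketch and not part of the gist: `run_with_retry`, `make_task`, and all of its parameters are hypothetical names, and it assumes the wrapped coroutine lets its exception propagate (the `except ... pass` above would have to re-raise for this to have any effect).

```python
import asyncio
import random


async def run_with_retry(make_task, *, retries=5, base_delay=0.5, max_delay=30.0,
                         retry_on=(Exception,)):
    """Await make_task(); on failure, wait and start a fresh coroutine.

    make_task is a zero-argument callable that returns a new coroutine each
    time it is called (a coroutine object cannot be awaited twice).
    """
    attempt = 0
    while True:
        try:
            return await make_task()
        except retry_on:
            attempt += 1
            if attempt > retries:
                raise  # give up and let the caller see the last error
            # Exponential backoff capped at max_delay, scaled by random jitter
            # so several restarting feeds do not reconnect in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

Inside `create_changefeed_monitor`, each task could then be scheduled as something like `asyncio.ensure_future(run_with_retry(lambda t=table: changefeed_attach(t)))`. The `t=table` default binds the current table name, so every lambda does not end up sharing the last value of the loop variable. Narrowing `retry_on` to the driver exceptions you actually expect is probably wiser than retrying on every `Exception`.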