Skip to content

Instantly share code, notes, and snippets.

@robshep
Last active July 7, 2022 14:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robshep/82b7e15d0f67aee9c3725a1a12bd5a3f to your computer and use it in GitHub Desktop.
Save robshep/82b7e15d0f67aee9c3725a1a12bd5a3f to your computer and use it in GitHub Desktop.
Quick test of Celery with MongoDB broker using mongo in docker
"""
Self-contained test for a simple Celery task interaction using an ephemeral MongoDB broker.
* MongoDB is created using docker with a temporary directory for storage.
* Celery broker is pointed at the above container's randomly assigned host port.
* Celery worker is created and managed using Python multiprocessing.
* Triggers a distributed task
* Stops celery workers
* Stops mongodb container
* Removes temp directory.
$ pip install docker pymongo celery celery-with-mongodb
"""
from __future__ import absolute_import, unicode_literals
from celery import current_app, Celery
from celery.bin import worker
from docker import from_env
from docker.models.containers import Container
import time
from multiprocessing import Process
from pymongo import MongoClient
import os
import tempfile
import shutil
# --- Module-level setup -----------------------------------------------------
# Runs at import time: creates a scratch data directory, starts a MongoDB
# container that stores its data there, and points a Celery app at it.

docker_c = from_env()

# Host directory bind-mounted into the container as MongoDB's data directory.
dbpath = tempfile.mkdtemp(dir="/tmp")
print(str(os.getpid()) + ": Setup: Created Database Directory: " + dbpath)

try:
    # A host port of None asks docker to choose the host port for 27017/tcp.
    mongod = docker_c.containers.run(
        "mongo:3.4",
        ports={'27017/tcp': None},
        detach=True,
        auto_remove=True,
        volumes={dbpath: {'bind': '/data/db', 'mode': 'rw'}}
    )  # type: Container
except Exception:
    # The container never started, so nothing else will clean up the
    # directory created above. Remove it (best effort, so the original
    # error is not masked) and re-raise.
    shutil.rmtree(dbpath, ignore_errors=True)
    raise

# Refresh the cached attributes so the published port mapping can be read.
mongod.reload()
port = mongod.attrs['NetworkSettings']['Ports']['27017/tcp'][0]['HostPort']

BROKER_URL = 'mongodb://localhost:' + str(port) + '/q'
app = Celery('test_celery_q', broker=BROKER_URL)

print(str(os.getpid()) + ": Setup: Started Mongod container: " + str(mongod.attrs['Name']))
print(str(os.getpid()) + ": Setup: Mongod Broker is at: " + BROKER_URL)
@app.task
def run_in_background(a, b):
    """Celery task: add ``a`` and ``b`` after a five second pause.

    Prints one line on entry and one line with the sum, each prefixed with
    the PID of the process executing the task. Nothing is returned.
    """
    pid_prefix = str(os.getpid())
    print(pid_prefix + ": run_in_background --> ")
    time.sleep(5)  # presumably a stand-in for slow work
    total = a + b
    print(pid_prefix + ": run_in_background: result: " + str(total))
def run_celery(broker):
    """Start a Celery worker for the current app against ``broker``.

    Intended as the target of a ``multiprocessing.Process``.

    :param broker: broker URL handed to the worker command.
    """
    # Log the broker this worker is actually given, not the module global.
    print(str(os.getpid()) + ": run_celery: " + broker)

    # Named celery_app so it does not shadow the module-level ``app``.
    celery_app = current_app._get_current_object()
    worker_cmd = worker.worker(app=celery_app)
    worker_cmd.run(
        broker=broker,
        loglevel='INFO',
        traceback=True
    )
if __name__ == "__main__":
    # Driver: check the broker database is reachable, start a worker in a
    # child process, queue one task, then shut everything down. Cleanup lives
    # in ``finally`` blocks so the container and the temp directory are
    # released even when a step above fails.
    main_pid = str(os.getpid())
    print(main_pid + ": Main: " + BROKER_URL)

    client = None
    p = None
    try:
        client = MongoClient(BROKER_URL)
        coll = client['q']['test_collection']
        print(main_pid + ": Main: " + str(coll.count()))

        p = Process(target=run_celery, args=(BROKER_URL,))
        p.daemon = True
        p.start()
        print(main_pid + ": Main: Starting Celery Worker in Background Process")
        time.sleep(3)  # give the worker time to boot before queuing work
        print(main_pid + ": Main: Started Celery Worker in Background Process: " + str(p.pid))

        run_in_background.delay(3, 4)
        time.sleep(10)  # the task itself sleeps 5 seconds; leave headroom

        print(main_pid + ": Main: Stopping Celery Worker in Background Process: " + str(p.pid))
        app.control.broadcast('shutdown')
        # Let the worker act on the shutdown request while the broker is
        # still running, instead of stopping mongod underneath it.
        p.join(10)
    finally:
        try:
            # Worker did not exit by itself (or we failed early): force it.
            if p is not None and p.is_alive():
                p.terminate()
            if client is not None:
                client.close()
        finally:
            try:
                print(main_pid + ": Main: Stopping Mongod container")
                mongod.stop()
            finally:
                print(main_pid + ": Main: Removing Database Directory: " + dbpath)
                shutil.rmtree(dbpath)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment