Last active
July 7, 2022 14:05
-
-
Save robshep/82b7e15d0f67aee9c3725a1a12bd5a3f to your computer and use it in GitHub Desktop.
Quick test of Celery with MongoDB broker using mongo in docker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Self-contained test for a simple Celery task interaction using an ephemeral MongoDB broker.
* MongoDB is created using Docker, with a temporary directory for storage.
* The Celery broker is pointed at the above container's random host port.
* A Celery worker is started and managed in a Python multiprocessing Process.
* Triggers a distributed task.
* Stops the Celery worker.
* Stops the MongoDB container.
* Removes the temp directory.
$ pip install docker pymongo celery celery-with-mongodb
""" | |
from __future__ import absolute_import, unicode_literals | |
from celery import current_app, Celery | |
from celery.bin import worker | |
from docker import from_env | |
from docker.models.containers import Container | |
import time | |
from multiprocessing import Process | |
from pymongo import MongoClient | |
import os | |
import tempfile | |
import shutil | |
# --- Module-level setup: runs as soon as this file is executed/imported. ---

# Docker client built by docker.from_env().
docker_c = from_env()

# Scratch directory under /tmp; mounted into the container as /data/db below.
dbpath = tempfile.mkdtemp(dir="/tmp")
print(str(os.getpid()) + ": Setup: Created Database Directory: " + dbpath)

# Start a detached, auto-removing mongo:3.4 container.  The host side of
# 27017/tcp is left as None so that Docker chooses the host port.
mongod = docker_c.containers.run(
    "mongo:3.4",
    detach=True,
    auto_remove=True,
    ports={'27017/tcp': None},
    volumes={dbpath: {'bind': '/data/db', 'mode': 'rw'}},
)  # type: Container

# Re-read the container attributes before looking up the published host port.
mongod.reload()
port = mongod.attrs['NetworkSettings']['Ports']['27017/tcp'][0]['HostPort']

# Celery application whose broker is database "q" on the container above.
BROKER_URL = 'mongodb://localhost:' + str(port) + '/q'
app = Celery('test_celery_q', broker=BROKER_URL)

print(str(os.getpid()) + ": Setup: Started Mongod container: " + str(mongod.attrs['Name']))
print(str(os.getpid()) + ": Setup: Mongod Broker is at: " + BROKER_URL)
@app.task
def run_in_background(a, b):
    """Celery task: announce itself, wait five seconds, then print ``a + b``.

    Both messages are prefixed with the PID of the process running the task.
    The sum is only printed; the task has no return value.
    """
    pid_prefix = str(os.getpid())
    print(pid_prefix + ": run_in_background --> ")
    time.sleep(5)
    print(pid_prefix + ": run_in_background: result: " + str(a + b))
def run_celery(broker):
    """Run a Celery worker for the current app in the calling process.

    Used as the ``target`` of the background ``Process`` started by the
    ``__main__`` driver.

    Args:
        broker: Broker URL handed to the worker as its ``broker`` option.
    """
    # Log the broker this worker was actually asked to use, rather than the
    # module-level BROKER_URL, so the message stays truthful for any caller.
    print(str(os.getpid()) + ": run_celery: " + str(broker))

    # Resolve the current_app proxy to the concrete Celery application object.
    app = current_app._get_current_object()
    workerz = worker.worker(app=app)
    workerz.run(broker=broker, loglevel='INFO', traceback=True)
if __name__ == "__main__":
    # Driver: check the broker database is reachable, start a Celery worker in
    # a background process, dispatch one task, then shut everything down.
    # The teardown lives in ``finally`` blocks so that a failure at any step
    # still stops the mongod container and removes the temp directory.
    print(str(os.getpid()) + ": Main: " + BROKER_URL)
    client = None
    p = None
    try:
        client = MongoClient(BROKER_URL)
        db = client['q']
        coll = db['test_collection']
        # NOTE(review): Collection.count() is deprecated in newer PyMongo and
        # removed in PyMongo 4; switch to count_documents({}) when upgrading.
        print(str(os.getpid()) + ": Main: " + str(coll.count()))

        p = Process(target=run_celery, args=(BROKER_URL,))
        p.daemon = True
        p.start()
        print(str(os.getpid()) + ": Main: Starting Celery Worker in Background Process")
        # Fixed pause to let the worker come up before a task is sent.
        time.sleep(3)
        print(str(os.getpid()) + ": Main: Started Celery Worker in Background Process: " + str(p.pid))

        run_in_background.delay(3, 4)
        # The task itself sleeps for 5 seconds; wait long enough for it to end.
        time.sleep(10)

        print(str(os.getpid()) + ": Main: Stopping Celery Worker in Background Process: " + str(p.pid))
        app.control.broadcast('shutdown')
        # Give the worker a bounded amount of time to act on the shutdown
        # request while the broker container is still running.
        p.join(10)
    finally:
        try:
            # Make sure no worker process outlives the broker container.
            if p is not None and p.is_alive():
                p.terminate()
                p.join(5)
            if client is not None:
                client.close()
        finally:
            try:
                print(str(os.getpid()) + ": Main: Stopping Mongod container")
                mongod.stop()
            finally:
                print(str(os.getpid()) + ": Main: Removing Database Directory: " + dbpath)
                shutil.rmtree(dbpath)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment