Skip to content

Instantly share code, notes, and snippets.

@dkliban
Last active August 29, 2015 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkliban/18989c57bae02a3d466d to your computer and use it in GitHub Desktop.
Save dkliban/18989c57bae02a3d466d to your computer and use it in GitHub Desktop.
Add the following to the end of /usr/lib/python2.7/site-packages/pulp/server/tasks/__init__.py
After you edit the file you will need make sure you have pulp_workers running. If you had the
workers running already, please restart them after editing the file.
---------------------------------------------------------------
import os, shutil
import celery
from pulp.server.async.tasks import TaskResult, Task
@celery.task(base=Task)
def test_get_size_other_pid():
return os.path.getsize('/var/run/auditd.pid')
@celery.task(base=Task)
def test_get_size_own_pid():
return os.path.getsize('/var/run/pulp/celerybeat.pid')
@celery.task(base=Task)
def test_get_size_other_tmp():
path = '/tmp/testfile'
with open(path, 'r') as f:
pass
@celery.task(base=Task)
def test_create_and_remove_tmp_dir():
path = '/tmp/test_remove_tmp_dir'
os.makedirs(path)
shutil.rmtree(path)
@celery.task(base=Task)
def test_create_and_remove_tmp_file():
path = '/tmp/test_remove_tmp_file'
with open(path, 'a') as f:
pass
os.remove(path)
@celery.task(base=Task)
def test_create_and_remove_tmp_symlink():
path = '/tmp/test_create_tmp_symlink'
link = '/tmp/test_create_tmp_symlink_link'
with open(path, 'a') as f:
pass
os.symlink(path, link)
---------------------------------------------------------------
Then you need to execute the following commands:
sudo touch /tmp/testfile
sudo -u apache python
Then you will be able to ask pulp workers to perform the newly defined tasks by executing
the following code. The success or failure of a task will be indicated in the regular Pulp logs.
---------------------------------------------------------------
from pulp.server.tasks import test_get_size_other_pid, test_get_size_own_pid, test_get_size_other_tmp, test_create_and_remove_tmp_dir, test_create_and_remove_tmp_file, test_create_and_remove_tmp_symlink
from pulp.server.db.connection import initialize
initialize()
#This task should fail due to permission denied
test_get_size_other_pid.delay()
#This task should pass
test_get_size_own_pid.delay()
#This task should fail with permission denied
test_get_size_other_tmp.delay()
#This should succeed
test_create_and_remove_tmp_dir.delay()
#This should succeed
test_create_and_remove_tmp_file.delay()
#This should fail with permission denied
test_create_and_remove_tmp_symlink.delay()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment