Skip to content

Instantly share code, notes, and snippets.

@ibab
Last active January 13, 2016 17:40
Show Gist options
  • Save ibab/7737c914b242bba266a8 to your computer and use it in GitHub Desktop.
Save ibab/7737c914b242bba266a8 to your computer and use it in GitHub Desktop.
Faster grid submission using async
from __future__ import print_function
import gevent
import gevent.queue
import gevent.pool
import gevent.subprocess
import gevent.monkey
gevent.monkey.patch_all()
from functools import partial
from DIRAC import S_OK, S_ERROR, gLogger, exit
from DIRAC.Core.Base import Script
Script.parseCommandLine(ignoreErrors = False)
from DIRAC.Interfaces.API.Job import Job
from DIRAC.Interfaces.API.Dirac import Dirac
# Stage 1 -- status polling: submit() feeds this queue with
# {'jid': ..., 'status': ...} dicts and the monitor() workers drain it.
monitoring = gevent.queue.Queue()
monitoring_group = gevent.pool.Group()

# Stage 2 -- output retrieval: monitor() feeds this queue with {'jid': ...}
# dicts once a job is 'Done' and the download() workers drain it.
downloading = gevent.queue.Queue()
downloading_group = gevent.pool.Group()
def submit(j):
    """Submit one job through the global ``dirac`` client and queue it for monitoring.

    Args:
        j: the job object passed straight to ``dirac.submit``.

    On success the job ID is put on the ``monitoring`` queue with the
    initial status ``'Submitted'``.  If the response carries no
    ``'JobID'`` the failure is printed and nothing is queued.
    """
    print('Submitting job')
    resp = dirac.submit(j)
    # A rejected submission has no 'JobID' entry; report it instead of
    # letting a KeyError kill this greenlet with an unhelpful traceback.
    if 'JobID' not in resp:
        # presumably a DIRAC S_ERROR dict with the reason under 'Message';
        # fall back to printing the whole response if that key is absent.
        print('Job submission failed: {}'.format(resp.get('Message', resp)))
        return
    jid = resp['JobID']
    monitoring.put({'jid': jid, 'status': 'Submitted'})
    print('Submitted job')
def monitor(poll_interval=10.0):
    """Worker loop that tracks job status until each job finishes or fails.

    Repeatedly takes a ``{'jid', 'status'}`` entry from the ``monitoring``
    queue and asks the global ``dirac`` client for the current status:

    * ``'Done'``   -> the job is handed to the ``downloading`` queue.
    * ``'Failed'`` -> the failure is reported and the job is dropped.
    * otherwise    -> a status change (if any) is reported and the job is
      put back on the ``monitoring`` queue after ``poll_interval`` seconds.

    Args:
        poll_interval: seconds to wait before an unfinished job is polled
            again.  Without a delay the workers would query the service
            in a tight loop.

    This function never returns; it blocks on the queue when idle.
    """
    while True:
        info = monitoring.get()
        jid = info['jid']
        status = info['status']
        resp = dirac.status(jid)
        try:
            new_status = resp['Value'][jid]['Status']
        except KeyError:
            # The query failed or did not mention this job.  Keep the last
            # known status so the job is retried instead of being lost
            # together with this worker.
            print('Could not get status of job {}: {}'.format(
                jid, resp.get('Message', resp)))
            new_status = status
        if new_status == 'Done':
            print('Job {} finished!'.format(jid))
            downloading.put({'jid': jid})
        elif new_status == 'Failed':
            print('Job {} failed!'.format(jid))
        else:
            if new_status != status:
                print('Job {} changed to {}'.format(jid, new_status))
            # Re-queue from a timer greenlet so this worker is free to
            # handle other jobs while the delay elapses.
            gevent.spawn_later(
                poll_interval,
                monitoring.put,
                {'jid': jid, 'status': new_status},
            )
def download():
    """Worker loop that fetches the output of finished jobs.

    Repeatedly takes a ``{'jid': ...}`` entry from the ``downloading``
    queue and runs the ``dirac-wms-job-get-output`` command for that job
    ID.  The command's captured standard output is discarded.  A failing
    command is reported and the worker moves on to the next job.

    This function never returns; it blocks on the queue when idle.
    """
    while True:
        info = downloading.get()
        jid = info['jid']
        print('Downloading job {}'.format(jid))
        try:
            gevent.subprocess.check_output(
                ["dirac-wms-job-get-output", str(jid)])
        except (gevent.subprocess.CalledProcessError, OSError) as err:
            # A non-zero exit status or a missing executable must not
            # kill this worker: the remaining jobs still need downloading.
            print('Download of job {} failed: {}'.format(jid, err))
            continue
        print('Downloaded job {}'.format(jid))
if __name__ == '__main__':
    # Module-level client; submit() and monitor() look it up as a global.
    dirac = Dirac()

    # Create 100 jobs and start one submission greenlet per job.
    tasks = []
    for index in range(100):
        job = Job()
        job.setExecutable('job.sh')
        job.setName('Async Test {:03d}'.format(index))
        job.setOutputSandbox('output.dat')
        tasks.append(gevent.spawn(submit, job))

    # Ten status-polling workers, then ten output-download workers.
    for _ in range(10):
        monitoring_group.spawn(monitor)
    for _ in range(10):
        downloading_group.spawn(download)

    # Wait for the submissions first, then for both worker groups.
    # NOTE(review): monitor() and download() loop with `while True`, so the
    # two group joins wait on workers that never return on their own.
    gevent.joinall(tasks)
    monitoring_group.join()
    downloading_group.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment