Skip to content

Instantly share code, notes, and snippets.

@andre-merzky
Last active November 21, 2019 14:16
Show Gist options
  • Save andre-merzky/a6f1eb33dc5c55c51438e041a0349ae7 to your computer and use it in GitHub Desktop.
Save andre-merzky/a6f1eb33dc5c55c51438e041a0349ae7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import sys
import json
import radical.utils as ru
import subprocess as sp
prof = ru.Profiler('flux.test')
rep = ru.Reporter('flux.test')
# ------------------------------------------------------------------------------
#
def start_flux(flux_exe='/home/merzky/projects/flux/install/bin/flux'):
    '''
    Run `flux start`, which will open a new subshell.  Feed `flux env` on
    stdin of that shell and parse the output.  Export the reported settings
    to `os.environ`, and extend `sys.path` with the found `PYTHONPATH`, so
    that `import flux` then works.

    flux_exe: path of the `flux` executable to launch (defaults to the
              install location this script was originally written for).

    Returns the `subprocess.Popen` handle of the running `flux start` shell;
    pass it to `stop_flux()` to terminate the shell again.

    Raises `RuntimeError` if the shell's output ends before the `OK` marker
    is seen (e.g. because `flux start` failed).
    '''

    rep.header('flux startup')
    prof.prof('flux_start')

    # Pass the command as a list so that a path containing blanks survives.
    # Text mode makes the pipes exchange `str` on Python 2 and 3 alike, and
    # line buffering pushes every newline-terminated command to the shell.
    proc = sp.Popen([flux_exe, 'start'],
                    stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.STDOUT,
                    bufsize=1, universal_newlines=True)

    # `echo "OK"` acts as an end marker for the output of `flux env`
    proc.stdin.write('flux env\necho "OK"\n')
    proc.stdin.flush()

    rep.info('parse env\n')
    while True:

        raw = proc.stdout.readline()
        if not raw:
            # EOF: without this check the loop would spin forever on ''
            raise RuntimeError('flux start ended before reporting its env')

        line = raw.strip()
        if line == 'OK':
            break

        if not line.startswith('export '):
            continue

        # expected form: export KEY="value"
        key, sep, val = line.split(' ', 1)[1].partition('=')
        if not sep:
            # `export KEY` without an assignment carries no value to record
            continue

        val = val.strip('"')
        os.environ[key] = val

        if key == 'PYTHONPATH':
            sys.path.extend(val.split(':'))
            rep.info(' ppath %s\n' % val)

        elif key == 'FLUX_URI':
            rep.info(' uri %s\n' % val)

    rep.ok('>>ok\n')
    prof.prof('flux_started')

    return proc
# ------------------------------------------------------------------------------
#
def stop_flux(proc):
    '''
    Send `exit` to the flux shell's stdin to trigger termination, print all
    output found up to completion, and wait for the process to finish.

    proc: the `subprocess.Popen` handle returned by `start_flux()`.
    '''

    rep.header('flux stop')
    rep.info('terminate\n')
    prof.prof('flux_stop')

    proc.stdin.write('exit\n')
    # closing flushes the pending `exit` and signals EOF to the shell
    proc.stdin.close()

    while True:

        raw = proc.stdout.readline()
        if not raw:
            # only a truly empty read is EOF; a blank line still has '\n'
            break

        line = raw.strip()
        if line:
            rep.plain(' -> %s\n' % line)

    # reap the child so that no zombie process is left behind
    proc.wait()

    rep.ok('>>ok\n')
    prof.prof('flux_stopped')
# ------------------------------------------------------------------------------
#
def use_flux(njobs=256, spec='spec.json'):
    '''
    Submit `njobs` tasks, and listen for state transition events.  Once
    `njobs` `INACTIVE` events are found, we are done.

    njobs: number of jobs to submit and to wait for.
    spec : path of the JSON file holding the jobspec used for every job.

    FIXME: how to obtain exit codes?
    '''

    rep.header('execute workload')

    # imported here because `flux` only becomes importable once
    # `start_flux()` has extended `sys.path`
    import flux
    from flux import job

    # load the jobspec before subscribing, so that a bad spec file does not
    # leave a dangling subscription behind
    jobspec = json.dumps(ru.read_json(spec))

    h = flux.Flux()
    h.event_subscribe('job-state')

    try:
        rep.info('submit ')
        prof.prof('submit_start')
        for _ in range(njobs):
            job.submit(h, jobspec)
            rep.progress('.')
        rep.ok('>>ok\n')
        prof.prof('submit_stop')

        # NOTE(review): every INACTIVE transition seen is counted, not only
        # those of the jobs submitted above - confirm that no other jobs run
        # in this flux instance.
        done = 0
        rep.info('collect ')
        prof.prof('collect_start')
        while done < njobs:
            event = h.event_recv()
            for transition in event.payload['transitions']:
                # index instead of unpacking a pair, so that entries which
                # carry additional fields are tolerated
                if transition[1] == 'INACTIVE':
                    done += 1
                    rep.progress('+')
        rep.ok('>>ok\n')
        prof.prof('collect_stop')

    finally:
        # drop the subscription even if submission or collection failed
        h.event_unsubscribe('job-state')
# ------------------------------------------------------------------------------
#
# Driver: start a flux instance, run the workload twice, then shut down.
rep.title('FLUX example')

proc = start_flux()
try:
    use_flux()
    use_flux()
finally:
    # always terminate the flux shell, even if a workload run failed -
    # otherwise the subshell started above would be left running
    stop_flux(proc)

rep.header('done')
# ------------------------------------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment