@SteVwonder
Last active December 7, 2017 22:26
{
  "levels": [
    {
      "cores_per_child": 36,
      "num_children": 8
    },
    {
      "cores_per_child": 1,
      "num_children": 36
    }
  ],
  "num_levels": 3
}
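For orientation, a minimal sketch (not part of the gist; the config filename here is hypothetical) of how a config like the one above is consumed by child_spec_generator in initial_program.py below: entry i of "levels" sizes the children spawned by instances at depth i+1, so the deepest (leaf) level needs no entry and len(levels) can be num_levels - 1.

import json

with open('hierarchy_config.json') as config_file:  # hypothetical filename
    cfg = json.load(config_file)

print "hierarchy depth:", cfg['num_levels']
for depth, level in enumerate(cfg['levels'], start=1):
    print "instances at depth {} each spawn {} children with {} cores".format(
        depth, level['num_children'], level['cores_per_child'])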
#!/usr/bin/env python
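# initial_program.py: initial program run inside each Flux instance in the
# hierarchy.  It loads the offload module (plus sched for child instances),
# submits the child instances described by the JSON config (root/internal
# nodes only), and uses JSC job-state notifications together with the
# init_prog.exit / no_new_jobs events to decide when this instance may exit.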
import csv
import time
import json
import os
import sys
import argparse
import subprocess
import socket
import syslog
import math
import re
import psutil
import flux
from flux import kvs, jsc
from flux.rpc import RPC
from flux.core.inner import ffi, raw
ENV_FILTER = re.compile(r"^(SLURM_|FLUX_)")
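# Build the environment handed to child instances: drop SLURM_*/FLUX_* (and a
# few other) variables, then re-add the Flux settings the children still need.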
def get_environment():
env = dict()
for key in os.environ:
if ENV_FILTER.match(key):
continue
env[key] = os.environ[key]
env.pop('HOSTNAME', None)
env.pop('ENVIRONMENT', None)
# Make MVAPICH behave...
env['MPIRUN_RSH_LAUNCH'] = '1'
# Pass some Flux variables through
env['FLUX_MODULE_PATH'] = os.environ['FLUX_MODULE_PATH']
if 'FLUX_SCHED_RC_NOOP' in os.environ:
env['FLUX_SCHED_RC_NOOP'] = os.environ['FLUX_SCHED_RC_NOOP']
return env
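# Build the 'flux start' command line that launches a child instance running
# this same script one level deeper in the hierarchy.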
def build_cmd(args):
init_prog_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'initial_program.py')
print "Initial program path: {}".format(init_prog_path)
assert os.path.isfile(init_prog_path)
flux_cmd = ['flux', 'start']
broker_opts = ""
init_prog_cmd = [init_prog_path]
if args.log_dir:
init_prog_cmd.extend(['--log_dir', args.log_dir])
if args.persist_dir:
broker_opts = '-o,-Spersist-filesystem={}'.format(args.persist_dir)
if args.light_logging:
broker_opts += ",-Slog-forward-level=6"
init_prog_cmd.extend(['--persist_dir', args.persist_dir])
if args.results:
init_prog_cmd.extend(['--results', args.results])
if args.sched_plugin:
init_prog_cmd.extend(['--sched_plugin', args.sched_plugin])
if args.light_logging:
init_prog_cmd.append('--light_logging')
if args.verbose:
init_prog_cmd.append('--verbose')
init_prog_cmd.extend([args.child_config, '{}'.format(args.num_levels), 'child', '{}'.format(args.level+1), str(args.full_jobid)])
flux_cmd.append(broker_opts)
full_cmd = flux_cmd + init_prog_cmd
return full_cmd
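# Yield one job spec per child at this level, sizing nnodes from the requested
# cores and the number of physical cores per node.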
def child_spec_generator(args):
child_cmd = build_cmd(args)
print "Full CMD: {}".format(child_cmd)
with open(args.child_config, 'r') as config_file:
json_obj = json.load(config_file)
assert args.level <= json_obj['num_levels']
if json_obj['num_levels'] == 1:
return
my_childrens_info = json_obj['levels'][args.level - 1]
ncpus = my_childrens_info['cores_per_child']
for _ in xrange(my_childrens_info['num_children']):
nnodes = int(math.ceil(ncpus / float(psutil.cpu_count(logical=False))))
environ = get_environment()
job_spec = {
'nnodes': nnodes,
'ntasks': ncpus,
'cmdline': child_cmd,
'environ': environ,
'cwd': os.getcwd(),
'walltime' : 60 * 60 * 24,
}
print "Creating childinstance with {} cpus and {} nodes".format(ncpus, nnodes)
yield job_spec
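# Submit a job spec to the local scheduler via the 'job.submit' RPC.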
def submit_job(flux_handle, job_json_str):
resp = flux_handle.rpc_send('job.submit', job_json_str)
if resp is None:
raise RuntimeError("RPC response invalid")
if resp.get('errnum', None) is not None:
raise RuntimeError("Job creation failed with error code {}".format(
resp['errnum']))
job_id = resp['jobid']
print "Just submitted job #{}".format(job_id)
def load_sched_module(args):
''' Load the sched module into the enclosing flux instance '''
load_cmd = ['flux', 'module', 'load', 'sched']
if args.results:
load_cmd.append("resultsfolder={}".format(args.results))
if args.sched_plugin:
load_cmd.append("plugin={}".format(args.sched_plugin))
if args.verbose:
load_cmd.append('verbosity={}'.format(1))
print "Loading sched module: {}".format(load_cmd)
output = subprocess.check_output(load_cmd)
if len(output) > 0:
print output
def load_offload_module(args):
''' Load the offload module into the enclosing flux instance '''
load_cmd = ['flux', 'module', 'load', 'offload',
"num_levels={}".format(args.num_levels)]
if args.root:
load_cmd.append("root=1")
elif args.internal:
load_cmd.append("internal=1")
elif args.leaf:
load_cmd.append("leaf=1")
else:
raise RuntimeError("Unknown position within hierarchy")
print "Loading offload module: {}".format(load_cmd)
output = subprocess.check_output(load_cmd)
if len(output) > 0:
print output
def load_modules(args):
''' Loads all of the necessary modules into the enclosing flux instance '''
if args.child:
load_sched_module(args)
load_offload_module(args)
pending_jobs = []
running_jobs = []
completed_jobs = []
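# Return a JSC callback (closure) that tracks pending/running/completed jobs,
# appends per-job timing data from the KVS to a CSV stream, and stops the
# reactor once every child is done (for the root, or for a child that has
# already received the exit event).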
def get_jsc_cb(outstream):
fieldnames = ['id', 'nnodes', 'ntasks', 'starting-time', 'complete-time',
'walltime', 'is_hierarchical']
if outstream:
writer = csv.DictWriter(outstream, fieldnames)
writer.writeheader()
else:
writer = None
def jsc_cb(jcb_str, arg, errnum):
(flux_handle, args) = arg
jcb = json.loads(jcb_str)
nstate = jcb[jsc.JSC_STATE_PAIR][jsc.JSC_STATE_PAIR_NSTATE]
jobid = jcb['jobid']
value = jsc.job_num2state(nstate)
print "JSC Event - jobid: {}, value: {}".format(jobid, value)
if value == 'submitted':
pending_jobs.append(jobid)
elif value == 'running':
pending_jobs.remove(jobid)
running_jobs.append(jobid)
elif value == 'complete':
print "Job completed: {}".format(jobid)
running_jobs.remove(jobid)
completed_jobs.append(jobid)
jobdir_key = 'lwj.{}'.format(jobid)
complete_key = '{}.complete-time'.format(jobdir_key)
print "Looking for kvs entry {}, since job {} completed".format(complete_key, jobid)
while not kvs.exists(flux_handle, complete_key):
print "{} kvs entry not found, waiting for it to be created".format(complete_key)
time.sleep(1)
job_kvs = kvs.get_dir(flux_handle, jobdir_key)
rowdict = {}
for key in fieldnames:
try:
rowdict[key] = job_kvs[key]
except KeyError:
pass
rowdict['id'] = jobid
if writer:
writer.writerow(rowdict)
if len(completed_jobs) > 0 and len(running_jobs) == 0 and len(pending_jobs) == 0:
if args.child:
if should_exit_when_done:
print "All children are dead, and I already received the exit event, exiting"
flux_handle.reactor_stop(flux_handle.get_reactor())
else:
print "All children are dead, but I haven't received the exit event"
elif args.root:
print "All children are dead, exiting"
flux_handle.reactor_stop(flux_handle.get_reactor())
return jsc_cb
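# Record when the 'no_new_jobs' event arrives (only the root subscribes to it).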
def no_new_jobs_cb(flux_handle, watcher, msg, args):
print "Received an event that there are no new jobs"
time_dict = args
time_dict['no_new_jobs_event'] = time.time()
should_exit_when_done = False
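# Handle the 'init_prog.exit' event: stop the reactor if all jobs are done,
# otherwise remember to exit once they are (should_exit_when_done).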
def exit_event_cb(flux_handle, watcher, msg, args):
global should_exit_when_done
print "Received an exit event"
if len(running_jobs) == 0 and len(pending_jobs) == 0:
if len(completed_jobs) > 0 or (len(completed_jobs) == 0 and args.leaf):
print "Received an event that I should exit and all jobs are done, exiting"
flux_handle.reactor_stop(flux_handle.get_reactor())
else:
print "Received an event that I should exit but not jobs have run yet, waiting"
should_exit_when_done = True
else:
print "Received an event that I should exit, but jobs still running/pending, waiting"
should_exit_when_done = True
def event_reactor_proc(flux_handle):
print "initial_program.py: Starting event reactor"
if flux_handle.reactor_run(flux_handle.get_reactor(), 0) < 0:
flux_handle.fatal_error("event_reactor_proc", "reactor run failed")
print "initial_program.py: Event reactor exited"
class Tee(object):
'''
Allows for printing to a file and flux's dmesg buffer simultaneously
Modeled after the Unix 'tee' command
'''
def __init__(self, name, mode, buffering=None, flux_handle=None):
self.file = open(name, mode, buffering=buffering)
if buffering:
self.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering)
else:
self.stdout = sys.stdout
self.flux_handle = flux_handle
sys.stdout = self
def __del__(self):
sys.stdout = self.stdout
self.file.close()
def write(self, data):
self.file.write(data)
if self.flux_handle:
new_data = data.strip()
if len(new_data) > 0:
self.flux_handle.log(syslog.LOG_DEBUG, new_data)
def flush(self):
self.file.flush()
def setup_logging(args, flux_handle=None):
'''
Replace sys.stdout with an instance of Tee
Also set the enclosing broker to write out its logs to a file
'''
if args.log_dir:
filename = os.path.join(args.log_dir, "{}-initprog-pid{}.out".format(args.full_jobid, os.getpid()))
Tee(filename, 'w', buffering=0, flux_handle=flux_handle)
if flux_handle:
flux_handle.log_set_appname("init_prog")
else:
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
if args.log_dir:
log_filename = os.path.join(args.log_dir, "{}-broker.out".format(args.full_jobid))
setattr_cmd = ['flux', 'setattr', 'log-filename', log_filename]
subprocess.check_output(setattr_cmd)
def main():
print "Argv: ", sys.argv
parser = argparse.ArgumentParser()
parser.add_argument('child_config')
# can be calculated from child_config, but i'm in a hurry, this is easier
parser.add_argument('num_levels', type=int)
parser.add_argument('--results', '-o',
help="directory to store the results in")
parser.add_argument('--light_logging', action='store_true', help='only log "info" to filesystem and only print "errors" to stderr')
parser.add_argument('--log_dir', help="log stdout to a file (arg = directory)")
parser.add_argument('--persist_dir', help="directory to store flux's sqlite content db (arg = directory)")
parser.add_argument('--sched_plugin', help="which sched plugin to use")
parser.add_argument('--verbose', '-v', action='store_true')
subparsers = parser.add_subparsers()
root_parser = subparsers.add_parser('root')
root_parser.set_defaults(root=True, child=False,
prefix=None, level=1)
child_parser = subparsers.add_parser('child')
child_parser.set_defaults(child=True, root=False)
child_parser.add_argument('level', type=int, help="what level in the hierarchy (should be > 1)")
child_parser.add_argument('prefix',
help="name prefix, not including the id of "
"this job (e.g. 4.2.5)")
args = parser.parse_args()
if args.root:
args.local_jobid = "0"
args.full_jobid = "0"
args.internal = False
args.leaf = False
else:
assert args.level > 1
assert args.level <= args.num_levels
if args.level < args.num_levels:
args.internal = True
args.leaf = False
elif args.level == args.num_levels:
args.leaf = True
args.internal = False
args.local_jobid = str(int(os.environ['FLUX_JOB_ID']))
args.full_jobid = "{}.{}".format(args.prefix, args.local_jobid)
assert os.path.isfile(args.child_config)
if args.root:
assert os.path.isfile(args.child_config)
if args.results:
assert os.path.isdir(args.results)
if args.child and args.prefix:
for prefix_id in args.prefix.split('.'):
assert prefix_id.isdigit()
flux_handle = flux.Flux()
setup_logging(args, flux_handle)
if args.child:
print "Subscribing to init_prog.exit"
flux_handle.event_subscribe("init_prog.exit")
exit_event_watcher = flux_handle.msg_watcher_create(exit_event_cb,
type_mask=raw.FLUX_MSGTYPE_EVENT,
topic_glob="init_prog.exit",
args=args)
exit_event_watcher.start()
print "Loading modules"
load_modules(args)
print "Registering callback with JSC"
outfile = None  # make sure 'outfile' exists even when --results is not given
if args.results:
outfilename = os.path.join(args.results, "job-{}".format(args.full_jobid))
outfile = open(outfilename, 'wb', 0)
jsc.notify_status(flux_handle, get_jsc_cb(outfile), (flux_handle, args))
time_dict = {'init_prog_start' : time.time()}
if args.root or args.internal:
for child_spec in child_spec_generator(args):
submit_job(flux_handle, child_spec)
if args.root:
flux_handle.event_subscribe("no_new_jobs")
no_new_jobs_watcher = flux_handle.msg_watcher_create(no_new_jobs_cb,
type_mask=raw.FLUX_MSGTYPE_EVENT,
topic_glob="no_new_jobs",
args=time_dict)
no_new_jobs_watcher.start()
uq_controller_path = os.path.join(os.environ['HOME'], 'Repositories/UQP/uq_controller.py')
uqp_pid = os.spawnvp(os.P_NOWAIT, "python", ['python', uq_controller_path, 'problem'])
if args.root:
print "Root node within hierarchy"
elif args.leaf:
print "Leaf node within hierarchy"
elif args.internal:
print "Internal node within hierarchy"
event_reactor_proc(flux_handle)
if args.root:
time_dict['init_prog_end'] = time.time()
if args.results:
with open(os.path.join(args.results, "init_prog.timer"), 'w') as time_outfile:
json.dump(time_dict, time_outfile)
os.waitpid(uqp_pid, 0)
time_dict['uqp_end'] = time.time()
if args.results:
with open(os.path.join(args.results, "init_prog.timer"), 'w') as time_outfile:
json.dump(time_dict, time_outfile)
if outfile:
outfile.close()
if __name__ == "__main__":
main()
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libgen.h>
#include <errno.h>
#include <czmq.h>
#include <dlfcn.h>
#include <stdbool.h>
#include <assert.h> /* assert() is used in no_new_jobs_cb below */
#include <flux/core.h>
#include "src/common/libutil/log.h"
#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/xzmalloc.h"
typedef struct {
flux_t h;
flux_t parent_h;
zlist_t *job_queue;
zlist_t *request_for_work_queue;
bool root_node;
bool leaf_node;
bool internal_node;
bool child_should_exit;
bool no_new_work;
int num_levels;
} ctx_t;
int request_work_from_parent (ctx_t *ctx);
static void freectx (void *arg)
{
ctx_t *ctx = arg;
zlist_destroy(&ctx->job_queue);
zlist_destroy(&ctx->request_for_work_queue);
free (ctx);
}
static ctx_t *getctx (flux_t h)
{
ctx_t *ctx = (ctx_t *)flux_aux_get (h, "offload");
if (!ctx) {
ctx = xzmalloc (sizeof (*ctx)); /* zeroing allocator from xzmalloc.h; aborts on OOM */
ctx->h = h;
ctx->root_node = false;
ctx->leaf_node = false;
ctx->internal_node = false;
ctx->child_should_exit = false;
ctx->no_new_work = false;
ctx->num_levels = -1;
ctx->job_queue = zlist_new ();
ctx->request_for_work_queue = zlist_new ();
flux_aux_set (h, "offload", ctx, freectx);
}
return ctx;
}
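/* Parse module arguments (root=/leaf=/internal=/num_levels=) and sanity-check
 * that exactly one position within the hierarchy was specified. */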
int process_args (int argc, char **argv, ctx_t *ctx)
{
int i = 0;
for (i = 0; i < argc; i++) {
if (!strncmp ("root=", argv[i], sizeof ("root"))) {
ctx->root_node = atoi (strstr (argv[i], "=") + 1);
} else if (!strncmp ("leaf=", argv[i], sizeof ("leaf"))) {
ctx->leaf_node = atoi (strstr (argv[i], "=") + 1);
} else if (!strncmp ("internal=", argv[i], sizeof ("internal"))) {
ctx->internal_node = atoi (strstr (argv[i], "=") + 1);
} else if (!strncmp ("num_levels=", argv[i], sizeof ("internal"))) {
ctx->num_levels = atoi (strstr (argv[i], "=") + 1);
} else {
errno = EINVAL;
return -1;
}
}
if (!(ctx->leaf_node || ctx->root_node || ctx->internal_node)) {
flux_log (ctx->h, LOG_ERR,
"%s: position within hierarchy not specified", __FUNCTION__);
return -1;
}
if ((ctx->leaf_node + ctx->root_node + ctx->internal_node) > 1) {
flux_log (ctx->h, LOG_ERR,
"%s: too many positions within hierarchy specified", __FUNCTION__);
return -1;
}
if (ctx->leaf_node) {
flux_log (ctx->h, LOG_DEBUG,
"%s: leaf node within hierarchy", __FUNCTION__);
} else if (ctx->root_node) {
flux_log (ctx->h, LOG_DEBUG,
"%s: root node within hierarchy", __FUNCTION__);
} else if (ctx->internal_node) {
flux_log (ctx->h, LOG_DEBUG,
"%s: internal node within hierarchy", __FUNCTION__);
}
return 0;
}
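/* Hand a job spec to the local scheduler via a "job.submit" RPC. */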
static void submit_job(flux_t h, const char *job_spec_str)
{
flux_log (h, LOG_DEBUG, "%s: sending job to local scheduler", __FUNCTION__);
flux_rpc_t *rpc = flux_rpc (h, "job.submit", job_spec_str, FLUX_NODEID_ANY, 0);
const char *job_submit_response = NULL;
if (flux_rpc_get (rpc, NULL, &job_submit_response) < 0) {
flux_log (h, LOG_ERR, "%s: flux_rpc_get", __FUNCTION__);
}
flux_rpc_destroy (rpc);
}
static void respond_with_job_and_free (flux_t h, const flux_msg_t *msg, char *job_spec_str)
{
flux_log (h, LOG_DEBUG, "%s: responding with a job and then free'ing the job spec", __FUNCTION__);
if (flux_respond (h, msg, 0, job_spec_str) < 0) {
flux_log (h, LOG_ERR, "%s: flux_respond", __FUNCTION__);
}
free (job_spec_str);
}
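/* Root only: enqueue a job spec received via "offload.new_job" and
 * acknowledge it with a placeholder jobid. */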
static void new_job_cb (flux_t h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
ctx_t *ctx = arg;
const char *job_spec_str = NULL;
if (flux_msg_get_payload_json (msg, &job_spec_str) < 0) {
flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__);
return;
}
flux_log (h, LOG_DEBUG, "%s: received a new job, enqueuing", __FUNCTION__);
zlist_append (ctx->job_queue, strdup (job_spec_str));
flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
flux_respond (h, msg, 0, "{\"jobid\":1}");
}
void send_no_more_work_response (flux_t h, const flux_msg_t *msg)
{
const char *job_spec_str = "{\"no_more_work\":\"true\"}";
if (flux_respond (h, msg, 0, job_spec_str) < 0) {
flux_log (h, LOG_ERR, "%s: flux_respond", __FUNCTION__);
}
}
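/* Answer a request for work from a child instance: once no more work is
 * coming, either hand over a queued job spec or tell the child to exit;
 * otherwise park the request until all work has arrived. */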
static void child_needs_a_job_cb (flux_t h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
ctx_t *ctx = arg;
flux_log (h, LOG_DEBUG, "%s: received a request for work", __FUNCTION__);
if (ctx->no_new_work) {
if (zlist_size (ctx->job_queue) == 0) {
flux_log (h, LOG_DEBUG, "%s: no outstanding jobs for running, killing child", __FUNCTION__);
send_no_more_work_response (h, msg);
} else {
flux_log (h, LOG_DEBUG, "%s: Jobs are queued, responding immediately to request for work", __FUNCTION__);
respond_with_job_and_free (h, msg, zlist_pop (ctx->job_queue));
flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
}
} else {
flux_log (h, LOG_DEBUG, "%s: jobs not all submitted yet, queueing work request", __FUNCTION__);
zlist_append (ctx->request_for_work_queue, flux_msg_copy(msg, false));
}
}
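/* Drain the parked work requests, answering each with a queued job spec or,
 * once the job queue is empty, with a no-more-work response. */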
static void handle_all_queued_work_requests (ctx_t *ctx) {
flux_t h = ctx->h;
flux_msg_t *curr_work_req_msg = NULL;
/* pop each parked request so the list never retains pointers to destroyed messages */
while ((curr_work_req_msg = zlist_pop (ctx->request_for_work_queue))) {
char *job_spec_str = zlist_pop (ctx->job_queue);
if (job_spec_str) {
respond_with_job_and_free (h, curr_work_req_msg, job_spec_str);
flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
} else {
send_no_more_work_response (h, curr_work_req_msg);
}
flux_msg_destroy (curr_work_req_msg);
}
}
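/* Root only: the "no_new_jobs" event marks the end of job submission.  With a
 * single level everything goes straight to the local scheduler; otherwise the
 * parked child requests are drained. */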
static void no_new_jobs_cb (flux_t h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
ctx_t *ctx = arg;
flux_log (h, LOG_DEBUG, "%s: received a message that no new jobs will be submitted", __FUNCTION__);
ctx->no_new_work = true;
assert (ctx->root_node);
if (ctx->num_levels == 1) { //we are the only scheduler
flux_log (h, LOG_DEBUG, "%s: the root sched is the only sched, submitting all jobs to the local scheduler", __FUNCTION__);
char *job_spec_str = NULL;
for (job_spec_str = zlist_pop (ctx->job_queue);
job_spec_str;
job_spec_str = zlist_pop (ctx->job_queue))
{
submit_job (h, job_spec_str);
flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
free (job_spec_str);
}
} else {
handle_all_queued_work_requests(ctx);
}
}
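/* Continuation for the "offload.need_job" RPC to the parent: either enqueue
 * (internal) or submit (leaf) the returned job spec and ask for more, or
 * broadcast init_prog.exit when the parent reports no more work is coming. */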
static void child_recv_work_cb (flux_rpc_t *rpc, void *arg)
{
ctx_t *ctx = arg;
const char *job_spec_str = NULL;
flux_log (ctx->h, LOG_DEBUG, "%s: received work request response from parent, calling rpc_get", __FUNCTION__);
if (flux_rpc_get (rpc, NULL, &job_spec_str) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: flux_rpc_get", __FUNCTION__);
return;
}
flux_log (ctx->h, LOG_DEBUG, "%s: successfully ran rpc_get", __FUNCTION__);
JSON response = Jfromstr (job_spec_str);
bool no_more_work = false;
if (!Jget_bool (response, "no_more_work", &no_more_work)) {
// key doesn't exist, so its probably a job spec
flux_log (ctx->h, LOG_DEBUG, "%s: no_more_work not in response, probably a job spec", __FUNCTION__);
no_more_work = false;
}
Jput (response);
if (no_more_work) {
flux_log (ctx->h, LOG_DEBUG, "%s: parent has informed us there will be no more work", __FUNCTION__);
if (ctx->internal_node) {
handle_all_queued_work_requests(ctx);
}
ctx->no_new_work = true;
ctx->child_should_exit = true;
flux_msg_t *exit_event = flux_event_encode("init_prog.exit", NULL);
flux_send (ctx->h, exit_event, 0);
flux_msg_destroy (exit_event); /* flux_send does not take ownership of the message */
} else {
if (ctx->internal_node) {
flux_log (ctx->h, LOG_DEBUG, "%s: internal node received new work, enqueueing ", __FUNCTION__);
zlist_append (ctx->job_queue, strdup (job_spec_str));
flux_log (ctx->h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
} else if (ctx->leaf_node) {
flux_log (ctx->h, LOG_DEBUG, "%s: leaf node received new work, submitting ", __FUNCTION__);
submit_job (ctx->h, job_spec_str);
}
flux_log (ctx->h, LOG_DEBUG, "%s: requesting more work from parent", __FUNCTION__);
request_work_from_parent (ctx);
}
}
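/* Asynchronously request one job spec from the parent instance's offload
 * module. */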
int request_work_from_parent (ctx_t *ctx)
{
flux_log (ctx->h, LOG_DEBUG, "requesting work from parent");
flux_rpc_t *rpc = flux_rpc (ctx->parent_h, "offload.need_job", NULL, FLUX_NODEID_ANY, 0);
if (flux_rpc_then (rpc, child_recv_work_cb, ctx) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: rpc_then", __FUNCTION__);
return -1;
}
return 0;
}
struct flux_msg_handler_spec root_htab[] = {
{FLUX_MSGTYPE_REQUEST, "offload.new_job", new_job_cb},
{FLUX_MSGTYPE_REQUEST, "offload.need_job", child_needs_a_job_cb},
{FLUX_MSGTYPE_EVENT, "no_new_jobs", no_new_jobs_cb},
FLUX_MSGHANDLER_TABLE_END,
};
struct flux_msg_handler_spec internal_htab[] = {
{FLUX_MSGTYPE_REQUEST, "offload.new_job", new_job_cb},
{FLUX_MSGTYPE_REQUEST, "offload.need_job", child_needs_a_job_cb},
FLUX_MSGHANDLER_TABLE_END,
};
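/* Module entry point: register message handlers for this node's role, open a
 * handle to the parent instance (internal/leaf), and run the reactor. */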
int mod_main (flux_t h, int argc, char **argv)
{
int rc = -1;
uint32_t rank = 1;
ctx_t *ctx = getctx (h);
if (flux_get_rank (h, &rank)) {
flux_log (h, LOG_ERR, "failed to determine rank");
goto done;
} else if (process_args (argc, argv, ctx) != 0) {
flux_log (h, LOG_ERR, "can't process module args");
goto done;
}
flux_log (h, LOG_DEBUG, "offload module is running on rank %d", rank);
if (ctx->root_node) {
flux_event_subscribe(h, "no_new_jobs");
if (flux_msg_handler_addvec (h, root_htab, (void *)ctx) < 0) {
flux_log (h, LOG_ERR, "flux_msg_handler_addvec: %s", strerror (errno));
goto done;
}
} else if (ctx->internal_node) {
if (flux_msg_handler_addvec (h, internal_htab, (void *)ctx) < 0) {
flux_log (h, LOG_ERR, "flux_msg_handler_addvec: %s", strerror (errno));
goto done;
}
}
if (ctx->leaf_node || ctx->internal_node) {
flux_log (h, LOG_DEBUG, "opening handle to parent");
ctx->parent_h = flux_open(flux_attr_get(ctx->h, "parent-uri", NULL), FLUX_O_TRACE);
if (!ctx->parent_h) {
flux_log (h, LOG_ERR, "flux_open of parent-uri failed: %s", strerror (errno));
goto done;
}
flux_set_reactor(ctx->parent_h, flux_get_reactor(ctx->h));
request_work_from_parent (ctx);
}
if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
flux_log (h, LOG_ERR, "flux_reactor_run: %s", strerror (errno));
goto done;
}
rc = 0;
done:
return rc;
}
MOD_NAME ("offload");
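For context, work enters the root instance's offload module through "offload.new_job" requests; the uq_controller.py spawned by initial_program.py (not included in this gist) presumably plays that role. A minimal, hypothetical sketch of such a producer, run from within the root instance and reusing the job-spec fields that initial_program.py builds:

import os
import flux

h = flux.Flux()
job_spec = {
    'nnodes': 1,
    'ntasks': 1,
    'cmdline': ['hostname'],
    'environ': dict(os.environ),
    'cwd': os.getcwd(),
    'walltime': 60 * 60,
}
# new_job_cb enqueues the spec and acknowledges it with a placeholder jobid
resp = h.rpc_send('offload.new_job', job_spec)
print "offload.new_job acknowledged jobid:", resp['jobid']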