Last active
December 7, 2017 22:26
-
-
Save SteVwonder/a01ebf54fee1be74ecc9520465838c98 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"levels": [ | |
{ | |
"cores_per_child": 36, | |
"num_children": 8 | |
}, | |
{ | |
"cores_per_child": 1, | |
"num_children": 36 | |
} | |
], | |
"num_levels": 3 | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import csv | |
import time | |
import json | |
import os | |
import sys | |
import argparse | |
import subprocess | |
import socket | |
import syslog | |
import math | |
import re | |
import psutil | |
import flux | |
from flux import kvs, jsc | |
from flux.rpc import RPC | |
from flux.core.inner import ffi, raw | |
# Environment variable prefixes that must not leak into child instances.
ENV_FILTER = re.compile(r"^(SLURM_|FLUX_)")

def get_environment():
    """Build the environment dict handed to a child Flux instance.

    Strips scheduler-specific (SLURM_*/FLUX_*) and host-identifying
    variables, then re-adds the few Flux settings children still need.
    """
    env = {key: value for key, value in os.environ.items()
           if not ENV_FILTER.match(key)}
    for unwanted in ('HOSTNAME', 'ENVIRONMENT'):
        env.pop(unwanted, None)
    # Make MVAPICH behave...
    env['MPIRUN_RSH_LAUNCH'] = '1'
    # Pass some Flux variables through despite the FLUX_ filter above
    env['FLUX_MODULE_PATH'] = os.environ['FLUX_MODULE_PATH']
    if 'FLUX_SCHED_RC_NOOP' in os.environ:
        env['FLUX_SCHED_RC_NOOP'] = os.environ['FLUX_SCHED_RC_NOOP']
    return env
def build_cmd(args):
    """Build the argv used to launch a child Flux instance.

    The command is 'flux start [broker_opts] initial_program.py <flags...>',
    where initial_program.py is expected to live next to this script.

    Raises AssertionError if initial_program.py is missing.
    """
    init_prog_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'initial_program.py')
    print("Initial program path: {}".format(init_prog_path))
    assert os.path.isfile(init_prog_path)
    flux_cmd = ['flux', 'start']
    broker_opts = ""
    init_prog_cmd = [init_prog_path]
    if args.log_dir:
        init_prog_cmd.extend(['--log_dir', args.log_dir])
    if args.persist_dir:
        broker_opts = '-o,-Spersist-filesystem={}'.format(args.persist_dir)
        # NOTE(review): reconstructed nesting — log-forward-level is only
        # appended when persist_dir is set, since broker_opts would
        # otherwise start with a stray comma.  Confirm against original.
        if args.light_logging:
            broker_opts += ",-Slog-forward-level=6"
        init_prog_cmd.extend(['--persist_dir', args.persist_dir])
    if args.results:
        init_prog_cmd.extend(['--results', args.results])
    if args.sched_plugin:
        init_prog_cmd.extend(['--sched_plugin', args.sched_plugin])
    if args.light_logging:
        init_prog_cmd.append('--light_logging')
    if args.verbose:
        init_prog_cmd.append('--verbose')
    init_prog_cmd.extend([args.child_config, '{}'.format(args.num_levels),
                          'child', '{}'.format(args.level + 1),
                          str(args.full_jobid)])
    # Bug fix: the original appended broker_opts unconditionally, handing
    # 'flux start' a bogus empty-string argument when no options were set.
    if broker_opts:
        flux_cmd.append(broker_opts)
    return flux_cmd + init_prog_cmd
def child_spec_generator(args):
    """Yield one job-spec dict per child instance this level must spawn.

    Reads the hierarchy description from args.child_config.  Yields nothing
    when the hierarchy has a single level (a lone scheduler has no children).
    """
    child_cmd = build_cmd(args)
    print("Full CMD: {}".format(child_cmd))
    with open(args.child_config, 'r') as config_file:
        json_obj = json.load(config_file)
    assert args.level <= json_obj['num_levels']
    if json_obj['num_levels'] == 1:
        return
    my_childrens_info = json_obj['levels'][args.level - 1]
    ncpus = my_childrens_info['cores_per_child']
    # Hoisted out of the loop: the node count depends only on ncpus and the
    # physical core count, so it is identical for every child.
    nnodes = int(math.ceil(ncpus / float(psutil.cpu_count(logical=False))))
    # range (not py2-only xrange) works identically on Python 2 and 3 here.
    for _ in range(my_childrens_info['num_children']):
        job_spec = {
            'nnodes': nnodes,
            'ntasks': ncpus,
            'cmdline': child_cmd,
            'environ': get_environment(),
            'cwd': os.getcwd(),
            'walltime': 60 * 60 * 24,
        }
        print("Creating childinstance with {} cpus and {} nodes".format(ncpus, nnodes))
        yield job_spec
def submit_job(flux_handle, job_json_str):
    """Submit a job to the local scheduler via the 'job.submit' RPC.

    Returns the jobid assigned by the scheduler (previously discarded).
    Raises RuntimeError if the RPC returns nothing or reports an errnum.
    """
    resp = flux_handle.rpc_send('job.submit', job_json_str)
    if resp is None:
        raise RuntimeError("RPC response invalid")
    if resp.get('errnum', None) is not None:
        raise RuntimeError("Job creation failed with error code {}".format(
            resp['errnum']))
    job_id = resp['jobid']
    print("Just submitted job #{}".format(job_id))
    return job_id
def load_sched_module(args):
    """Load the sched module into the enclosing flux instance.

    Builds a 'flux module load sched' command from the relevant CLI options
    and shells out to it, echoing any output.  (Print statements converted
    to the cross-version print() form.)
    """
    load_cmd = ['flux', 'module', 'load', 'sched']
    if args.results:
        load_cmd.append("resultsfolder={}".format(args.results))
    if args.sched_plugin:
        load_cmd.append("plugin={}".format(args.sched_plugin))
    if args.verbose:
        load_cmd.append('verbosity={}'.format(1))
    print("Loading sched module: {}".format(load_cmd))
    output = subprocess.check_output(load_cmd)
    if len(output) > 0:
        print(output)
def load_offload_module(args):
    """Load the offload module into the enclosing flux instance.

    Tags the module with this instance's position in the hierarchy
    (root/internal/leaf) and the total level count.

    Raises RuntimeError if the position flags are inconsistent.
    """
    load_cmd = ['flux', 'module', 'load', 'offload',
                "num_levels={}".format(args.num_levels)]
    if args.root:
        load_cmd.append("root=1")
    elif args.internal:
        load_cmd.append("internal=1")
    elif args.leaf:
        load_cmd.append("leaf=1")
    else:
        raise RuntimeError("Unknown position within hierarchy")
    print("Loading offload module: {}".format(load_cmd))
    output = subprocess.check_output(load_cmd)
    if len(output) > 0:
        print(output)
def load_modules(args):
    ''' Loads all of the necessary modules into the enclosing flux instance '''
    # Children load their own sched module; the offload module is loaded at
    # every position in the hierarchy.
    # NOTE(review): the root's sched module is presumably loaded elsewhere
    # (e.g. by an rc script) — confirm before relying on this.
    if args.child:
        load_sched_module(args)
    load_offload_module(args)
# Module-level job-state bookkeeping shared by the JSC callback (get_jsc_cb)
# and the event callbacks below.  Jobids move pending -> running -> completed.
pending_jobs = []
running_jobs = []
completed_jobs = []
def get_jsc_cb(outstream):
    """Build the JSC (job status and control) notification callback.

    outstream: an open file-like object to receive one CSV accounting row
    per completed job, or a falsy value to disable CSV output.  The returned
    closure tracks job state transitions in the module-level pending/
    running/completed lists and stops the reactor once all work is done.
    """
    fieldnames = ['id', 'nnodes', 'ntasks', 'starting-time', 'complete-time',
                  'walltime', 'is_hierarchical']
    if outstream:
        writer = csv.DictWriter(outstream, fieldnames)
        writer.writeheader()
    else:
        writer = None
    def jsc_cb(jcb_str, arg, errnum):
        # arg is the (flux_handle, parsed_args) tuple registered with
        # jsc.notify_status() in main().
        (flux_handle, args) = arg
        jcb = json.loads(jcb_str)
        nstate = jcb[jsc.JSC_STATE_PAIR][jsc.JSC_STATE_PAIR_NSTATE]
        jobid = jcb['jobid']
        value = jsc.job_num2state(nstate)
        print "JSC Event - jobid: {}, value: {}".format(jobid, value)
        # Track the lifecycle: submitted -> running -> complete.
        if value == 'submitted':
            pending_jobs.append(jobid)
        elif value == 'running':
            pending_jobs.remove(jobid)
            running_jobs.append(jobid)
        elif value == 'complete':
            print "Job completed: {}".format(jobid)
            running_jobs.remove(jobid)
            completed_jobs.append(jobid)
            jobdir_key = 'lwj.{}'.format(jobid)
            complete_key = '{}.complete-time'.format(jobdir_key)
            print "Looking for kvs entry {}, since job {} completed".format(complete_key, jobid)
            # Busy-wait for the KVS entry: complete-time is written
            # asynchronously after the state transition fires.
            while not kvs.exists(flux_handle, complete_key):
                print "{} kvs entry not found, waiting for it to be created".format(complete_key)
                time.sleep(1)
            job_kvs = kvs.get_dir(flux_handle, jobdir_key)
            rowdict = {}
            for key in fieldnames:
                try:
                    rowdict[key] = job_kvs[key]
                except KeyError:
                    # Not every field exists for every job; leave it out.
                    pass
            rowdict['id'] = jobid
            if writer:
                writer.writerow(rowdict)
        # Shutdown check: once something has completed and nothing is
        # pending or running, this instance may stop its reactor.
        if len(completed_jobs) > 0 and len(running_jobs) == 0 and len(pending_jobs) == 0:
            if args.child:
                # Children also wait for the init_prog.exit event (see
                # exit_event_cb / should_exit_when_done).
                if should_exit_when_done:
                    print "All children are dead, and I already received the exit event, exiting"
                    flux_handle.reactor_stop(flux_handle.get_reactor())
                else:
                    print "All children are dead, but I haven't received the exit event"
            elif args.root:
                print "All children are dead, exiting"
                flux_handle.reactor_stop(flux_handle.get_reactor())
    return jsc_cb
def no_new_jobs_cb(flux_handle, watcher, msg, args):
    """Event callback for 'no_new_jobs': timestamp the event.

    args is the time_dict registered with msg_watcher_create() in main();
    records when the workload announced that no further jobs will arrive.
    (Print statement converted to the cross-version print() form.)
    """
    print("Received an event that there are no new jobs")
    time_dict = args
    time_dict['no_new_jobs_event'] = time.time()
# Set by exit_event_cb when the exit event arrives while jobs are still
# outstanding; the JSC callback consults it when the last job finishes.
should_exit_when_done = False
def exit_event_cb(flux_handle, watcher, msg, args):
    """Handle the 'init_prog.exit' event broadcast by the offload module.

    Stops the reactor immediately if all jobs are done (or none ever ran on
    a leaf); otherwise arms should_exit_when_done so the JSC callback can
    exit once the final job completes.
    """
    global should_exit_when_done
    print "Received an exit event"
    if len(running_jobs) == 0 and len(pending_jobs) == 0:
        # A leaf may legitimately exit having run no jobs at all.
        if len(completed_jobs) > 0 or (len(completed_jobs) == 0 and args.leaf):
            print "Received an event that I should exit and all jobs are done, exiting"
            flux_handle.reactor_stop(flux_handle.get_reactor())
        else:
            # NOTE(review): "not jobs" below is a typo for "no jobs".
            print "Received an event that I should exit but not jobs have run yet, waiting"
            should_exit_when_done = True
    else:
        print "Received an event that I should exit, but jobs still running/pending, waiting"
        should_exit_when_done = True
def event_reactor_proc(flux_handle):
    """Run the broker's event reactor until reactor_stop() is called.

    Raises a fatal error on the handle if the reactor loop fails.
    (Print statements converted to the cross-version print() form.)
    """
    print("initial_program.py: Starting event reactor")
    if flux_handle.reactor_run(flux_handle.get_reactor(), 0) < 0:
        flux_handle.fatal_error("event_reactor_proc", "reactor run failed")
    print("initial_program.py: Event reactor exited")
class Tee(object):
    '''
    Allows for printing to a file and flux's dmesg buffer simultaneously.
    Modeled after the Unix 'tee' command.
    '''
    def __init__(self, name, mode, buffering=None, flux_handle=None):
        # Bug fix: open() rejects buffering=None (it must be an int), so
        # the default-argument path previously raised TypeError.
        if buffering is None:
            self.file = open(name, mode)
        else:
            self.file = open(name, mode, buffering)
        if buffering:
            # Re-open stdout with the caller's buffering so interleaved
            # writes flush predictably.
            self.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering)
        else:
            self.stdout = sys.stdout
        self.flux_handle = flux_handle
        # Redirect all subsequent prints through this object.
        sys.stdout = self
    def __del__(self):
        # Restore the original stdout and release the log file.
        sys.stdout = self.stdout
        self.file.close()
    def write(self, data):
        # NOTE(review): output goes to the file and (optionally) the Flux
        # dmesg log, but is NOT echoed to the saved stdout — self.stdout is
        # kept only for restoration in __del__.  Confirm this is intended.
        self.file.write(data)
        if self.flux_handle:
            new_data = data.strip()
            if len(new_data) > 0:
                self.flux_handle.log(syslog.LOG_DEBUG, new_data)
    def flush(self):
        self.file.flush()
def setup_logging(args, flux_handle=None):
    '''
    Replace sys.stdout with an instance of Tee
    Also set the enclosing broker to write out its logs to a file
    '''
    if args.log_dir:
        # One log file per init-program process, named by jobid and pid.
        filename = os.path.join(args.log_dir, "{}-initprog-pid{}.out".format(args.full_jobid, os.getpid()))
        # buffering=0 gives unbuffered text output (Python 2 only — Python 3
        # forbids unbuffered text mode).  The Tee instance installs itself
        # as sys.stdout; the object reference is intentionally dropped.
        Tee(filename, 'w', buffering=0, flux_handle=flux_handle)
        if flux_handle:
            flux_handle.log_set_appname("init_prog")
    else:
        # No log dir: just make the real stdout unbuffered.
        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
    if args.log_dir:
        # Also direct the enclosing broker's own log stream to a file.
        log_filename = os.path.join(args.log_dir, "{}-broker.out".format(args.full_jobid))
        setattr_cmd = ['flux', 'setattr', 'log-filename', log_filename]
        subprocess.check_output(setattr_cmd)
def main():
    """Entry point: parse arguments, load modules, spawn children, and run
    the event reactor until this level of the hierarchy is finished."""
    print "Argv: ", sys.argv
    parser = argparse.ArgumentParser()
    parser.add_argument('child_config')
    # can be calculated from child_config, but i'm in a hurry, this is easier
    parser.add_argument('num_levels', type=int)
    parser.add_argument('--results', '-o',
                        help="directory to store the results in")
    parser.add_argument('--light_logging', action='store_true', help='only log "info" to filesystem and only print "errors" to stderr')
    parser.add_argument('--log_dir', help="log stdout to a file (arg = directory)")
    parser.add_argument('--persist_dir', help="directory to store flux's sqlite content db (arg = directory)")
    parser.add_argument('--sched_plugin', help="which sched plugin to use")
    parser.add_argument('--verbose', '-v', action='store_true')
    # Two sub-commands select this instance's role: 'root' (level 1) or
    # 'child' (level > 1, launched recursively by build_cmd()).
    subparsers = parser.add_subparsers()
    root_parser = subparsers.add_parser('root')
    root_parser.set_defaults(root=True, child=False,
                             prefix=None, level=1)
    child_parser = subparsers.add_parser('child')
    child_parser.set_defaults(child=True, root=False)
    child_parser.add_argument('level', type=int, help="what level in the hierarchy (should be > 1)")
    child_parser.add_argument('prefix',
                              help="name prefix, not including the id of "
                              "this job (e.g. 4.2.5)")
    args = parser.parse_args()
    # Derive position flags and a hierarchical jobid like "0.3.7".
    if args.root:
        args.local_jobid = "0"
        args.full_jobid = "0"
        args.internal = False
        args.leaf = False
    else:
        assert args.level > 1
        assert args.level <= args.num_levels
        if args.level < args.num_levels:
            args.internal = True
            args.leaf = False
        elif args.level == args.num_levels:
            args.leaf = True
            args.internal = False
        args.local_jobid = str(int(os.environ['FLUX_JOB_ID']))
        args.full_jobid = "{}.{}".format(args.prefix, args.local_jobid)
    # Sanity-check inputs before doing anything expensive.
    assert os.path.isfile(args.child_config)
    if args.root:
        assert os.path.isfile(args.child_config)
    if args.results:
        assert os.path.isdir(args.results)
    if args.child and args.prefix:
        for prefix_id in args.prefix.split('.'):
            assert prefix_id.isdigit()
    flux_handle = flux.Flux()
    setup_logging(args, flux_handle)
    if args.child:
        # Children exit when the offload module broadcasts init_prog.exit.
        print "Subscribing to init_prog.exit"
        flux_handle.event_subscribe("init_prog.exit")
        exit_event_watcher = flux_handle.msg_watcher_create(exit_event_cb,
                                                           type_mask=raw.FLUX_MSGTYPE_EVENT,
                                                           topic_glob="init_prog.exit",
                                                           args=args)
        exit_event_watcher.start()
    print "Loading modules"
    load_modules(args)
    print "Registering callback with JSC"
    if args.results:
        outfilename = os.path.join(args.results, "job-{}".format(args.full_jobid))
        outfile = open(outfilename, 'wb', 0)
    # NOTE(review): 'outfile' is only bound when --results is given, so the
    # next line (and the close at the end) raise NameError without it —
    # this script appears to assume --results is always supplied.
    jsc.notify_status(flux_handle, get_jsc_cb(outfile), (flux_handle, args))
    time_dict = {'init_prog_start' : time.time()}
    if args.root or args.internal:
        # NOTE(review): child_spec yields a dict while submit_job's
        # parameter is named job_json_str — presumably the Python bindings
        # accept a dict payload; verify.
        for child_spec in child_spec_generator(args):
            submit_job(flux_handle, child_spec)
    if args.root:
        # The root also watches for the workload's no_new_jobs event and
        # launches the UQ pipeline driver as a detached process.
        flux_handle.event_subscribe("no_new_jobs")
        no_new_jobs_watcher = flux_handle.msg_watcher_create(no_new_jobs_cb,
                                                            type_mask=raw.FLUX_MSGTYPE_EVENT,
                                                            topic_glob="no_new_jobs",
                                                            args=time_dict)
        no_new_jobs_watcher.start()
        uq_controller_path = os.path.join(os.environ['HOME'], 'Repositories/UQP/uq_controller.py')
        uqp_pid = os.spawnvp(os.P_NOWAIT, "python", ['python', uq_controller_path, 'problem'])
    if args.root:
        print "Root node within hierarchy"
    elif args.leaf:
        print "Leaf node within hierarchy"
    elif args.internal:
        print "Internal node within hierarchy"
    # Block here until the JSC/exit callbacks stop the reactor.
    event_reactor_proc(flux_handle)
    if args.root:
        # Timing data is written twice: once when all jobs finish and again
        # after the UQ pipeline process has been reaped.
        time_dict['init_prog_end'] = time.time()
        if args.results:
            with open(os.path.join(args.results, "init_prog.timer"), 'w') as time_outfile:
                json.dump(time_dict, time_outfile)
        os.waitpid(uqp_pid, 0)
        time_dict['uqp_end'] = time.time()
        if args.results:
            with open(os.path.join(args.results, "init_prog.timer"), 'w') as time_outfile:
                json.dump(time_dict, time_outfile)
    if outfile:
        outfile.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <libgen.h> | |
#include <errno.h> | |
#include <libgen.h> | |
#include <czmq.h> | |
#include <dlfcn.h> | |
#include <stdbool.h> | |
#include <flux/core.h> | |
#include "src/common/libutil/log.h" | |
#include "src/common/libutil/shortjson.h" | |
#include "src/common/libutil/xzmalloc.h" | |
/* Per-module state for the offload module, stored in the handle aux hash. */
typedef struct {
    flux_t h;                        /* handle to the local broker */
    flux_t parent_h;                 /* handle to the parent instance (leaf/internal only) */
    zlist_t *job_queue;              /* queued job-spec strings (strdup'd JSON) */
    zlist_t *request_for_work_queue; /* parked work-request messages from children */
    bool root_node;                  /* this instance is the hierarchy root */
    bool leaf_node;                  /* this instance is a leaf */
    bool internal_node;              /* this instance is an interior node */
    bool child_should_exit;          /* parent reported that no more work is coming */
    bool no_new_work;                /* no further jobs will be enqueued here */
    int num_levels;                  /* total levels in the hierarchy (-1 = unset) */
} ctx_t;
/* Forward declaration: used by the RPC continuation callback below. */
int request_work_from_parent (ctx_t *ctx);
static void freectx (void *arg) | |
{ | |
ctx_t *ctx = arg; | |
zlist_destroy(&ctx->job_queue); | |
zlist_destroy(&ctx->request_for_work_queue); | |
free (ctx); | |
} | |
/* Fetch (or lazily create) the module context attached to handle h. */
static ctx_t *getctx (flux_t h)
{
    ctx_t *ctx = (ctx_t *)flux_aux_get (h, "offload");
    if (!ctx) {
        /* xzmalloc (already included via xzmalloc.h) zeroes the block and
         * calls oom() on failure, unlike the previous unchecked malloc. */
        ctx = xzmalloc (sizeof (*ctx));
        ctx->h = h;
        ctx->root_node = false;
        ctx->leaf_node = false;
        ctx->internal_node = false;
        ctx->child_should_exit = false;
        ctx->no_new_work = false;
        ctx->num_levels = -1;
        ctx->job_queue = zlist_new ();
        ctx->request_for_work_queue = zlist_new ();
        flux_aux_set (h, "offload", ctx, freectx);
    }
    return ctx;
}
/* Parse module arguments of the form key=value into ctx.
 * Returns 0 on success, -1 (with errno/logging) on bad or missing args. */
int process_args (int argc, char **argv, ctx_t *ctx)
{
    int i = 0;
    for (i = 0; i < argc; i++) {
        /* Compare the full "key=" prefix; sizeof(s)-1 excludes the NUL.
         * Bug fix: the original reused sizeof("internal") as the length
         * for "num_levels=", comparing only the first 9 characters. */
        if (!strncmp ("root=", argv[i], sizeof ("root=") - 1)) {
            ctx->root_node = atoi (strstr (argv[i], "=") + 1);
        } else if (!strncmp ("leaf=", argv[i], sizeof ("leaf=") - 1)) {
            ctx->leaf_node = atoi (strstr (argv[i], "=") + 1);
        } else if (!strncmp ("internal=", argv[i], sizeof ("internal=") - 1)) {
            ctx->internal_node = atoi (strstr (argv[i], "=") + 1);
        } else if (!strncmp ("num_levels=", argv[i], sizeof ("num_levels=") - 1)) {
            ctx->num_levels = atoi (strstr (argv[i], "=") + 1);
        } else {
            errno = EINVAL;
            return -1;
        }
    }
    /* Exactly one hierarchy position must be selected. */
    if (!(ctx->leaf_node || ctx->root_node || ctx->internal_node)) {
        flux_log (ctx->h, LOG_ERR,
                  "%s: position within hierarchy not specified", __FUNCTION__);
        return -1;
    }
    if ((ctx->leaf_node + ctx->root_node + ctx->internal_node) > 1) {
        flux_log (ctx->h, LOG_ERR,
                  "%s: too many positions within hierarchy specified", __FUNCTION__);
        return -1;
    }
    if (ctx->leaf_node) {
        flux_log (ctx->h, LOG_DEBUG,
                  "%s: leaf node within hierarchy", __FUNCTION__);
    } else if (ctx->root_node) {
        flux_log (ctx->h, LOG_DEBUG,
                  "%s: root node within hierarchy", __FUNCTION__);
    } else if (ctx->internal_node) {
        flux_log (ctx->h, LOG_DEBUG,
                  "%s: internal node within hierarchy", __FUNCTION__);
    }
    return 0;
}
/* Submit a job spec to the local scheduler via the job.submit RPC.
 * Bug fix: the original read the response into the *request* string
 * parameter, left job_submit_response unused, and ignored all errors. */
static void submit_job (flux_t h, const char *job_spec_str)
{
    flux_rpc_t *rpc;
    const char *job_submit_response = NULL;

    flux_log (h, LOG_DEBUG, "%s: sending job to local scheduler", __FUNCTION__);
    if (!(rpc = flux_rpc (h, "job.submit", job_spec_str, FLUX_NODEID_ANY, 0))) {
        flux_log (h, LOG_ERR, "%s: flux_rpc: %s", __FUNCTION__, strerror (errno));
        return;
    }
    if (flux_rpc_get (rpc, NULL, &job_submit_response) < 0)
        flux_log (h, LOG_ERR, "%s: flux_rpc_get: %s", __FUNCTION__, strerror (errno));
    flux_rpc_destroy (rpc);
}
/* Answer a work request with job_spec_str as payload, then release it.
 * Ownership of job_spec_str transfers to this function. */
static void respond_with_job_and_free (flux_t h, const flux_msg_t *msg, char *job_spec_str)
{
    int rc;

    flux_log (h, LOG_DEBUG, "%s: responding with a job and then free'ing the job spec", __FUNCTION__);
    rc = flux_respond (h, msg, 0, job_spec_str);
    if (rc < 0)
        flux_log (h, LOG_ERR, "%s: flux_respond", __FUNCTION__);
    free (job_spec_str);
}
/* Request handler (offload.new_job): enqueue an incoming job spec. */
static void new_job_cb (flux_t h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
    ctx_t *ctx = arg;
    const char *job_spec_str = NULL;
    char *job_copy;

    if (flux_msg_get_payload_json (msg, &job_spec_str) < 0) {
        flux_log (h, LOG_ERR, "%s: bad message", __FUNCTION__);
        return;
    }
    flux_log (h, LOG_DEBUG, "%s: received a new job, enqueuing", __FUNCTION__);
    /* Bug fix: strdup was unchecked; a NULL would poison the queue. */
    if (!(job_copy = strdup (job_spec_str))) {
        flux_log (h, LOG_ERR, "%s: out of memory", __FUNCTION__);
        return;
    }
    zlist_append (ctx->job_queue, job_copy);
    flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
    /* NOTE(review): the jobid here is a placeholder — the real id is
     * assigned when the spec reaches a local scheduler.
     * Consistency fix: check flux_respond like the sibling handlers do. */
    if (flux_respond (h, msg, 0, "{\"jobid\":1}") < 0)
        flux_log (h, LOG_ERR, "%s: flux_respond", __FUNCTION__);
}
/* Tell a requesting child that no work remains; fixed JSON flag payload. */
void send_no_more_work_response (flux_t h, const flux_msg_t *msg)
{
    static const char job_spec_str[] = "{\"no_more_work\":\"true\"}";

    if (flux_respond (h, msg, 0, job_spec_str) < 0)
        flux_log (h, LOG_ERR, "%s: flux_respond", __FUNCTION__);
}
/* Request handler (offload.need_job): satisfy a child's work request,
 * either immediately, with a "no more work" notice, or by parking the
 * request until all submissions have arrived. */
static void child_needs_a_job_cb (flux_t h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
    ctx_t *ctx = arg;

    flux_log (h, LOG_DEBUG, "%s: received a request for work", __FUNCTION__);
    if (!ctx->no_new_work) {
        /* More jobs may still be submitted; park the request for later. */
        flux_log (h, LOG_DEBUG, "%s: jobs not all submitted yet, queueing work request", __FUNCTION__);
        zlist_append (ctx->request_for_work_queue, flux_msg_copy (msg, false));
        return;
    }
    if (zlist_size (ctx->job_queue) == 0) {
        flux_log (h, LOG_DEBUG, "%s: no outstanding jobs for running, killing child", __FUNCTION__);
        send_no_more_work_response (h, msg);
        return;
    }
    flux_log (h, LOG_DEBUG, "%s: Jobs are queued, responding immediately to request for work", __FUNCTION__);
    respond_with_job_and_free (h, msg, zlist_pop (ctx->job_queue));
    flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
}
/* Answer every parked work request: hand out queued job specs while they
 * last, then tell the remaining requesters there is no more work.
 *
 * Bug fix: the original iterated with zlist_first/zlist_next while
 * destroying each message, leaving dangling pointers in the list (and the
 * list non-empty).  Popping removes each entry before it is destroyed. */
static void handle_all_queued_work_requests (ctx_t *ctx) {
    flux_t h = ctx->h;
    flux_msg_t *curr_work_req_msg;

    while ((curr_work_req_msg = zlist_pop (ctx->request_for_work_queue))) {
        char *job_spec_str = zlist_pop (ctx->job_queue);
        if (job_spec_str) {
            respond_with_job_and_free (h, curr_work_req_msg, job_spec_str);
            flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
        } else {
            send_no_more_work_response (h, curr_work_req_msg);
        }
        flux_msg_destroy (curr_work_req_msg);
    }
}
/* Event handler (no_new_jobs, root only): flush all queued work.  With a
 * single-level hierarchy everything goes to the local scheduler; otherwise
 * the parked child work requests are satisfied. */
static void no_new_jobs_cb (flux_t h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
    ctx_t *ctx = arg;
    flux_log (h, LOG_DEBUG, "%s: received a message that no new jobs will be submitted", __FUNCTION__);
    /* Bug fix: the original called assert() without including <assert.h>
     * (a compile error); an explicit check also avoids aborting the whole
     * broker on a stray event. */
    if (!ctx->root_node) {
        flux_log (h, LOG_ERR, "%s: received no_new_jobs on a non-root node", __FUNCTION__);
        return;
    }
    ctx->no_new_work = true;
    if (ctx->num_levels == 1) { //we are the only scheduler
        flux_log (h, LOG_DEBUG, "%s: the root sched is the only sched, submitting all jobs to the local scheduler", __FUNCTION__);
        char *job_spec_str = NULL;
        for (job_spec_str = zlist_pop (ctx->job_queue);
             job_spec_str;
             job_spec_str = zlist_pop (ctx->job_queue))
        {
            submit_job (h, job_spec_str);
            flux_log (h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
            free (job_spec_str);
        }
    } else {
        handle_all_queued_work_requests(ctx);
    }
}
/* RPC continuation for offload.need_job: process the parent's response.
 * Either enqueue/submit the new job spec and ask for more, or broadcast
 * the init_prog.exit event when the parent reports no more work. */
static void child_recv_work_cb (flux_rpc_t *rpc, void *arg)
{
    ctx_t *ctx = arg;
    const char *job_spec_str = NULL;

    flux_log (ctx->h, LOG_DEBUG, "%s: received work request response from parent, calling rpc_get", __FUNCTION__);
    /* Bug fix: flux_rpc_get was unchecked — a failed RPC handed a NULL
     * payload straight to Jfromstr(). */
    if (flux_rpc_get (rpc, NULL, &job_spec_str) < 0 || !job_spec_str) {
        flux_log (ctx->h, LOG_ERR, "%s: flux_rpc_get failed", __FUNCTION__);
        flux_rpc_destroy (rpc);
        return;
    }
    flux_log (ctx->h, LOG_DEBUG, "%s: successfully ran rpc_get", __FUNCTION__);
    JSON response = Jfromstr (job_spec_str);
    bool no_more_work = false;
    if (!Jget_bool (response, "no_more_work", &no_more_work)) {
        // key doesn't exist, so its probably a job spec
        flux_log (ctx->h, LOG_DEBUG, "%s: no_more_work not in response, probably a job spec", __FUNCTION__);
        no_more_work = false;
    }
    Jput (response);
    if (no_more_work) {
        flux_log (ctx->h, LOG_DEBUG, "%s: parent has informed us there will be no more work", __FUNCTION__);
        if (ctx->internal_node) {
            handle_all_queued_work_requests(ctx);
        }
        ctx->no_new_work = true;
        ctx->child_should_exit = true;
        flux_msg_t *exit_event = flux_event_encode("init_prog.exit", NULL);
        flux_send (ctx->h, exit_event, 0);
        flux_msg_destroy (exit_event);  /* bug fix: event message was leaked */
    } else {
        if (ctx->internal_node) {
            flux_log (ctx->h, LOG_DEBUG, "%s: internal node received new work, enqueueing ", __FUNCTION__);
            zlist_append (ctx->job_queue, strdup (job_spec_str));
            flux_log (ctx->h, LOG_DEBUG, "%s: %zu jobs are now in the job_queue", __FUNCTION__, zlist_size (ctx->job_queue));
        } else if (ctx->leaf_node) {
            flux_log (ctx->h, LOG_DEBUG, "%s: leaf node received new work, submitting ", __FUNCTION__);
            submit_job (ctx->h, job_spec_str);
        }
        flux_log (ctx->h, LOG_DEBUG, "%s: requesting more work from parent", __FUNCTION__);
        request_work_from_parent (ctx);
    }
    /* Bug fix: the completed RPC was leaked.  Destroy it last, since
     * job_spec_str points into the RPC's payload buffer. */
    flux_rpc_destroy (rpc);
}
/* Ask the parent instance for a job; child_recv_work_cb handles the
 * response asynchronously.  Returns 0 on success, -1 on failure. */
int request_work_from_parent (ctx_t *ctx)
{
    flux_rpc_t *rpc;

    flux_log (ctx->h, LOG_DEBUG, "requesting work from parent");
    /* Bug fix: flux_rpc can return NULL; that and a failed flux_rpc_then
     * previously leaked the rpc / dereferenced NULL. */
    if (!(rpc = flux_rpc (ctx->parent_h, "offload.need_job", NULL, FLUX_NODEID_ANY, 0))) {
        flux_log (ctx->h, LOG_ERR, "%s: flux_rpc", __FUNCTION__);
        return -1;
    }
    if (flux_rpc_then (rpc, child_recv_work_cb, ctx) < 0) {
        flux_log (ctx->h, LOG_ERR, "%s: rpc_then", __FUNCTION__);
        flux_rpc_destroy (rpc);
        return -1;
    }
    return 0;
}
/* Message handlers for the root: accepts new jobs from the workload,
 * work requests from children, and the no_new_jobs event. */
struct flux_msg_handler_spec root_htab[] = {
    {FLUX_MSGTYPE_REQUEST, "offload.new_job", new_job_cb},
    {FLUX_MSGTYPE_REQUEST, "offload.need_job", child_needs_a_job_cb},
    {FLUX_MSGTYPE_EVENT, "no_new_jobs", no_new_jobs_cb},
    FLUX_MSGHANDLER_TABLE_END,
};
/* Internal nodes relay jobs downward but never receive no_new_jobs. */
struct flux_msg_handler_spec internal_htab[] = {
    {FLUX_MSGTYPE_REQUEST, "offload.new_job", new_job_cb},
    {FLUX_MSGTYPE_REQUEST, "offload.need_job", child_needs_a_job_cb},
    FLUX_MSGHANDLER_TABLE_END,
};
int mod_main (flux_t h, int argc, char **argv) | |
{ | |
int rc = -1; | |
uint32_t rank = 1; | |
ctx_t *ctx = getctx (h); | |
if (flux_get_rank (h, &rank)) { | |
flux_log (h, LOG_ERR, "failed to determine rank"); | |
goto done; | |
} else if (process_args (argc, argv, ctx) != 0) { | |
flux_log (h, LOG_ERR, "can't process module args"); | |
goto done; | |
} | |
flux_log (h, LOG_DEBUG, "offload module is running on rank %d", rank); | |
if (ctx->root_node) { | |
flux_event_subscribe(h, "no_new_jobs"); | |
if (flux_msg_handler_addvec (h, root_htab, (void *)ctx) < 0) { | |
flux_log (h, LOG_ERR, "flux_msg_handler_addvec: %s", strerror (errno)); | |
goto done; | |
} | |
} else if (ctx->internal_node) { | |
if (flux_msg_handler_addvec (h, internal_htab, (void *)ctx) < 0) { | |
flux_log (h, LOG_ERR, "flux_msg_handler_addvec: %s", strerror (errno)); | |
goto done; | |
} | |
} | |
if (ctx->leaf_node || ctx->internal_node) { | |
flux_log (h, LOG_DEBUG, "opening handle to parent"); | |
ctx->parent_h = flux_open(flux_attr_get(ctx->h, "parent-uri", NULL), FLUX_O_TRACE); | |
flux_set_reactor(ctx->parent_h, flux_get_reactor(ctx->h)); | |
request_work_from_parent (ctx); | |
} | |
if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { | |
flux_log (h, LOG_ERR, "flux_reactor_run: %s", strerror (errno)); | |
goto done; | |
} | |
rc = 0; | |
done: | |
return rc; | |
} | |
MOD_NAME ("offload"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment