Created
August 25, 2018 00:41
-
-
Save keisukefukuda/5ab5b36d63cdb8a5acdd7428cd380f2a to your computer and use it in GitHub Desktop.
Modified uct_hello_world.c
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED. | |
* See file LICENSE for terms. | |
*/ | |
#include <uct/api/uct.h> | |
#include <assert.h> | |
#include <ctype.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
/*
 * If _cond is true, print "Failed to <_msg>" to stderr and jump to _label.
 * The do { } while (0) wrapper makes the macro usable as a single statement.
 */
#define CHKERR_JUMP(_cond, _msg, _label) \
    do { \
        if (_cond) { \
            fprintf(stderr, "Failed to %s\n", _msg); \
            goto _label; \
        } \
    } while (0)
/* Selects which active-message send function is used for the test */
typedef enum {
    FUNC_AM_SHORT,
    FUNC_AM_BCOPY,
    FUNC_AM_ZCOPY
} func_am_t;
/*
 * Header stored immediately in front of received data; the receive callback
 * sets is_uct_desc to 1 when it keeps a UCT descriptor and to 0 when it
 * stores a malloc'ed copy.
 */
typedef struct {
    int is_uct_desc;
} recv_desc_t;
typedef struct { | |
func_am_t func_am_type; | |
const char *dev_name; | |
const char *tl_name; | |
long test_strlen; | |
} cmd_args_t; | |
typedef struct { | |
uct_iface_attr_t attr; /* Interface attributes: capabilities and limitations */ | |
uct_iface_h iface; /* Communication interface context */ | |
uct_md_h pd; /* Memory domain */ | |
uct_worker_h worker; /* Workers represent allocated resources in a communication thread */ | |
} iface_info_t; | |
/* Helper data type for am_short: a 64-bit header plus the remaining payload */
typedef struct {
    uint64_t header;   /* First bytes of the message, sent as the AM header */
    char     *payload; /* Remainder of the message, or NULL if there is none */
    size_t   len;      /* Length of payload in bytes */
} am_short_args_t;
/* Helper data type for am_bcopy: the buffer handed to the pack callback */
typedef struct {
    char   *data; /* Source bytes to copy into the send buffer */
    size_t len;   /* Number of bytes to copy */
} am_bcopy_args_t;
/* Helper data type for am_zcopy */ | |
typedef struct { | |
uct_completion_t uct_comp; | |
uct_md_h md; | |
uct_mem_h memh; | |
} zcopy_comp_t; | |
/*
 * Set by the receive callback (to the received descriptor) and by the zcopy
 * completion callback (to a non-NULL marker); polled by do_am_zcopy.
 */
static void *desc_holder = NULL;
/*
 * Return the name of the UCT send function that corresponds to func_am_type,
 * or NULL if the value is not one of the func_am_t enumerators.
 * The returned string is a literal and must not be modified.
 */
static char *func_am_t_str(func_am_t func_am_type)
{
    switch (func_am_type) {
    case FUNC_AM_SHORT:
        return "uct_ep_am_short";
    case FUNC_AM_BCOPY:
        return "uct_ep_am_bcopy";
    case FUNC_AM_ZCOPY:
        return "uct_ep_am_zcopy";
    }

    return NULL;
}
static size_t func_am_max_size(func_am_t func_am_type, | |
const uct_iface_attr_t *attr) | |
{ | |
switch (func_am_type) { | |
case FUNC_AM_SHORT: | |
return attr->cap.am.max_short; | |
case FUNC_AM_BCOPY: | |
return attr->cap.am.max_bcopy; | |
case FUNC_AM_ZCOPY: | |
return attr->cap.am.max_zcopy; | |
} | |
return 0; | |
} | |
/* Helper function for am_short */ | |
void am_short_params_pack(char *buf, size_t len, am_short_args_t *args) | |
{ | |
args->header = *(uint64_t *)buf; | |
if (len > sizeof(args->header)) { | |
args->payload = (buf + sizeof(args->header)); | |
args->len = len - sizeof(args->header); | |
} else { | |
args->payload = NULL; | |
args->len = 0; | |
} | |
} | |
/*
 * Send the first cmd_args->test_strlen bytes of buf with uct_ep_am_short.
 * Returns the status reported by uct_ep_am_short.
 */
ucs_status_t do_am_short(uct_ep_h ep, uint8_t id, const cmd_args_t *cmd_args,
                         char *buf)
{
    am_short_args_t send_args;

    am_short_params_pack(buf, (size_t)cmd_args->test_strlen, &send_args);

    /* Send active message to remote endpoint */
    return uct_ep_am_short(ep, id, send_args.header, send_args.payload,
                           send_args.len);
}
/* Pack callback for am_bcopy */ | |
size_t am_bcopy_data_pack_cb(void *dest, void *arg) | |
{ | |
am_bcopy_args_t *bc_args = arg; | |
memcpy(dest, bc_args->data, bc_args->len); | |
return bc_args->len; | |
} | |
/*
 * Send the first cmd_args->test_strlen bytes of buf with uct_ep_am_bcopy.
 * Returns UCS_OK when the call reports a non-negative length, otherwise the
 * negative value it returned, converted to ucs_status_t.
 */
ucs_status_t do_am_bcopy(uct_ep_h ep, uint8_t id, const cmd_args_t *cmd_args,
                         char *buf)
{
    am_bcopy_args_t args;
    ssize_t         len;

    args.data = buf;
    args.len  = (size_t)cmd_args->test_strlen;

    /* Send active message to remote endpoint */
    len = uct_ep_am_bcopy(ep, id, am_bcopy_data_pack_cb, &args, 0);

    /* Negative len is an error code */
    return (len >= 0) ? UCS_OK : (ucs_status_t)len;
}
/* Completion callback for am_zcopy */ | |
void zcopy_completion_cb(uct_completion_t *self, ucs_status_t status) | |
{ | |
zcopy_comp_t *comp = (zcopy_comp_t *)self; | |
assert((comp->uct_comp.count == 0) && (status == UCS_OK)); | |
uct_md_mem_dereg(comp->md, comp->memh); | |
desc_holder = (void *)0xDEADBEEF; | |
} | |
ucs_status_t do_am_zcopy(iface_info_t *if_info, uct_ep_h ep, uint8_t id, | |
const cmd_args_t *cmd_args, char *buf) | |
{ | |
uct_mem_h memh; | |
uct_iov_t iov; | |
zcopy_comp_t comp; | |
ucs_status_t status = uct_md_mem_reg(if_info->pd, buf, cmd_args->test_strlen, | |
0, &memh); | |
iov.buffer = buf; | |
iov.length = cmd_args->test_strlen; | |
iov.memh = memh; | |
iov.stride = 0; | |
iov.count = 1; | |
comp.uct_comp.func = zcopy_completion_cb; | |
comp.uct_comp.count = 1; | |
comp.md = if_info->pd; | |
comp.memh = memh; | |
if (status == UCS_OK) { | |
status = uct_ep_am_zcopy(ep, id, NULL, 0, &iov, 1, 0, | |
(uct_completion_t *)&comp); | |
if (status == UCS_INPROGRESS) { | |
while (!desc_holder) { | |
/* Explicitly progress outstanding active message request */ | |
uct_worker_progress(if_info->worker); | |
} | |
status = UCS_OK; | |
} | |
} | |
return status; | |
} | |
/*
 * Print the success banner and a "[label] local_str sent remote_str" line to
 * stdout, then flush stdout.
 */
static void print_strings(const char *label, const char *local_str,
                          const char *remote_str)
{
    fprintf(stdout, "\n\n----- UCT TEST SUCCESS ----\n\n");
    fprintf(stdout, "[%s] %s sent %s", label, local_str, remote_str);
    fprintf(stdout, "\n\n---------------------------\n");
    fflush(stdout);
}
/* Callback to handle receive active message */ | |
static ucs_status_t hello_world(void *arg, void *data, size_t length, unsigned flags) | |
{ | |
recv_desc_t *rdesc; | |
func_am_t func_am_type = *(func_am_t *)arg; | |
print_strings("callback", func_am_t_str(func_am_type), data); | |
if (flags & UCT_CB_PARAM_FLAG_DESC) { | |
rdesc = (recv_desc_t *)data - 1; | |
/* Hold descriptor to release later and return UCS_INPROGRESS */ | |
rdesc->is_uct_desc = 1; | |
desc_holder = rdesc; | |
return UCS_INPROGRESS; | |
} | |
/* We need to copy-out data and return UCS_OK if want to use the data | |
* outside the callback */ | |
rdesc = malloc(sizeof(*rdesc) + length); | |
rdesc->is_uct_desc = 0; | |
memcpy(rdesc + 1, data, length); | |
desc_holder = rdesc; | |
return UCS_OK; | |
} | |
/* init the transport by its name */ | |
static ucs_status_t init_iface(char *dev_name, char *tl_name, | |
func_am_t func_am_type, | |
iface_info_t *iface_p) | |
{ | |
ucs_status_t status; | |
uct_iface_config_t *config; /* Defines interface configuration options */ | |
uct_iface_params_t params; | |
params.open_mode = UCT_IFACE_OPEN_MODE_DEVICE; | |
params.mode.device.tl_name = tl_name; | |
params.mode.device.dev_name = dev_name; | |
params.stats_root = NULL; | |
params.rx_headroom = sizeof(recv_desc_t); | |
UCS_CPU_ZERO(¶ms.cpu_mask); | |
/* Read transport-specific interface configuration */ | |
status = uct_md_iface_config_read(iface_p->pd, tl_name, NULL, NULL, &config); | |
CHKERR_JUMP(UCS_OK != status, "setup iface_config", error_ret); | |
/* Open communication interface */ | |
status = uct_iface_open(iface_p->pd, iface_p->worker, ¶ms, config, | |
&iface_p->iface); | |
uct_config_release(config); | |
CHKERR_JUMP(UCS_OK != status, "open temporary interface", error_ret); | |
/* Enable progress on the interface */ | |
uct_iface_progress_enable(iface_p->iface, | |
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); | |
/* Get interface attributes */ | |
status = uct_iface_query(iface_p->iface, &iface_p->attr); | |
CHKERR_JUMP(UCS_OK != status, "query iface", error_iface); | |
/* Check if current device and transport support required active messages */ | |
if ((func_am_type == FUNC_AM_SHORT) && | |
(iface_p->attr.cap.flags & UCT_IFACE_FLAG_AM_SHORT)) { | |
return UCS_OK; | |
} | |
if ((func_am_type == FUNC_AM_BCOPY) && | |
(iface_p->attr.cap.flags & UCT_IFACE_FLAG_AM_BCOPY)) { | |
return UCS_OK; | |
} | |
if ((func_am_type == FUNC_AM_ZCOPY) && | |
(iface_p->attr.cap.flags & UCT_IFACE_FLAG_AM_ZCOPY)) { | |
return UCS_OK; | |
} | |
error_iface: | |
uct_iface_close(iface_p->iface); | |
error_ret: | |
return UCS_ERR_UNSUPPORTED; | |
} | |
/* Device and transport to be used are determined by minimum latency */ | |
static ucs_status_t dev_tl_lookup(const cmd_args_t *cmd_args, | |
iface_info_t *iface_p) | |
{ | |
uct_md_resource_desc_t *md_resources; /* Memory domain resource descriptor */ | |
uct_tl_resource_desc_t *tl_resources; /*Communication resource descriptor */ | |
unsigned num_md_resources; /* Number of protected domain */ | |
unsigned num_tl_resources; /* Number of transport resources resource objects created */ | |
uct_md_config_t *md_config; | |
ucs_status_t status; | |
int i; | |
int j; | |
status = uct_query_md_resources(&md_resources, &num_md_resources); | |
CHKERR_JUMP(UCS_OK != status, "query for memory domain resources", error_ret); | |
/* Iterate through protected domain resources */ | |
for (i = 0; i < num_md_resources; ++i) { | |
status = uct_md_config_read(md_resources[i].md_name, NULL, NULL, &md_config); | |
CHKERR_JUMP(UCS_OK != status, "read PD config", release_pd); | |
status = uct_md_open(md_resources[i].md_name, md_config, &iface_p->pd); | |
uct_config_release(md_config); | |
CHKERR_JUMP(UCS_OK != status, "open memory domains", release_pd); | |
status = uct_md_query_tl_resources(iface_p->pd, &tl_resources, &num_tl_resources); | |
CHKERR_JUMP(UCS_OK != status, "query transport resources", close_pd); | |
/* Go through each available transport and find the proper name */ | |
for (j = 0; j < num_tl_resources; ++j) { | |
if (!strcmp(cmd_args->dev_name, tl_resources[j].dev_name) && | |
!strcmp(cmd_args->tl_name, tl_resources[j].tl_name)) { | |
status = init_iface(tl_resources[j].dev_name, | |
tl_resources[j].tl_name, | |
cmd_args->func_am_type, iface_p); | |
if (UCS_OK == status) { | |
fprintf(stdout, "Using %s with %s.\n", | |
tl_resources[j].dev_name, | |
tl_resources[j].tl_name); | |
fflush(stdout); | |
uct_release_tl_resource_list(tl_resources); | |
goto release_pd; | |
} | |
} | |
} | |
uct_release_tl_resource_list(tl_resources); | |
uct_md_close(iface_p->pd); | |
} | |
fprintf(stderr, "No supported (dev/tl) found (%s/%s)\n", | |
cmd_args->dev_name, cmd_args->tl_name); | |
status = UCS_ERR_UNSUPPORTED; | |
release_pd: | |
uct_release_md_resource_list(md_resources); | |
error_ret: | |
return status; | |
close_pd: | |
uct_md_close(iface_p->pd); | |
goto release_pd; | |
} | |
int print_err_usage() | |
{ | |
const char func_template[] = " -%c Select \"%s\" function to send the message%s\n"; | |
fprintf(stderr, "Usage: uct_hello_world [parameters]\n"); | |
fprintf(stderr, "UCT hello world client/server example utility\n"); | |
fprintf(stderr, "\nParameters are:\n"); | |
fprintf(stderr, func_template, 'i', func_am_t_str(FUNC_AM_SHORT), " (default)"); | |
fprintf(stderr, func_template, 'b', func_am_t_str(FUNC_AM_BCOPY), ""); | |
fprintf(stderr, func_template, 'z', func_am_t_str(FUNC_AM_ZCOPY), ""); | |
fprintf(stderr, " -d Select device name\n"); | |
fprintf(stderr, " -t Select transport layer\n"); | |
fprintf(stderr, " -s size Set test string length (default:16)\n"); | |
fprintf(stderr, "\n"); | |
return UCS_ERR_UNSUPPORTED; | |
} | |
/*
 * Parse the command line into args.
 *
 * args is zeroed and then given defaults (FUNC_AM_SHORT, string length 16)
 * before options are applied. Both -d and -t are mandatory.
 *
 * Returns UCS_OK on success. Returns UCS_ERR_UNSUPPORTED (after printing a
 * message and/or the usage text) for -h, an unknown option, a missing option
 * argument, a string size that is not a positive decimal integer, or a
 * missing device or transport.
 */
int parse_cmd(int argc, char * const argv[], cmd_args_t *args)
{
    char *end;
    int  c;
    int  index;

    assert(args);
    memset(args, 0, sizeof(*args));

    /* Defaults */
    args->func_am_type = FUNC_AM_SHORT;
    args->test_strlen  = 16;

    /* Suppress getopt's own messages; errors are reported in the '?' case */
    opterr = 0;
    while ((c = getopt(argc, argv, "ibzd:t:n:p:s:h")) != -1) {
        switch (c) {
        case 'i':
            args->func_am_type = FUNC_AM_SHORT;
            break;
        case 'b':
            args->func_am_type = FUNC_AM_BCOPY;
            break;
        case 'z':
            args->func_am_type = FUNC_AM_ZCOPY;
            break;
        case 'd':
            args->dev_name = optarg;
            break;
        case 't':
            args->tl_name = optarg;
            break;
        case 's':
            /* strtol lets us reject empty input and trailing garbage */
            args->test_strlen = strtol(optarg, &end, 10);
            if ((end == optarg) || (*end != '\0') ||
                (args->test_strlen <= 0)) {
                fprintf(stderr, "Wrong string size %s\n", optarg);
                return UCS_ERR_UNSUPPORTED;
            }
            break;
        case '?':
            if ((optopt == 's') || (optopt == 'd') || (optopt == 't')) {
                fprintf(stderr, "Option -%c requires an argument.\n", optopt);
            } else if (isprint(optopt)) {
                fprintf(stderr, "Unknown option `-%c'.\n", optopt);
            } else {
                fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
            }
            /* fallthrough */
        case 'h':
        default:
            /* Also reached for -n/-p, which getopt accepts but are not handled */
            return print_err_usage();
        }
    }

    for (index = optind; index < argc; index++) {
        fprintf(stderr, "WARNING: Non-option argument %s\n", argv[index]);
    }

    if (args->dev_name == NULL) {
        fprintf(stderr, "WARNING: device is not set\n");
        return print_err_usage();
    }

    if (args->tl_name == NULL) {
        fprintf(stderr, "WARNING: transport layer is not set\n");
        return print_err_usage();
    }

    return UCS_OK;
}
int main(int argc, char **argv) | |
{ | |
uct_device_addr_t *own_dev; | |
uct_device_addr_t *peer_dev = NULL; | |
uct_iface_addr_t *own_iface; | |
uct_iface_addr_t *peer_iface = NULL; | |
uct_ep_addr_t *own_ep; | |
uct_ep_addr_t *peer_ep = NULL; | |
ucs_status_t status = UCS_OK; /* status codes for UCS */ | |
uct_ep_h ep; /* Remote endpoint */ | |
ucs_async_context_t *async; /* Async event context manages | |
times and fd notifications */ | |
cmd_args_t cmd_args; | |
iface_info_t if_info; | |
uint8_t id = 0; | |
/* Parse the command line */ | |
if (parse_cmd(argc, argv, &cmd_args)) { | |
status = UCS_ERR_INVALID_PARAM; | |
goto out; | |
} | |
/* Initialize context | |
* It is better to use different contexts for different workers | |
*/ | |
status = ucs_async_context_create(UCS_ASYNC_MODE_THREAD, &async); | |
CHKERR_JUMP(UCS_OK != status, "init async context", out); | |
/* Create a worker object */ | |
status = uct_worker_create(async, UCS_THREAD_MODE_SINGLE, &if_info.worker); | |
CHKERR_JUMP(UCS_OK != status, "create worker", out_cleanup_async); | |
/* Search for the desired transport */ | |
status = dev_tl_lookup(&cmd_args, &if_info); | |
CHKERR_JUMP(UCS_OK != status, "find supported device and transport", | |
out_destroy_worker); | |
own_dev = (uct_device_addr_t*)calloc(1, if_info.attr.device_addr_len); | |
CHKERR_JUMP(NULL == own_dev, "allocate memory for dev addr", | |
out_destroy_iface); | |
own_iface = (uct_iface_addr_t*)calloc(1, if_info.attr.iface_addr_len); | |
CHKERR_JUMP(NULL == own_iface, "allocate memory for if addr", | |
out_free_dev_addrs); | |
/************************************************************************/ | |
if (if_info.attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { | |
printf("UCT_IFACE_FLAG_CONNECT_TO_IFACE\n"); | |
} | |
if (if_info.attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { | |
printf("UCT_IFACE_FLAG_CONNECT_TO_EP\n"); | |
} | |
/************************************************************************/ | |
out_free_if_addrs: | |
free(own_iface); | |
free(peer_iface); | |
out_free_dev_addrs: | |
free(own_dev); | |
free(peer_dev); | |
out_destroy_iface: | |
uct_iface_close(if_info.iface); | |
uct_md_close(if_info.pd); | |
out_destroy_worker: | |
uct_worker_destroy(if_info.worker); | |
out_cleanup_async: | |
ucs_async_context_destroy(async); | |
out: | |
return status == UCS_ERR_UNSUPPORTED ? UCS_OK : status; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment