Skip to content

Instantly share code, notes, and snippets.

@joerns
Last active August 22, 2023 07:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joerns/7f6a97a6504214db04c68df6dac5b253 to your computer and use it in GitHub Desktop.
Save joerns/7f6a97a6504214db04c68df6dac5b253 to your computer and use it in GitHub Desktop.
libfabric RC Example
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <rdma/fabric.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_errno.h>
#include <sys/epoll.h>
#include "common.h"
struct send_socket
{
struct fi_info* fi;
struct fid_fabric *fabric;
struct fid_eq *eq;
struct fid_domain *domain;
struct fid_ep *ep;
struct fid_cq* cq;
int fd;
};
void print_sockaddr_type(struct fi_info* fi)
{
switch(fi->addr_format) {
case FI_SOCKADDR_IN:
printf("sockaddr type: FI_SOCKADDR_IN\n");
break;
case FI_FORMAT_UNSPEC:
printf("sockaddr type: FI_FORMAT_UNSPEC\n");
break;
case FI_SOCKADDR_IN6:
printf("sockaddr type: FI_SOCKADDR_IN6\n");
break;
case FI_SOCKADDR_IB:
printf("sockaddr type: FI_SOCKADDR_IB\n");
break;
case FI_SOCKADDR:
printf("sockaddr type: FI_SOCKADDR\n");
break;
default:
fprintf(stderr, "error: sockaddr type unkown\n");
break;
}
}
void send_socket_connect(int efd, struct send_socket * socket)
{
struct fi_eq_cm_entry entry;
uint32_t event;
ssize_t rd;
int ret;
struct fi_info* hints;
struct fi_eq_attr eq_attr = {
.wait_obj = FI_WAIT_UNSPEC // waiting only through fi_ calls (no epoll)
};
hints = fi_allocinfo();
hints->ep_attr->type = FI_EP_MSG;
hints->caps = FI_MSG;
hints->mode = FI_LOCAL_MR;
hints->tx_attr->mode = FI_LOCAL_MR;
hints->rx_attr->mode = FI_LOCAL_MR;
if(ret = fi_getinfo(FI_VERSION(1, 0), "10.10.0.196", "12345", 0, hints, &socket->fi))
{
ERROR("fi_getinfo failed: %d '%s'", ret, fi_strerror(-ret));
}
print_sockaddr_type(socket->fi);
printf("info: %s\n", fi_tostr(socket->fi, FI_TYPE_INFO));
if(ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)) {
ERROR("fi_fabric failed: %d", ret);
}
if(ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)) {
ERROR("fi_eq_open failed: %d", ret);
}
if(ret = fi_domain(socket->fabric, socket->fi, &socket->domain, NULL)) {
ERROR("fi_domain failed: %d", ret);
}
if(ret = fi_endpoint(socket->domain, socket->fi, &socket->ep, NULL)) {
ERROR("fi_endpoint failed: %d", ret);
}
if(ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)) {
ERROR("fi_ep_bind failed: %d", ret);
}
struct fi_cq_attr cq_attr;
cq_attr.size = 64; /* # entries for CQ */
cq_attr.flags = 0; /* operation flags */
cq_attr.format = FI_CQ_FORMAT_CONTEXT; /* completion format */
cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
cq_attr.signaling_vector = 0; /* interrupt affinity */
cq_attr.wait_cond = FI_WAIT_NONE; /* wait condition format */
cq_attr.wait_set = NULL; /* optional wait set */
if(ret = fi_cq_open(socket->domain, &cq_attr, &socket->cq, NULL))
{
ERROR("fi_cq_open failed: %d '%s'", ret, fi_strerror(-ret));
}
if(ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT|FI_RECV)) {
ERROR("fi_ep_bind failed: %d", ret);
}
/* if(ret = fi_enable(socket->ep)) {
ERROR("fi_enable failed: %d", ret);
}*/
/* Connect to server */
if(ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) {
ERROR("fi_connect failed: %d '%s'", ret, fi_strerror(-ret));
}
/* Wait for the connection to be established */
rd = fi_eq_sread(socket->eq, &event, &entry, sizeof entry, -1, 0);
if(rd == -FI_EAVAIL)
{
struct fi_eq_err_entry err_entry;
if((ret = fi_eq_readerr(socket->eq, &err_entry, 0)) < 0)
{
ERROR("fi_eq_readerr failed: %d", ret);
}
char buf[1024];
fprintf(stderr, "error event: %s\n", fi_eq_strerror(socket->eq, err_entry.prov_errno,
err_entry.err_data, buf, 1024));
exit(EXIT_FAILURE);
}
else if(rd < 0)
{
ERROR("fi_eq_sread failed: %d '%s'", rd, fi_strerror(-rd));
}
if (rd != sizeof entry) {
ERROR("reading from event queue failed (after connect): rd=%d", rd);
}
printf("received event\n");
if (event != FI_CONNECTED || entry.fid != &socket->ep->fid) {
ERROR("Unexpected CM event %d fid %p (ep %p)", event, entry.fid, socket->ep);
}
printf("connected\n");
if(ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->fd))
{
ERROR("fi_control failed: %d", ret);
}
printf("CQ FD: %d\n", socket->fd);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = socket;
if(ret = epoll_ctl(efd, EPOLL_CTL_ADD, socket->fd, &ev)) {
ERROR("epoll_ctl failed: %d", ret);
}
}
void send_socket_send(struct send_socket* socket, char* buf, size_t len)
{
int ret;
struct fid_mr *mr;
if(ret = fi_mr_reg(socket->domain, buf, len, FI_SEND, 0, 0, 0, &mr, NULL))
{
ERROR("fi_mr_reg failed: %d", ret);
}
void* desc = fi_mr_desc(mr);
if(ret = fi_send(socket->ep, buf, len, desc, 0, NULL))
{
ERROR("fi_send failed: %d, '%s'", ret, fi_strerror(-ret));
}
}
int main()
{
int ret, i, n;
int efd = epoll_create(64);
struct send_socket socket;
send_socket_connect(efd, &socket);
char buf[] = "hello, world!";
size_t len = 14;
send_socket_send(&socket, buf, len);
struct epoll_event events[64];
n = epoll_wait(efd, events, 64, -1);
if(n < 0) {
ERROR("epoll_wait failed: %d", n);
}
for(i=0; i<n; i++)
{
// we know these are send sockets
struct send_socket* socket = events[i].data.ptr;
struct fi_cq_entry completion_entry;
fi_cq_read(socket->cq, &completion_entry, 1);
printf("message sent\n");
}
return 0;
}
#ifndef COMMON_H
#define COMMON_H
#include <time.h>
#define ERROR(x, ...) \
do { \
fprintf(stderr, "error: "x"\n", __VA_ARGS__);\
exit(EXIT_FAILURE);\
} while(0)
double now()
{
struct timespec t;
if(clock_gettime(CLOCK_MONOTONIC, &t))
ERROR("clock_gettime failed: %d (%s)", errno, strerror(errno));
return ((double)t.tv_sec) + 1e-9*t.tv_nsec;
}
void handle_cq_error(struct fid_cq* cq)
{
int r;
struct fi_cq_err_entry err_entry;
if((r = fi_cq_readerr(cq, &err_entry, 0)) < 0)
{
ERROR("fi_eq_readerr failed", r);
}
fprintf(stderr, "error in the CQ: %s / %s\n", fi_strerror(err_entry.err), fi_strerror(err_entry.prov_errno));
exit(-1);
}
/*
* Checks if any completions arrived and returns their number. Does
* not block.
*/
int check_for_completions(struct fid_cq* cq, struct fi_cq_entry* completions, unsigned max_completions)
{
ssize_t num_completions = fi_cq_read(cq, completions, max_completions);
if(num_completions < 0)
{
if(num_completions == -FI_EAGAIN)
return 0;
if(num_completions == -FI_EAVAIL)
{
handle_cq_error(cq);
}
ERROR("fi_cq_read failed: %d (%s)", num_completions, fi_strerror(-num_completions));
}
return num_completions;
}
/*
* Blocks until `num` completions arrived.
*/
int wait_for_completions(struct fid_cq* cq, unsigned num, struct fi_cq_entry* completions, unsigned max_completions)
{
if(num == 0)
return 0;
unsigned completions_arrived = 0;
while(completions_arrived < num)
{
ssize_t num_completions = fi_cq_read(cq, completions, max_completions);
if(num_completions < 0)
{
if(num_completions == -FI_EAGAIN)
continue;
if(num_completions == -FI_EAVAIL)
handle_cq_error(cq);
ERROR("fi_cq_read failed: %d (%s)", num_completions, fi_strerror(-num_completions));
}
completions_arrived += num_completions;
}
return completions_arrived;
}
#endif
#include <stdio.h>
#include <stdlib.h>
#include <rdma/fabric.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_errno.h>
#include <sys/epoll.h>
#include "common.h"
struct listen_socket
{
struct fi_info* fi;
struct fid_fabric *fabric;
struct fid_eq *eq;
struct fid_pep *pep;
int fd;
};
struct recv_socket
{
struct fid_domain *domain;
struct fid_ep *ep;
struct fid_cq* cq;
int fd;
struct fid_mr *mr;
char* buf;
size_t len;
};
void listen_server(int efd, struct listen_socket* socket)
{
int ret;
struct epoll_event ev;
struct fi_info* hints;
struct fi_eq_attr eq_attr = {
.wait_obj = FI_WAIT_FD
};
hints = fi_allocinfo();
hints->ep_attr->type = FI_EP_MSG;
hints->caps = FI_MSG;
hints->mode = FI_LOCAL_MR;
if(ret = fi_getinfo(FI_VERSION(1, 0), "127.0.0.1", "12345", FI_SOURCE, hints, &socket->fi)) {
ERROR("fi_getinfo failed: %d '%s'", ret, fi_strerror(-ret));
}
printf("info: %s\n", fi_tostr(socket->fi, FI_TYPE_INFO));
if(ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)) {
ERROR("fi_fabric failed: %d", ret);
}
if(ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)) {
ERROR("fi_eq_open failed: %d", ret);
}
if(ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)) {
ERROR("fi_passive_ep failed: %d", ret);
}
if(ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)) {
ERROR("fi_pep_bind failed: %d", ret);
}
if(ret = fi_listen(socket->pep)) {
ERROR("fi_listen failed: %d '%s'", ret, fi_strerror(-ret));
}
if(ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->fd))
{
ERROR("fi_control failed: %d '%s'", ret, fi_strerror(-ret));
}
ev.events = EPOLLIN;
ev.data.ptr = socket;
if(ret = epoll_ctl(efd, EPOLL_CTL_ADD, socket->fd, &ev)) {
ERROR("epoll_ctl failed: %d", ret);
}
}
void handle_connection_request(int efd, struct fi_info* info, struct listen_socket* lsocket)
{
struct epoll_event ev;
int ret;
ssize_t rd;
uint32_t event;
struct fi_eq_cm_entry entry;
struct recv_socket * rsocket = malloc(sizeof(struct recv_socket));
if(ret = fi_domain(lsocket->fabric, info, &rsocket->domain, NULL)) {
ERROR("fi_domain failed: %d", ret);
}
if(ret = fi_endpoint(rsocket->domain, info, &rsocket->ep, NULL)) {
ERROR("fi_endpoint failed: %d", ret);
}
if(ret = fi_ep_bind((rsocket->ep), &lsocket->eq->fid, 0)) {
ERROR("fi_ep_bind failed: %d", ret);
}
struct fi_cq_attr cq_attr;
cq_attr.size = 64; /* # entries for CQ */
cq_attr.flags = 0; /* operation flags */
cq_attr.format = FI_CQ_FORMAT_CONTEXT; /* completion format */
cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
cq_attr.signaling_vector = 0; /* interrupt affinity */
cq_attr.wait_cond = FI_WAIT_NONE; /* wait condition format */
cq_attr.wait_set = NULL; /* optional wait set */
if(ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->cq, NULL))
{
ERROR("fi_cq_open failed: %d", ret);
}
if(ret = fi_ep_bind((rsocket->ep), &rsocket->cq->fid, FI_TRANSMIT|FI_RECV)) {
ERROR("fi_ep_bind failed: %d", ret);
}
if(ret = fi_accept(rsocket->ep, NULL, 0)) {
ERROR("fi_accept failed: %d '%s'", ret, fi_strerror(-ret));
}
/* Wait for the connection to be established */
rd = fi_eq_sread(lsocket->eq, &event, &entry, sizeof entry, -1, 0);
if(rd < 0)
{
ERROR("fi_eq_sread failed: %d '%s'", rd, fi_strerror(-rd));
}
if (rd != sizeof entry) {
ERROR("reading from event queue failed (after listen): rd=%d", rd);
}
if (event != FI_CONNECTED || entry.fid != &rsocket->ep->fid) {
ERROR("Unexpected CM event %d fid %p (ep %p)", event, entry.fid, rsocket->ep);
}
printf("connected\n");
rsocket->len = 1024*1024;
rsocket->buf = malloc(rsocket->len);
if(ret = fi_mr_reg(rsocket->domain, rsocket->buf, rsocket->len, FI_RECV, 0, 0, 0, &rsocket->mr, NULL))
{
ERROR("fi_mr_reg failed: %d", ret);
}
void* desc = fi_mr_desc(rsocket->mr);
if(ret = fi_recv(rsocket->ep, rsocket->buf, rsocket->len, desc, 0, NULL))
{
ERROR("fi_recv failed: %d, '%s'", ret, fi_strerror(-ret));
}
if(ret = fi_control(&rsocket->cq->fid, FI_GETWAIT, &rsocket->fd))
{
ERROR("fi_control failed: %d", ret);
}
printf("CQ FD: %d\n", rsocket->fd);
ev.events = EPOLLIN;
ev.data.ptr = rsocket;
if(ret = epoll_ctl(efd, EPOLL_CTL_ADD, rsocket->fd, &ev)) {
ERROR("epoll_ctl failed: %d", ret);
}
}
int main()
{
int ret;
struct listen_socket lsocket;
int efd = epoll_create(64);
listen_server(efd, &lsocket);
struct epoll_event ev;
struct epoll_event events[1024];
int i, n;
while(1)
{
n = epoll_wait(efd, events, 1024, -1);
for(i = 0; i < n; i++)
{
if(events[i].data.ptr == &lsocket)
{
// This is a connection request
struct listen_socket * socket = events[i].data.ptr;
struct fi_eq_cm_entry entry;
ssize_t rd;
uint32_t event;
rd = fi_eq_sread(socket->eq, &event, &entry, sizeof entry, 0, 0);
if(rd < 0)
{
ERROR("fi_eq_sread failed: %d '%s'", rd, fi_strerror(-rd));
}
if (rd != sizeof entry) {
ERROR("reading from event queue failed (after listen): rd=%d", rd);
}
printf("received event\n");
switch(event)
{
case FI_CONNREQ:
handle_connection_request(efd, entry.info, socket);
break;
case FI_SHUTDOWN:
printf("remote disconnected\n");
break;
default:
ERROR("unexpected event: %d", event);
}
}
else
{
// This is a completion
struct fi_cq_entry completion_entry;
struct recv_socket * socket = events[i].data.ptr;
ret = fi_cq_sread(socket->cq, &completion_entry, 1, NULL, 0);
if(ret < 0)
{
// my guess why this happens: we set FI_TRANSMIT on the CQ, although we do not need it. This is due to a bug in libfabric 1.2.0
if(ret == -FI_EAGAIN)
continue;
ERROR("fi_cq_sread failed: %d '%s'", ret, fi_strerror(-ret));
}
printf("message received: %s\n", socket->buf);
}
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment