Skip to content

Instantly share code, notes, and snippets.

@raddy
Created July 22, 2022 16:48
Show Gist options
  • Save raddy/9a26e71eede92cb78a4eb0f6f13c478e to your computer and use it in GitHub Desktop.
Save raddy/9a26e71eede92cb78a4eb0f6f13c478e to your computer and use it in GitHub Desktop.
example
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#define LOCAL_PORT 59696
#define REMOTE_PORT 60069
#define REMOTE_IP "127.0.0.1"
#define TIMEOUT 3000
// globals
// epoll instance used by read() and add_to_epoll(); assigned in run_receiver().
// Starts at -1 so that use before creation fails instead of targeting fd 0.
int epoll_fd{-1};
// Descriptors handed to the receiver; run_receiver() appends to this list.
std::vector<int32_t> fds;
// Output array for epoll_wait(); add_to_epoll() sizes it from fds.
std::vector<epoll_event> events;
// Number of datagrams received so far. Incremented by run_receiver() on the
// receiver thread and read by send_remote() on the main thread, so it is atomic
// to keep that cross-thread access free of data races.
std::atomic<int> counter{0};
// Waits (up to TIMEOUT, passed as epoll_wait's millisecond timeout) for a
// descriptor registered on epoll_fd to become readable, then reads from the
// first reported descriptor into `data`.
//
// data: destination buffer.
// size: capacity of `data` in bytes.
//
// Returns {fd, bytes_read} on success (bytes_read may be 0).
// Returns {nullopt, 0} when epoll_wait times out or fails, or when the
// underlying ::read fails. Only events[0] is consumed per call.
std::pair<std::optional<int>, size_t> read(void* data, size_t size)
{
    const int event_count =
        epoll_wait(epoll_fd, events.data(), static_cast<int>(events.size()), TIMEOUT);
    if (event_count <= 0) return std::make_pair(std::nullopt, 0); // must use nullopt with pair here

    const int fd = events[0].data.fd;

    // ::read returns ssize_t; keep it signed so -1 is not turned into a huge
    // size_t and reported as a byte count.
    const ssize_t bytes_read = ::read(fd, data, size);
    if (bytes_read < 0) {
        // A non-blocking socket may legitimately report "nothing to read".
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            std::cerr << "read failed on fd " << fd << ": " << std::strerror(errno) << std::endl;
        }
        return std::make_pair(std::nullopt, 0);
    }

    std::cout << "bytes read " << bytes_read << std::endl;
    return std::make_pair(fd, static_cast<size_t>(bytes_read));
}
// Registers `fd` for readability (EPOLLIN) on the global epoll instance.
//
// Returns true on success. On failure the epoll instance is closed, epoll_fd is
// reset to -1 so later calls cannot act on a stale descriptor number, and false
// is returned.
bool add_to_epoll(int fd)
{
    epoll_event event{}; // zero-init: data is a union, only the fd member is set below
    event.events = EPOLLIN;
    event.data.fd = fd;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0) {
        close(epoll_fd);
        epoll_fd = -1;
        return false;
    }

    // One result slot per tracked descriptor, but never zero: epoll_wait
    // requires maxevents > 0.
    events.resize(fds.empty() ? 1 : fds.size());
    return true;
}
// Prints the local IPv4 address and port that `fd` is bound to, in the form
// "receiver: <ip> : <port>", on stdout.
//
// If the address cannot be queried or formatted, a diagnostic is written to
// stderr and nothing is printed on stdout.
void display_receiver_info(int fd)
{
    sockaddr_in local{};
    socklen_t len = sizeof(local);
    if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &len) != 0) {
        std::cerr << "receiver: getsockname failed: " << std::strerror(errno) << std::endl;
        return;
    }

    char ip[INET_ADDRSTRLEN] = {};
    if (inet_ntop(AF_INET, &local.sin_addr, ip, sizeof(ip)) == nullptr) {
        std::cerr << "receiver: inet_ntop failed: " << std::strerror(errno) << std::endl;
        return;
    }

    // sin_port is in network byte order; ntohs converts it for display.
    std::cout << "receiver: " << ip << " : " << ntohs(local.sin_port) << std::endl;
}
// Receiver thread body: creates the epoll instance, registers `fd` on it, then
// loops forever printing every datagram read from it and bumping `counter`.
//
// fd: socket to receive on; it is switched to non-blocking mode here.
//
// Never returns normally. Aborts the process if the epoll instance cannot be
// created or `fd` cannot be registered. The checks are explicit rather than
// inside assert(), because assert() expressions are not evaluated under NDEBUG.
void run_receiver(int fd)
{
    std::cout << "Starting receiver thread" << std::endl;

    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        std::cerr << "epoll_create1 failed: " << std::strerror(errno) << std::endl;
        std::abort();
    }

    // Add O_NONBLOCK to the existing status flags instead of overwriting them.
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        std::cerr << "could not set O_NONBLOCK on fd " << fd << ": "
                  << std::strerror(errno) << std::endl;
    }

    fds.push_back(fd);
    if (!add_to_epoll(fd)) {
        std::cerr << "could not register fd " << fd << " with epoll" << std::endl;
        std::abort();
    }
    display_receiver_info(fd);

    uint8_t buffer[2048];
    constexpr size_t sz = sizeof(buffer);
    while (true)
    {
        // Named ready_fd so it does not shadow the `fd` parameter.
        const auto [ready_fd, bytes_read] = read(buffer, sz);
        const bool usable =
            ready_fd.has_value() && *ready_fd != -1 && bytes_read >= 1 && bytes_read <= sz;
        if (!usable)
        {
            ::usleep(1); // safe to continue on fd error?
            continue;
        }
        std::cout << "received: "
                  << std::string(reinterpret_cast<const char*>(buffer), bytes_read)
                  << std::endl;
        counter++;
    }
}
// Turns on SO_REUSEADDR and SO_REUSEPORT for `fd`.
// Best effort: the setsockopt results are not inspected.
void set_sock_reuse(int fd)
{
    const int on = 1;
    const int options[] = {SO_REUSEADDR, SO_REUSEPORT};
    for (const int option : options) {
        setsockopt(fd, SOL_SOCKET, option, &on, sizeof(on));
    }
}
// Binds `fd` to LOCAL_PORT on the wildcard address (INADDR_ANY).
// Returns whatever bind() returns: 0 on success, -1 on failure.
int bind_local(int fd)
{
    sockaddr_in addr{}; // all fields zeroed before the ones we need are filled in
    addr.sin_port = htons(LOCAL_PORT);
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_family = PF_INET;

    const auto* generic = reinterpret_cast<const sockaddr*>(&addr);
    return ::bind(fd, generic, sizeof(addr));
}
// Connects `fd` to REMOTE_IP:REMOTE_PORT. Despite the name, this calls
// connect(), not bind().
//
// Returns 0 on success. Returns -1 if REMOTE_IP is not a valid dotted-quad IPv4
// address (errno is set to EINVAL) or if connect() fails (errno set by connect).
int bind_remote(int fd)
{
    sockaddr_in remote{};
    remote.sin_family = AF_INET;
    remote.sin_port = htons(REMOTE_PORT);

    // inet_pton reports parse failures explicitly; inet_addr's error value is
    // indistinguishable from the address 255.255.255.255.
    if (inet_pton(AF_INET, REMOTE_IP, &remote.sin_addr) != 1) {
        errno = EINVAL;
        return -1;
    }

    return ::connect(fd, reinterpret_cast<const sockaddr*>(&remote), sizeof(remote));
}
// Sends the current value of `counter`, formatted as decimal text, on `fd`
// without blocking (MSG_DONTWAIT).
// Returns the result of send(): bytes sent, or -1 on failure.
int send_remote(int fd)
{
    const std::string payload = std::to_string(counter);
    const auto sent = ::send(fd, payload.data(), payload.size(), MSG_DONTWAIT);
    return static_cast<int>(sent);
}
int main(int argc, char* argv[])
{
int fd = ::socket(PF_INET, SOCK_DGRAM, 0);
set_sock_reuse(fd); // don't hold open on kill
assert(bind_local(fd) == 0); // cannot switch these two lines
assert(bind_remote(fd) == 0);
std::unique_ptr<std::thread> thr;
thr.reset(new std::thread([fd]{ run_receiver(fd); }));
while (1)
{
sleep(1);
send_remote(fd);
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment