Skip to content

Instantly share code, notes, and snippets.

@jsanders
Created July 22, 2012 04:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsanders/3158383 to your computer and use it in GitHub Desktop.
Save jsanders/3158383 to your computer and use it in GitHub Desktop.
Echo server with connection migration
#include <stdlib.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#define MAX_CONNECTIONS 5
#define BUFSIZE 1024
#define SOCKPATH "/tmp/migrate.sock"
typedef struct {
int fds[MAX_CONNECTIONS];
int num;
} connections_t;
void die(char *msg) {
perror(msg);
exit(1);
}
void bind_listener(int listener, unsigned short port) {
struct sockaddr_in address;
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(port);
if(bind(listener, (struct sockaddr *) &address, sizeof(address)) < 0) {
die("ERROR on binding");
}
}
int create_listener(unsigned short port) {
int listener = socket(AF_INET, SOCK_STREAM, 0);
if (listener < 0) { die("ERROR opening socket"); }
bind_listener(listener, port);
if(listen(listener, 5) < 0) { die("ERROR on listen"); }
return listener;
}
void bind_migrator(int migrator) {
struct sockaddr_un address;
memset(&address, 0, sizeof(address));
address.sun_family = AF_UNIX;
strcpy(address.sun_path, SOCKPATH);
unlink(address.sun_path);
if(bind(migrator, (struct sockaddr *) &address, sizeof(address)) < 0) {
die("ERROR on binding migrator");
}
}
int create_migrator_server() {
int migrator = socket(AF_UNIX, SOCK_STREAM, 0);
if(migrator < 0) { die("ERROR opening migrator socket"); }
bind_migrator(migrator);
if(listen(migrator, 5) < 0) { die("ERROR on migrator listen"); }
return migrator;
}
void connect_migrator(int migrator) {
struct sockaddr_un address;
memset(&address, 0, sizeof(address));
address.sun_family = AF_UNIX;
strcpy(address.sun_path, SOCKPATH);
if(connect(migrator, (struct sockaddr *) &address, sizeof(address)) < 0) {
die("ERROR on connecting migrator");
}
}
int create_migrator_client() {
int migrator = socket(AF_UNIX, SOCK_STREAM, 0);
if(migrator < 0) { die("ERROR opening migrator socket"); }
connect_migrator(migrator);
return migrator;
}
// Either creates a new listener and blank connections or migrates all connections
int init_or_migrate(int argc, char **argv, connections_t *connections) {
char *end;
unsigned short port;
int listener;
if (argc != 2) {
fprintf(stderr, "usage: %s <port> OR %s migrate\n", argv[0], argv[0]);
exit(1);
}
port = (unsigned short) strtol(argv[1], &end, 10);
if(*end != '\0') {
// Given anything besides a port number, we migrate
listener = migrate_in(connections);
} else {
listener = create_listener(port);
}
return listener;
}
int set_fds_for_connections(connections_t *connections, fd_set *fds, int fd_max) {
int curr_fd, i;
for(i = 0; i < connections->num; i++) {
curr_fd = connections->fds[i];
FD_SET(curr_fd, fds);
if(curr_fd > fd_max) { fd_max = curr_fd; }
}
return fd_max;
}
int set_fds_for_select(int listener, int migrator, connections_t *connections, fd_set *fds) {
int fd_max = (listener < migrator) ? migrator : listener;
FD_ZERO(fds);
FD_SET(listener, fds);
FD_SET(migrator, fds);
return set_fds_for_connections(connections, fds, fd_max);
}
void add_connection(connections_t *connections, int fd) {
if(connections->num + 1 > MAX_CONNECTIONS) { die("ERROR too many connections"); }
connections->fds[connections->num++] = fd;
}
int accept_connection(int listener) {
struct sockaddr address;
socklen_t address_size = sizeof(address);
int fd = accept(listener, (struct sockaddr *) &address, &address_size);
if(fd < 0) { die("ERROR on accept"); }
return fd;
}
void handle_connection(int fd) {
char buf[BUFSIZE] = {0};
int n = read(fd, buf, BUFSIZE);
if(n < 0) { die("ERROR reading from socket"); }
if(n == 0) { return; } // Punt on handling EOF properly
printf("server received %d bytes: %s", n, buf);
if(write(fd, buf, strlen(buf)) < 0) { die("ERROR writing to socket"); }
}
void handle_echo(connections_t *connections, fd_set *fds) {
int curr_fd, i;
for(i = 0; i < connections->num; i++) {
curr_fd = connections->fds[i];
if(FD_ISSET(curr_fd, fds)) {
handle_connection(curr_fd);
}
}
}
int receive_fd(int migrator){
char iovbuf[1];
struct iovec iov;
struct msghdr msg = {0};
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(int))];
int fd, size;
unsigned char *data;
iov.iov_base = iovbuf;
iov.iov_len = 1;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
size = recvmsg(migrator, &msg, 0);
if(size == 0) { return -2; }
if(size < 0) { return -1; }
cmsg = CMSG_FIRSTHDR(&msg);
if(!cmsg->cmsg_type == SCM_RIGHTS) { die("Not SCM_RIGHTS"); }
if(cmsg->cmsg_len != CMSG_LEN(sizeof(int))) { die("Wrong length"); }
memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
return fd;
}
void send_fd(int migrator, int fd) {
// Data necessary to build control message
char iovbuf[1];
struct iovec iov;
struct msghdr msg = {0};
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(int))];
// Configure control message
iovbuf[0] = 0;
iov.iov_base = iovbuf;
iov.iov_len = 1;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = CMSG_LEN(sizeof(int));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS; // This is the magic that makes this thing work - see man 7 unix
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
memmove(CMSG_DATA(cmsg), &fd, sizeof(int));
msg.msg_controllen = cmsg->cmsg_len;
if(sendmsg(migrator, &msg, 0) != iov.iov_len) { die("ERROR in sendmsg"); }
}
int migrate_in(connections_t *connections) {
fd_set migrate_fds;
char something[] = "$";
int migrator = create_migrator_client();
int listener, newfd, i;
// Write something (anything), to kick off the migration process
if(write(migrator, &something, 1) < 0) { die("ERROR failed to write to migrator"); }
listener = receive_fd(migrator);
if(listener < 0) { die("ERROR couldn't migrate listener"); }
for(i = 0; i < MAX_CONNECTIONS; i++) {
newfd = receive_fd(migrator);
if(newfd < 0) { break; }
add_connection(connections, newfd);
}
return listener;
}
void migrate_out(int migrator, int listener, connections_t *connections) {
int migrator_conn = accept_connection(migrator);
int i, fd;
send_fd(migrator_conn, listener);
for(i = 0; i < connections->num; i++) {
fd = connections->fds[i];
send_fd(migrator_conn, fd);
}
}
void handle_connections(int listener, int migrator, connections_t *connections) {
fd_set fds;
fd_set migrate_fds;
int fd_max = set_fds_for_select(listener, migrator, connections, &fds);
int ready = select(fd_max + 1, &fds, NULL, NULL, NULL);
int migrator_conn, i, fd;
if(ready < 0) { die("ERROR on select"); }
if(ready == 0) { return; }
if(FD_ISSET(migrator, &fds)) {
migrate_out(migrator, listener, connections);
exit(0);
}
if(FD_ISSET(listener, &fds)) {
add_connection(connections, accept_connection(listener));
}
handle_echo(connections, &fds);
}
int main(int argc, char **argv) {
connections_t connections;
int listener, migrator;
memset(&connections, 0, sizeof(connections));
listener = init_or_migrate(argc, argv, &connections);
migrator = create_migrator_server();
while(1) {
handle_connections(listener, migrator, &connections);
}
}
@jsanders
Copy link
Author

This is an echo server that supports migrating its connections to a new instance of itself transparently to any clients (both new clients and those already connected). To see it in action:

$ gcc -o echod echod.c
$ ./echod 9999

In another window:

$ telnet localhost 9999
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello from a client!
Hello from a client!

In your original window you'll see:

$ ./echod 9999
server received 22 bytes: Hello from a client!

Open a third session:

$ ./echod migrate

The server running in your first session will exit. Go to your telnet session:

$ telnet localhost 9999
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello from a client!
Hello from a client!
Hello again from a client!
Hello again from a client!

In the session running your new server, you'll see:

$ ./echod migrate
server received 28 bytes: Hello again from a client!

Make sure you can still make new connections:

$ telnet localhost 9999
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello from another client!
Hello from another client!

Now in your session running the new server, you'll see:

$ ./echod migrate
server received 28 bytes: Hello again from a client!
server received 28 bytes: Hello from another client!

Ta-da!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment