Skip to content

Instantly share code, notes, and snippets.

@eurocat2k
Last active September 23, 2024 15:31
Show Gist options
  • Save eurocat2k/b20efc85d28698adc632e6b6df5d2cfd to your computer and use it in GitHub Desktop.
Save eurocat2k/b20efc85d28698adc632e6b6df5d2cfd to your computer and use it in GitHub Desktop.
TCP non-blocking client connection using kqueue
#include "tcpsocket.h"
/**
* @brief Create a Socket object
* @name CreateSocket
* @param int domain
* @param bool keepalive
* @param bool nonblock
* @return int socket
*/
int CreateSocket(int domain, bool keepalive, bool nonblock) {
int optval;
socklen_t optlen = sizeof(optval);
static int s, on = 1;
if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
fprintf(stderr, "creating socket: %s", strerror(errno));
return NET_ERROR;
}
// for server sockets it is usefull to set NONBLOCK flag to true
if (nonblock) {
int flags;
// or we can use NonBlock(int fd) method...
if ((flags = fcntl(s, F_GETFL)) == -1) {
fprintf(stderr, "fcntl(F_GETFL): %s", strerror(errno));
return NET_ERROR;
}
if (fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
fprintf(stderr, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
return NET_ERROR;
}
}
/* Make sure connection-intensive things like the redis benckmark
* will be able to close/open sockets a zillion of times */
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
fprintf(stderr, "setsockopt SO_REUSEADDR: %s", strerror(errno));
return NET_ERROR;
}
if (keepalive) {
if (getsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &optval, &optlen) < 0) {
fprintf(stderr, "getsockopt SO_KEEPALIVE failed: %s", strerror(errno));
close(s);
return NET_ERROR;
}
optval = 1;
optlen = sizeof(optval);
if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
fprintf(stderr, "setsockopt SO_KEEPALIVE failed: %s", strerror(errno));
close(s);
return NET_ERROR;
}
}
return s;
}
/**
* @name CreateSocketNB
* @brief creates non-blocking TCPsocket for client or server.
*
* @param int domain - domain argument specifies a communications domain this selects the protocol family which
* should be used.
* @param bool isServer - boolean flag determines that the socket is going to be used by TCP server or not.
* @return int socket if all went OK, otherwise NET_ERROR (-1) will be sent back to the caller
*/
int CreateSocketNB(int domain, bool isServer) {
int optval;
socklen_t optlen = sizeof(optval);
static int s, on = 1;
if ((s = socket(domain, SOCK_STREAM | SOCK_NONBLOCK, 0)) == -1) {
fprintf(stderr, "creating socket: %s", strerror(errno));
return NET_ERROR;
}
if (isServer) {
/*
* Make sure connection-intensive things like the redis benckmark
* will be able to close/open sockets a zillion of times
*/
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
fprintf(stderr, "setsockopt SO_REUSEADDR: %s", strerror(errno));
return NET_ERROR;
}
}
optval = 1;
optlen = sizeof(optval);
if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
fprintf(stderr, "setsockopt SO_KEEPALIVE failed: %s", strerror(errno));
close(s);
return NET_ERROR;
}
int val = 1;
assert(0 == setsockopt(s, 0, TCP_NODELAY, (char*)&optval, optlen));
return s;
}
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/event.h>
/**
* @brief Registers socket to kqueue write event handler
* @name Register_write_Socket
* @param int _kq - kqueue descriptor
* @param struct kevent *_change_event - change list
* @param int socket - the socket to be registered
* @param void *ctx - event handler context
*/
void Register_write_Socket(int _kq, struct kevent* _change_event, int socket, void* ctx) {
EV_SET(_change_event, socket, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_ERROR | EV_EOF | EV_CLEAR, 0, 0, ctx);
if (kevent(_kq, _change_event, 1, NULL, 0, NULL) == -1) {
perror("kevent");
close(_kq);
exit(1);
}
}
/**
* @brief Deregisters - removes - previously registered write event handler of a socket
* @name Remove_write_Socket
* @param int _kq - kqueue descriptor
* @param struct kevent *_change_event - change list
* @param int socket - the socket to be removed from event queue handler's list
*/
void Remove_write_Socket(int _kq, struct kevent* _change_event, int socket) {
EV_SET(_change_event, socket, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
if (kevent(_kq, _change_event, 1, NULL, 0, NULL) == -1) {
perror("kevent");
close(_kq);
exit(1);
}
}
#ifndef __MYEVENT_H__
#define __MYEVENT_H__
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/event.h>
void Register_write_Socket(int _kq, struct kevent* _change_event, int socket, void* ctx);
void Remove_write_Socket(int _kq, struct kevent* _change_event, int socket);
#endif //__MYEVENT_H__
#include "tcpsocket.h"
#include "myevent.h"
#include <stdbool.h>
/**
* @brief Set socket non-blocking
* @name NonBlock
* @param int fd - the socket
* @return int tretval 0 if success, -1 if does not
*/
int NonBlock(int fd) {
int flags;
if ((flags = fcntl(fd, F_GETFL)) == -1) {
fprintf(stderr, "fcntl(F_GETFL): %s", strerror(errno));
return NET_ERROR;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
fprintf(stderr, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
return NET_ERROR;
}
return NET_OK;
}
/**
* @name TCPClientConnect
* @brief this method tries to establish TCP connection to the remote server - it uses non-blocking socket from
* the result of CreateSocket function. To be able to determine that everything went good during the connection
* request, this function utilizes kqueue event handler to do that. More on this in the code below.
* @param char* address - address string or FQDN host name of the remote site.
* @param int port - the listening port of the remote site.
* @return int connected_socket if success, otherwise NET_ERROR (-1)
*/
int TCPClientConnect(char *address, int port) {
int s, _kq; // local kqueue handler
struct kevent _event[1], _events[1]; // for kqueue
struct sockaddr_in sa; // socket address structure
// We create the non-blocking socket first
if ((s = CreateSocket(AF_INET,true,false)) == NET_ERROR) {
return NET_ERROR;
}
// Specify the address family
sa.sin_family = AF_INET; // domain AF_INET
// add port number
sa.sin_port = htons(port); // register port number
// try to resolve IP address string, if it fails, then resolve by name
if (inet_aton(address, &sa.sin_addr) == 0) { // if the address conforms "aaa.bbb.ccc.ddd", it should be 0
struct hostent *he; // otherwise we try to resolve as hostname - string like FQDN
he = gethostbyname(address); // call help from system's resolver
if (he == NULL) { // it could not resolve the hostname - it is not in the known hosts list
fprintf(stderr, "can't resolve: %s", address);
close(s); // close socket
return NET_ERROR; // return -1
}
memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
}
// we got the remote address info, try to connect
if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
fprintf(stdout, " - Connection message: [%d] %s", errno, strerror(errno));
if (errno == 0) { // if no error, we successfully connected
NonBlock(s); // set our socket non-blocking
return s; // return socket
} else if (errno == EINPROGRESS) {
// Because the socket is set to NON-BLOCKING, there is a chance to get some error during the attempt...
// In that case when we want to run this code on Linux, then we need to call the following function
// to get the process status: getsockopt(..., SOL_SOCKET, SO_ERROR, ...)
// otherwise we could miss the reason of the non blocking socket connection failure - connect returns
// -1 if the request is still pending due to some reasons. So, we definitely need to wait the response
// from the OS about the last connection request. If any other kind of error comes back at the end of
// connecting, than we need to sign that this connection request failed, and probably we need to try it
// again later on. Maybe we can setup a timer, and a connection status check count - which is decrementing
// every getsockopt call - (if we do not use kqueue), and if it reaches zero, then we can conclude that the
// connection to the remote size is not OK for this time.
// If we use kqueue however we can skip that test by registering new socket to the write event queue list.
// What does it mean? Well, if we register our non-blocking socket to the write list we can wait - either
// specifying some valuable timeout - or let it wait indefinitely until something happens to the connecting socket.
// Latter case it will - most probably - refused, or rejected, beacuse event.fflags contains the real
// reason of the error (as it would set by connect by default) which is not 0 definitely.
// So, until it is 0, then we continue to wait the positive or negative response from OS about the
// pending connect status.
Kqueue((int *)&_kq); // utilize kqueue
assert(-1 != _kq); // check it is correctly created
// we register the writer event on the socket to see if error occurs or not during the connecting process
Register_write_Socket(_kq, &_event[0], s, NULL); // register new socket in kqueue's write event handler
struct kevent events[1];
// Wait for the event on the write end, if something goes wrong, the event handler will detect...
// bool quit = false;
// for (;;) {
// if (quit) {
// break;
// }
int n = kevent(_kq, NULL, 0, _events, 1, NULL);
if (_events[0].filter == EVFILT_WRITE) {
errno = 0;
if (_events[0].flags & EV_EOF) {
errno = _events[0].fflags;
// if (errno == 0) {
// continue; // it is still in progress, wait if flags becomes 0 or not EINPROGRESS
// }
if (errno != EINPROGRESS) {
// quit = true;
fprintf(stderr, " - connect: %s", strerror(errno));
close(s);
close(_kq);
return NET_ERROR;
}
}
}
// } // end of infinite for loop
} else {
debug_error(" - connect: %s", strerror(errno));
close(s);
close(_kq);
return NET_ERROR;
}
}
close(_kq);
return s;
}
#include <arpa/inet.h>
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/sctp_uio.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/_timespec.h>
#include <sys/_timeval.h>
#include <sys/event.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <syslog.h>
#include <unistd.h>
// MACROS USED BY FUNCTION BELOW
#ifndef debug_print
#define debug_print(fmt, ...) \
do { \
fprintf(stdout, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_INFO, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#ifndef debug_warning
#define debug_warning(fmt, ...) \
do { \
fprintf(stdout, \
"Warning : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_WARNING, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#ifndef debug_info
#define debug_info(fmt, ...) \
do { \
fprintf(stdout, \
"Info : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_NOTICE, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#ifndef debug_error
#define debug_error(fmt, ...) \
do { \
fprintf(stderr, \
"Error : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_ERR, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#ifndef debug_fatal
#define debug_fatal(fmt, ...) \
do { \
fprintf(stderr, \
"Fatal error : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_CRIT, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#ifndef debug_debug
#define debug_debug(fmt, ...) \
do { \
fprintf(stdout, \
"Debug : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_DEBUG, fmt " : in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#ifndef debug_log
#define debug_log(fmt, ...) \
do { \
fprintf(stdout, \
"Log : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
syslog(LOG_DEBUG, \
"Log : " fmt ": in %s at line %d in %s() function\n", \
##__VA_ARGS__, __FILE__, __LINE__, __FUNCTION__); \
} while (0)
#endif
#endif
/**
* @brief Connects to remote server using non-blocking socket
*
* @param address "aaa.bbb.ccc.ddd" address string or hostname
* @param port service number - !!! NOT service name !!!
* @param isNB flag - should be true
* @return int connected socket or NET_ERROR
*/
int TCPClientConnect(char *address, int port, bool isNB) {
fd_set wfd;
FD_ZERO(&wfd);
struct sockaddr_in sa = {0};
// Create a socket for STREAM
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
return NET_ERROR;
}
// Set socket non-blocking if needed
if (isNB) {
int flags = fcntl(sock, F_GETFL, 0);
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
}
// Try to resolve address from hostname
struct hostent * he = gethostbyname(address);
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
if (he == NULL) {
// Failed, then do expect address as a IPv4 address string already
debug_warning("[W]CAN'T RESOLVE HOSTNAME %s", address);
// TRY WITH ADDRESS
if (inet_aton(address, &sa.sin_addr) == 0) {
// Failed again, close socket and return error
debug_error("inet_aton failed: %s", strerror(errno));
close(sock);
return NET_ERROR;
} else {
// Up to now, it is OK, we have got the address object for connect
debug_debug("[D]ADDRESS VALIDATED AS %s", inet_ntoa(sa.sin_addr));
}
} else {
// Host entry resolved correctly, use its address
sa.sin_addr = *(struct in_addr*)he->h_addr;
debug_debug("[D]HOSTNAME RESOLVED AS %s", inet_ntoa(sa.sin_addr));
}
// Prepare connect
int errCode = 0;
// Do connect
int result = connect(sock, (const struct sockaddr *)&sa, sizeof(sa));
debug_debug("[D]result = %d, sock = %d", result, sock);
// IF NON-BLOCKING SOCKET USED, THIS CALL BLOCKS UNTIL CALL RETURNS
// WITH EITHER A CONNECTED SOCKET, OR ERROR
if (result < 0) {
errCode = errno;
debug_info("[I]error code: %d - %s", errCode, strerror(errCode));
// THIS ERROR OCCURS ONLY NON-BLOCKING SOCKET USED
if (errCode == EINPROGRESS) {
// PREPARE select OBJECTS
fd_set wfd;
FD_ZERO(&wfd);
FD_SET(sock, &wfd);
// INITIALIZE TIMEOUT STRUCT
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 50000;
// CALL select(2) SYSTEM CALL
result = select(sock + 1, NULL, &wfd, NULL, &tv);
if (result > 0) {
// IF SOCKET SIGNS IT IS WRITEABLE, THEN CHECK IF IT IS READY
// OR NOT...
socklen_t len = sizeof(errCode);
result = getsockopt(sock, SOL_SOCKET, SO_ERROR, &errCode, &len);
if (result < 0){
// getsockopt(3) FAILED, NO GO!
errCode = errno;
close(sock);
debug_info("[I]error code: %d - %s", errCode, strerror(errCode));
} else {
// getsockopt(3) COMPLETED, NOW CHECK IF THERE IS AN ERROR,
// OR SOCKET IS READY NOW TO BE USED AS CONNECTION SOCKET
result = (errCode == 0) ? 0 : -1;
debug_info("[I]error code: %d - %s", errCode, strerror(errCode));
}
} else if (result == 0) {
// THIS MEANS THAT select(2) TIMED OUT, NO GO!
close(sock);
errCode = ETIMEDOUT;
result = -1;
debug_info("[I]error code: %d - %s", errCode, strerror(errCode));
} else {
// BY DEFAULT, IF select(2) FAILS, NO GO!
close(sock);
errCode = errno;
debug_info("[I]error code: %d - %s", errCode, strerror(errCode));
}
}
}
if (result == 0) {
// THIS result IS EITHER FROM connect(2) OR getsockopt(3) ANY WAY,
// THAT MEANS NO ERROR, THEREFORE ***GO***!!!!
return sock;
} else {
// IN ANY OTHER CASE, NO GO!
close(sock);
}
return NET_ERROR;
}
#ifndef _WRAPSOCK_H
#define _WRAPSOCK_H
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/event.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <sys/un.h>
#include <sys/stat.h>
#ifndef NET_ERROR
#define NET_ERROR -1
#endif // !NET_ERROR
#ifndef NET_OK
#define NET_OK 0
#endif // !NET_OK
int NonBlock(int fd);
int TCPClientConnect(char *address, int port);
int CreateSocket(int domain, bool keepalive, bool nonblock);
int CreateSocketNB(int domain, bool isServer);
#endif /* _WRAPSOCK_H */
@eurocat2k
Copy link
Author

eurocat2k commented Nov 29, 2022

Connection issues with non-blocking TCP client socket coped with kqueue

This is how I solved the non-blocking TCP client connection difficulties - got and misshandled errors during the connections.

Now extended with all needed wrappers and headers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment