Skip to content

Instantly share code, notes, and snippets.

@jalcine
Last active March 18, 2024 07:20
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jalcine/d76e70d674a167b971ed to your computer and use it in GitHub Desktop.
Save jalcine/d76e70d674a167b971ed to your computer and use it in GitHub Desktop.
Getting libuv and zeromq working together (polling setup)

Getting libuv and zeromq Working Together

I like libuv and zeromq. This gist holds a bit of code from my attempts to get them working together:

  • implement a polling state.
  • implement a timer state.
  • add tests to confirm working state.
  • ....
  • Profit

I plan on making this a full-fledged repository of examples when I'm done hammering out the details. For now, one can build the examples using CMake.

#include <stdio.h>
#include <unistd.h>
#include <zmq.h>
/*
 * Blocking REQ client: sends "Hello" to the server 20 times and prints the
 * reply to each request.
 *
 * Returns 0 when all requests were answered, 1 if the context, the socket,
 * the connection or any send/receive failed.
 */
int main(void)
{
  enum { REQUEST_COUNT = 20, BUFFER_LEN = 10 };
  int status = 1;
  int request_nbr;

  void* ctx = zmq_ctx_new();
  if (ctx == NULL)
  {
    fprintf(stderr, "zmq_ctx_new failed\n");
    return status;
  }

  void* socket = zmq_socket(ctx, ZMQ_REQ);
  if (socket == NULL)
  {
    fprintf(stderr, "zmq_socket failed\n");
    goto out_ctx;
  }

  if (zmq_connect(socket, "tcp://0.0.0.0:3939") != 0)
  {
    fprintf(stderr, "zmq_connect failed\n");
    goto out_socket;
  }

  for (request_nbr = 0; request_nbr != REQUEST_COUNT; request_nbr++)
  {
    char buffer[BUFFER_LEN];
    int received;

    printf("Sending Hello %d…\n", request_nbr);
    if (zmq_send(socket, "Hello", 5, 0) < 0)
    {
      fprintf(stderr, "zmq_send failed\n");
      goto out_socket;
    }

    /* zmq_recv() copies raw bytes and never appends a terminator, so keep
     * one byte of the buffer free for the '\0' added below. */
    received = zmq_recv(socket, buffer, BUFFER_LEN - 1, 0);
    if (received < 0)
    {
      fprintf(stderr, "zmq_recv failed\n");
      goto out_socket;
    }

    /* The return value is the size of the whole message, which can exceed
     * the number of bytes actually stored when the message was truncated. */
    if (received > BUFFER_LEN - 1)
    {
      received = BUFFER_LEN - 1;
    }
    buffer[received] = '\0';

    printf("Received World %s %d\n", buffer, request_nbr);
  }
  status = 0;

out_socket:
  zmq_close(socket);
out_ctx:
  zmq_ctx_destroy(ctx);
  return status;
}
#include <zmq.h>
#include <unistd.h>
/*
 * Blocking REP server: binds to TCP port 3939 and answers every request
 * with "World", printing each request as it arrives.
 *
 * Runs until a receive or send fails, then releases the socket and the
 * context. Always returns 1, because the serving loop only ends on an
 * error or when setup fails.
 */
int main(void)
{
  enum { BUFFER_LEN = 10 };
  int status = 1;

  void* ctx = zmq_ctx_new();
  if (ctx == NULL)
  {
    fprintf(stderr, "zmq_ctx_new failed\n");
    return status;
  }

  void* socket = zmq_socket(ctx, ZMQ_REP);
  if (socket == NULL)
  {
    fprintf(stderr, "zmq_socket failed\n");
    goto out_ctx;
  }

  if (zmq_bind(socket, "tcp://0.0.0.0:3939") != 0)
  {
    fprintf(stderr, "zmq_bind failed\n");
    goto out_socket;
  }

  while (1)
  {
    char buffer[BUFFER_LEN];
    int received;

    printf("Listening..");

    /* zmq_recv() does not NUL-terminate; reserve one byte for '\0'. */
    received = zmq_recv(socket, buffer, BUFFER_LEN - 1, 0);
    if (received < 0)
    {
      fprintf(stderr, "zmq_recv failed\n");
      break;
    }

    /* The return value is the full message size, which may be larger than
     * what fitted into the buffer. */
    if (received > BUFFER_LEN - 1)
    {
      received = BUFFER_LEN - 1;
    }
    buffer[received] = '\0';

    printf("%s", buffer);
    usleep(100);

    if (zmq_send(socket, "World", 5, 0) < 0)
    {
      fprintf(stderr, "zmq_send failed\n");
      break;
    }
  }

out_socket:
  zmq_close(socket);
out_ctx:
  zmq_ctx_destroy(ctx);
  return status;
}
# Build the libuv + ZeroMQ example programs.
#
# target_compile_options() is used further down in this file and only exists
# from CMake 2.8.12 onwards, so that is the real minimum version.
cmake_minimum_required(VERSION 2.8.12)
project(libuvzeromexamples)

# Locate ZeroMQ and libuv through pkg-config; fail at configure time if
# pkg-config itself or either library is missing.
find_package(PkgConfig REQUIRED)
pkg_search_module(ZMQ REQUIRED libzmq)
pkg_search_module(UV REQUIRED libuv)

include_directories(
  ${ZMQ_INCLUDE_DIRS}
  ${UV_INCLUDE_DIRS}
)

add_executable(polling_server polling_server.c)
add_executable(polling_client polling_client.c)
add_executable(blocking_server blocking_server.c)
add_executable(blocking_client blocking_client.c)
# link_bin(<target>)
#
# Links <target> against ZeroMQ and libuv and builds it unoptimised with
# debug information.
macro(link_bin _tgt)
  # <PREFIX>_LDFLAGS, as filled in by pkg-config, carries both the -L search
  # paths and the -l libraries. Using it avoids hand-building a "-L..." flag,
  # which turns into a bare "-L" when the library lives in a default
  # directory and the *_LIBRARY_DIRS variable is empty.
  target_link_libraries(${_tgt}
    ${ZMQ_LDFLAGS}
    ${UV_LDFLAGS}
  )
  # Executables have no consumers, so the options are PRIVATE.
  target_compile_options(${_tgt} PRIVATE
    -g
    -O0
  )
endmacro()
# Apply the shared link/compile settings to every example executable.
foreach(_example
    polling_client
    polling_server
    blocking_server
    blocking_client)
  link_bin(${_example})
endforeach()
#include "polling_utils.h"
/* Number of successful poll callbacks handled so far. */
static int count = 0;

void on_socket_polled(uv_poll_t* poller, int status, int events);
void send_server_message(void* socket);

/*
 * libuv poll callback for the client socket's file descriptor.
 *
 * poller: the poll handle; its `data` field is passed on as the socket
 *         argument of send_server_message().
 * status: negative when libuv reports a polling error.
 * events: mask of events that fired (not inspected here).
 *
 * Each successful invocation performs one request/reply exchange. Polling
 * stops once MAX_MSG_COUNT exchanges have been made, or immediately when
 * libuv reports an error.
 */
void on_socket_polled(uv_poll_t* poller, int status, int events)
{
  assert(poller);
  (void) events;

  if (status < 0)
  {
    /* The descriptor is in an error state; talking to the socket now would
     * only block or fail, so stop watching it. */
    echo("client", "Polling failed with status %d; stopping.\n", status);
    uv_poll_stop(poller);
    return;
  }

  count++;
  send_server_message(poller->data);

  if (count >= MAX_MSG_COUNT)
  {
    echo("client", "Killed after %d messages.\n", MAX_MSG_COUNT);
    uv_poll_stop(poller);
  }
}
/*
 * Performs one request/reply exchange: sends CLIENT_MSG on `socket`, waits
 * for the reply and logs it.
 *
 * socket: ZeroMQ socket to talk on; must not be NULL.
 *
 * Returns early, after logging, if the send or the receive fails.
 */
void send_server_message(void* socket)
{
  assert(socket);

  /* One spare byte so the reply can always be NUL-terminated, even when it
   * fills all BUFFER_SIZE bytes. */
  char buffer[BUFFER_SIZE + 1];
  int received;

  if (zmq_send(socket, CLIENT_MSG, BUFFER_SIZE, 0) < 0)
  {
    echo("client", "Failed to send %s.\n", CLIENT_MSG);
    return;
  }

  /* Read exactly one reply per request. The previous
   * `while (notTheSame < 0)` loop started with notTheSame == 0, so it never
   * ran and the buffer was logged without ever being filled. */
  received = zmq_recv(socket, buffer, BUFFER_SIZE, 0);
  if (received < 0)
  {
    echo("client", "Failed to receive a reply to %s.\n", CLIENT_MSG);
    return;
  }

  /* zmq_recv() reports the full message size, which can exceed the number
   * of bytes stored when the message was truncated. */
  if (received > BUFFER_SIZE)
  {
    received = BUFFER_SIZE;
  }
  buffer[received] = '\0';

  echo("client", "Received '%s' from server; sent %s.\n", buffer, CLIENT_MSG);

  /* NOTE(review): sleep() takes seconds, while SLEEP_INTERVAL is handed to
   * uv_timer_start() as milliseconds elsewhere. As written this blocks the
   * event loop for SLEEP_INTERVAL / 3 seconds; confirm that is intended. */
  sleep(SLEEP_INTERVAL / 3);
}
int main(void)
{
// djj
int zmq_fd = 0;
size_t zmq_fd_len = 0;
void* context = NULL;
void* socket = NULL;
uv_loop_t* loop = NULL;
uv_poll_t poller;
loop = uv_default_loop();
assert(loop != NULL);
context = zmq_ctx_new();
assert(context);
socket = zmq_socket(context, ZMQ_REQ);
assert(socket);
zmq_connect(socket, CONNECTION_STR);
assert(zmq_getsockopt(socket, ZMQ_FD, &zmq_fd, &zmq_fd_len));
uv_poll_init(loop, &poller, zmq_fd);
uv_poll_start(&poller, UV_READABLE, &on_socket_polled);
uv_run(loop, UV_RUN_DEFAULT);
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
#include "polling_utils.h"
/*
 * libuv timer callback: receives one request on the socket stored in
 * timer->data, replies with SERVER_MSG and logs the exchange.
 *
 * timer: the timer handle; its `data` field is used as the ZeroMQ socket.
 *        This assumes a socket type that can both receive and send (for
 *        example ZMQ_REP); verify against the code that sets timer->data.
 *
 * The receive is a blocking call (flags == 0), so the event loop waits
 * inside this callback until a request arrives. Returns early, after
 * logging, if the receive or the send fails.
 */
void on_timer_elapsed(uv_timer_t* timer)
{
  void* socket = timer->data;

  /* One spare byte so the request can always be NUL-terminated. */
  char buffer[BUFFER_SIZE + 1];
  int received;

  /* Read exactly one request. The previous `while (notTheSame < 0)` loop
   * started with notTheSame == 0, so it never ran and the buffer was
   * logged without ever being filled. */
  received = zmq_recv(socket, buffer, BUFFER_SIZE, 0);
  if (received < 0)
  {
    echo("server", "Failed to receive a request (%d).\n", received);
    return;
  }

  /* zmq_recv() reports the full message size, which can exceed the number
   * of bytes stored when the message was truncated. */
  if (received > BUFFER_SIZE)
  {
    received = BUFFER_SIZE;
  }
  buffer[received] = '\0';

  if (zmq_send(socket, SERVER_MSG, BUFFER_SIZE, 0) < 0)
  {
    echo("server", "Failed to send '%s'.\n", SERVER_MSG);
    return;
  }

  echo("server", "Received '%s' from client; Sent '%s'.\n", buffer, SERVER_MSG);
}
int main(void)
{
int r = 0, zmq_fd;
uv_loop_t* loop = NULL;
uv_timer_t timer;
void* context = NULL;
void* server = NULL;
size_t zmq_fd_len;
loop = uv_default_loop();
assert(loop != NULL);
context = zmq_ctx_new();
assert(context);
server = zmq_socket(context, ZMQ_PUB);
zmq_bind(server, CONNECTION_STR);
assert(r == 0);
uv_timer_init(loop, &timer);
assert(r == 0);
timer.data = socket;
zmq_getsockopt(server, ZMQ_FD, &zmq_fd, &zmq_fd_len);
printf("%u", (unsigned int) zmq_fd_len);
uv_timer_set_repeat(&timer, SLEEP_INTERVAL);
uv_timer_start(&timer, &on_timer_elapsed, TIMEOUT, SLEEP_INTERVAL);
uv_run(loop, UV_RUN_DEFAULT);
zmq_close(server);
zmq_ctx_destroy(context);
return 0;
}
/* Shared includes and constants for the polling client and server. */
#ifndef POLLING_UTILS_H
#define POLLING_UTILS_H

#include <assert.h>
#include <stdio.h>   /* printf, used by echo() */
#include <string.h>  /* strcmp */
#include <time.h>
#include <unistd.h>  /* sleep */

#include <uv.h>
#include <zmq.h>

/* Number of bytes sent and received per message: "Hello"/"World" plus the
 * terminating NUL. */
#define BUFFER_SIZE 6
/* The client stops polling after this many poll callbacks. */
#define MAX_MSG_COUNT 2000
/* Initial delay passed to uv_timer_start() by the server. */
#define TIMEOUT 20
/* Repeat interval passed to uv_timer_start() by the server; the client
 * also passes SLEEP_INTERVAL / 3 to sleep(). */
#define SLEEP_INTERVAL 100
/* Message payloads. The explicit "\0" is in addition to the terminator the
 * string literal already has; only the first BUFFER_SIZE bytes are sent. */
#define CLIENT_MSG "Hello\0"
#define SERVER_MSG "World\0"
/* Endpoint on TCP port 3939. The "*" wildcard is the form zmq_bind()
 * accepts; zmq_connect() expects a concrete peer address. */
#define CONNECTION_STR "tcp://*:3939"
/* printf() wrapper that prefixes the output with "[ tag ] ". `msg` must be
 * a string literal (it is concatenated with the prefix) and at least one
 * argument must follow it. */
#define echo(tag, msg, ...) printf("[ %s ] " msg, tag, __VA_ARGS__)

#endif /* POLLING_UTILS_H */
@hintjens
Copy link

For what it's worth, here is the blocking_client.c rewritten to use CZMQ:

#include <czmq.h>
/*
 * CZMQ version of the blocking client: sends "Hello" 20 times and prints
 * each reply.
 *
 * Returns 0 when every request was answered, 1 if the socket could not be
 * created or a send/receive failed.
 */
int main(void)
{
    zsock_t *req = zsock_new_req ("tcp://0.0.0.0:3939");
    if (!req) {
        return 1;
    }

    int rc = 0;
    int request_nbr;
    for (request_nbr = 0; request_nbr < 20; request_nbr++) {
        if (zstr_send (req, "Hello") != 0) {
            rc = 1;
            break;
        }
        char *reply = zstr_recv (req);
        //  A NULL reply must not reach printf's %s conversion.
        if (!reply) {
            rc = 1;
            break;
        }
        printf ("Received World %s %d\n", reply, request_nbr);
        zstr_free (&reply);
    }
    zsock_destroy (&req);
    return rc;
}

@calvin2021y
Copy link

calvin2021y commented Jun 19, 2022

This looks nice. I have a few questions.

Will it handle automatic reconnection?
Could zmq_send block, or return without sending all the data?
Could zmq_connect block?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment