Skip to content

Instantly share code, notes, and snippets.

@taotetek
Last active July 12, 2018 15:11
Show Gist options
  • Save taotetek/bdf7253d3c271df03768 to your computer and use it in GitHub Desktop.
Save taotetek/bdf7253d3c271df03768 to your computer and use it in GitHub Desktop.
Updated Credit Based Flow Control Example ( See: http://hintjens.com/blog:15 )
#include "czmq.h"
// DEBUG - set to true for some output
#define DEBUG false
// Tuning constants. Every expansion is fully parenthesized so that the
// macros keep their value when used inside a larger expression
// (for example "x / TRANSIT_TOTAL" or "y % TRANSIT_SLICE").
// TRANSIT_TOTAL - credit, in bytes, the client grants when it starts
#define TRANSIT_TOTAL (1024 * 1024)
// TRANSIT_SLICE - credit, in bytes, the client sends back each time it
// has received more than this many bytes
#define TRANSIT_SLICE (TRANSIT_TOTAL / 4)
// FRAGMENT_SIZE - nominal size, in bytes, of one message from the server
#define FRAGMENT_SIZE 65536
// SERVER_HWM - send high-water mark applied to the server's router socket
#define SERVER_HWM ((TRANSIT_TOTAL / FRAGMENT_SIZE) * 2)
// TICK_SECONDS - interval between throughput reports printed by main
#define TICK_SECONDS 5
// -------------------------------------------------------------------
// client_t holds the state of one credit-based flow control client:
// its two sockets, the poller that watches them, and the counters used
// to detect dropped messages and to decide when to send more credit.
typedef struct {
bool terminated; // client leaves its main loop once this is true
zsock_t *dealer; // dealer socket for communicating to server
zsock_t *pipe; // pair socket for commands from parent thread
zpoller_t *poller; // poller watching both the pipe and the dealer
int64_t expected_seq; // next expected sequence number
int64_t current_seq; // sequence number of the message just received
int received; // bytes received and not yet returned to the server as credit
} client_t;
// ------------------------------------------------------------------
// client_new builds the state for a client actor.
// Takes the actor pipe handed over by zactor, connects a dealer socket
// to the server endpoint on localhost, and creates a poller that
// watches both sockets. Returns the newly allocated client.
client_t *
client_new (zsock_t *pipe) {
    client_t *client = (client_t *) zmalloc (sizeof (client_t));
    assert (client);

    //  Flags and counters all start from zero
    client->terminated = false;
    client->received = 0;
    client->current_seq = 0;
    client->expected_seq = 0;

    //  Sockets, plus one poller watching both of them
    client->pipe = pipe;
    client->dealer = zsock_new_dealer ("tcp://127.0.0.1:10001");
    client->poller = zpoller_new (client->pipe, client->dealer, NULL);

    return client;
}
// ------------------------------------------------------------------
// s_client_handle_pipe handles commands from the parent thread.
// "$TERM" asks the client to stop; any other command is a programming
// error and is reported before asserting. A NULL command (the receive
// was interrupted or the context is shutting down) is also treated as
// a request to stop, instead of being passed to streq.
static void
s_client_handle_pipe (client_t *self)
{
    char *command = zstr_recv (self->pipe);
    if (!command) {
        //  Nothing was received, so there is nothing to parse or free
        self->terminated = true;
        return;
    }
    if (streq (command, "$TERM"))
        self->terminated = true;
    else {
        zsys_error ("client: invalid command: %s", command);
        assert (false);
    }
    zstr_free (&command);
}
// ------------------------------------------------------------------
// s_client_handle_dealer handles one message from the server.
// Each message carries a sequence number and a content frame. The
// sequence number is checked against the expected one (the process
// exits if messages were dropped), the content size is added to the
// running byte count, and once more than TRANSIT_SLICE bytes have been
// received a TRANSIT_SLICE worth of credit is sent back to the server.
static void
s_client_handle_dealer (client_t *self)
{
    //  zsock_brecv creates the frame for an 'f' argument itself, so the
    //  pointer starts out NULL; allocating a frame here beforehand would
    //  leak it on every message received.
    zframe_t *content = NULL;
    int rc = zsock_brecv (self->dealer, "8f", &self->current_seq, &content);
    if (rc != 0 || !content) {
        //  Nothing usable was received (e.g. interrupted); let the actor
        //  loop decide whether to continue
        zframe_destroy (&content);
        return;
    }
    if (self->current_seq != self->expected_seq) {
        zsys_error ("server dropped %d messages, exit (%d/%d)",
                    (int) (self->current_seq - self->expected_seq),
                    (int) self->current_seq, (int) self->expected_seq);
        exit (1);
    }
    if (DEBUG)
        zsys_info ("received message: %" PRId64 "", self->current_seq);
    self->expected_seq++;
    self->received += (int) zframe_size (content);
    if (self->received > TRANSIT_SLICE) {
        //  Hand one slice of credit back; any excess stays in the tally
        self->received -= TRANSIT_SLICE;
        zsock_bsend (self->dealer, "4", TRANSIT_SLICE);
    }
    zframe_destroy (&content);
}
// ------------------------------------------------------------------
// s_client_destroy releases a client before its actor exits.
// Destroys the poller and the dealer socket, frees the client itself
// and sets the caller's pointer to NULL. Does nothing when the caller's
// pointer is already NULL. The pipe is not destroyed here.
static void
s_client_destroy (client_t **self_p)
{
    assert (self_p);
    client_t *client = *self_p;
    if (client == NULL)
        return;

    zpoller_destroy (&client->poller);
    zsock_destroy (&client->dealer);
    free (client);
    *self_p = NULL;
}
// ------------------------------------------------------------------
// client_actor is a credit flow control based client. It sends
// "credits" that represent bytes it is allowed to receive to a
// server, that sends work back in response. The client earns the
// right to receive more work by earning credits for completed
// work, and sending those credits back to the server.
//
// pipe is the actor pipe used for commands from the parent thread;
// args is unused.
void
client_actor (zsock_t *pipe, void *args)
{
    client_t *self = client_new (pipe);
    zsock_signal (pipe, 0);
    //  TRANSIT_TOTAL is an int expression, so it is widened explicitly
    //  to match the PRId64 conversion
    if (DEBUG)
        zsys_info ("client sent %" PRId64 " credit",
                   (int64_t) (TRANSIT_TOTAL));
    //  Grant the initial credit; the server starts sending in response
    zsock_bsend (self->dealer, "4", TRANSIT_TOTAL);
    while (!self->terminated) {
        zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1);
        if (zpoller_terminated (self->poller))
            break;
        if (which == self->pipe)
            s_client_handle_pipe (self);
        else
        if (which == self->dealer)
            s_client_handle_dealer (self);
    }
    s_client_destroy (&self);
}
// -------------------------------------------------------------------
// client_ledger is a ledger that tracks the credit that a client
// has earned, and tracks the current sequence for that client
// that is used to detect dropped messages. The server keeps one
// ledger per client identity.
typedef struct {
zframe_t *identity; // the identity frame for the client
int credit; // bytes the server may still send to this client
int64_t sequence; // sequence number of the next message sent to this client
} client_ledger_t;
// -------------------------------------------------------------------
// server is a server that receives credits from clients, and returns
// work to them.
typedef struct {
bool terminated; // set to true when $TERM arrives on the pipe
zsock_t *router; // router for the server - receives client request
zsock_t *pipe; // pipe receives commands from the parent thread
zpoller_t *poller; // poller watching both the pipe and the router
zlist_t *clients; // list containing client ledgers (client_ledger_t *)
int64_t bytes_sent; // total content bytes sent to all clients
} server_t;
// ------------------------------------------------------------------
// server_new accepts a pipe from a zactor and returns a new server.
// Creates the ledger list, a router socket bound to TCP port 10001 on
// all interfaces with its send high-water mark set to SERVER_HWM, and
// a poller that watches the pipe and the router.
// Every construction step is checked: a failed allocation or a failed
// bind (for example when the port is already in use) is reported
// immediately rather than leaving a server that never receives anything.
server_t *
server_new (zsock_t *pipe) {
    if (DEBUG)
        zsys_info ("server_new...");
    server_t *self = (server_t *) zmalloc (sizeof (server_t));
    assert (self);
    self->clients = zlist_new ();
    assert (self->clients);
    self->pipe = pipe;
    self->router = zsock_new (ZMQ_ROUTER);
    assert (self->router);
    zsock_set_sndhwm (self->router, SERVER_HWM);
    //  zsock_bind returns -1 on failure
    if (zsock_bind (self->router, "tcp://*:10001") == -1) {
        zsys_error ("server: could not bind to tcp://*:10001");
        assert (false);
    }
    self->poller = zpoller_new (self->pipe, self->router, NULL);
    assert (self->poller);
    return self;
}
// ------------------------------------------------------------------
// server_stats asks a running server actor for its byte counter.
// Sends "$STATS" to the actor and waits for the 8-byte reply.
// Returns the total number of bytes the server has sent so far, or -1
// when the request could not be sent or no reply was received (the
// original returned an uninitialized value in that case).
int64_t
server_stats (zactor_t *server) {
    int64_t bytes_sent = -1;
    if (zstr_send (server, "$STATS") != 0)
        return -1;
    if (zsock_brecv (server, "8", &bytes_sent) != 0)
        return -1;
    return bytes_sent;
}
// ------------------------------------------------------------------
// s_server_handle_pipe handles commands from the parent thread.
// "$TERM" marks the server as terminated, "$STATS" replies on the pipe
// with the total bytes sent so far; any other command is a programming
// error and is reported before asserting. A NULL command (the receive
// was interrupted or the context is shutting down) is treated like
// "$TERM" instead of being passed to streq.
static void
s_server_handle_pipe (server_t *self)
{
    char *command = zstr_recv (self->pipe);
    if (!command) {
        //  Nothing was received, so there is nothing to parse or free
        self->terminated = true;
        return;
    }
    if (streq (command, "$TERM"))
        self->terminated = true;
    else
    if (streq (command, "$STATS"))
        zsock_bsend (self->pipe, "8", self->bytes_sent);
    else {
        zsys_error ("server: invalid command: %s", command);
        assert (false);
    }
    zstr_free (&command);
}
// ------------------------------------------------------------------
// s_server_handle_router handles messages from clients.
// A client message is an identity frame followed by a 4-byte credit
// value. The credit is added to that client's ledger (a ledger is
// created the first time an identity is seen), and then every client
// with at least FRAGMENT_SIZE of credit is sent messages of roughly
// FRAGMENT_SIZE bytes until its credit drops below that threshold.
// A failed receive is ignored, so that no uninitialized credit value
// is ever added to a ledger.
static void
s_server_handle_router (server_t *self)
{
    zframe_t *client_frame = zframe_recv (self->router);
    if (!client_frame)
        return;                 //  Interrupted before a message arrived

    //  The "4" picture stores into a 32-bit unsigned integer
    uint32_t credit = 0;
    if (zsock_brecv (self->router, "4", &credit) != 0) {
        zframe_destroy (&client_frame);
        return;
    }

    //  Find the ledger that belongs to this identity, if there is one
    client_ledger_t *client = (client_ledger_t *) zlist_first (self->clients);
    while (client) {
        if (zframe_eq (client->identity, client_frame))
            break;
        client = (client_ledger_t *) zlist_next (self->clients);
    }
    if (client == NULL) {
        //  First message from this client: the new ledger takes
        //  ownership of the identity frame
        client = (client_ledger_t *) zmalloc (sizeof (client_ledger_t));
        assert (client);
        client->identity = client_frame;
        zlist_append (self->clients, client);
    }
    else
        zframe_destroy (&client_frame);
    client->credit += (int) credit;

    //  Send work to every client that has enough credit
    client = (client_ledger_t *) zlist_first (self->clients);
    while (client) {
        while (client->credit >= FRAGMENT_SIZE) {
            int msgsize = FRAGMENT_SIZE + randof (1000) - randof (1000);
            zframe_t *content = zframe_new (NULL, msgsize);
            //  ZFRAME_REUSE keeps the identity frame alive for later sends
            zframe_send (&client->identity, self->router,
                         ZFRAME_MORE + ZFRAME_REUSE);
            zsock_bsend (self->router, "8f", client->sequence, content);
            client->sequence++;
            self->bytes_sent += msgsize;
            client->credit -= msgsize;
            zframe_destroy (&content);
        }
        client = (client_ledger_t *) zlist_next (self->clients);
    }
}
// ------------------------------------------------------------------
// s_server_destroy cleans up a server before it exits.
// Destroys the poller and the router socket, releases every client
// ledger together with its identity frame, destroys the ledger list,
// frees the server itself and sets the caller's pointer to NULL.
// Does nothing when the caller's pointer is already NULL.
static void
s_server_destroy (server_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        server_t *self = *self_p;
        zpoller_destroy (&self->poller);
        zsock_destroy (&self->router);
        if (self->clients) {
            //  The list does not own its items, so destroying it alone
            //  would leak each ledger and the frame it holds
            client_ledger_t *client;
            while ((client = (client_ledger_t *) zlist_pop (self->clients))) {
                zframe_destroy (&client->identity);
                free (client);
            }
        }
        zlist_destroy (&self->clients);
        free (self);
        *self_p = NULL;
    }
}
// ------------------------------------------------------------------
// server_actor is a credit flow control based server. It receives
// "credits" that represent bytes from clients, and sends work
// to clients as they earn the right to receive bytes.
//
// pipe is the actor pipe used for commands from the parent thread;
// args is unused. The loop ends when the pipe handler has marked the
// server as terminated ("$TERM"), when the process is interrupted, or
// when the poller reports termination.
void
server_actor (zsock_t *pipe, void *args)
{
    server_t *self = server_new (pipe);
    zsock_signal (self->pipe, 0);
    //  Checking self->terminated is what lets "$TERM" actually stop the
    //  actor; without it the loop would keep waiting on the poller
    while (!self->terminated && !zctx_interrupted) {
        zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1);
        if (zpoller_terminated (self->poller))
            break;
        if (which == self->pipe)
            s_server_handle_pipe (self);
        else
        if (which == self->router)
            s_server_handle_router (self);
    }
    s_server_destroy (&self);
}
// ------------------------------------------------------------------
// main starts one server actor and one client actor, then prints the
// server's throughput every TICK_SECONDS until the process is
// interrupted, and finally destroys both actors.
int main (void)
{
    zsys_info ("starting server...");
    zactor_t *server = zactor_new (server_actor, NULL);
    assert (server);
    zsys_info ("starting client...");
    zactor_t *client = zactor_new (client_actor, NULL);
    assert (client);
    int64_t previous_bytes_sent = 0;
    while (!zctx_interrupted) {
        zclock_sleep (TICK_SECONDS * 1000);
        //  Do not query or report a tick if we were interrupted mid-sleep
        if (zctx_interrupted)
            break;
        int64_t new_bytes_sent = server_stats (server);
        int64_t interval_bytes_sent = new_bytes_sent - previous_bytes_sent;
        int64_t bytes_per_second = interval_bytes_sent / TICK_SECONDS;
        //  Floating point: integer division would print 0 for any rate
        //  below one gigabit per second
        double gigabits_per_second = (double) bytes_per_second * 8.0 / 1e9;
        zsys_info ("tick! sent: %" PRId64 " bytes in %d seconds ( %" PRId64 " bytes/s : %.3f gigabits/s)",
                   interval_bytes_sent, TICK_SECONDS, bytes_per_second, gigabits_per_second);
        previous_bytes_sent = new_bytes_sent;
    }
    zactor_destroy (&client);
    zactor_destroy (&server);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment