Last active
July 12, 2018 15:11
-
-
Save taotetek/bdf7253d3c271df03768 to your computer and use it in GitHub Desktop.
Updated Credit Based Flow Control Example ( See: http://hintjens.com/blog:15 )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "czmq.h" | |
//  DEBUG - set to true for some output
#define DEBUG false

//  Tunables for the credit based flow control demo. Every expression is
//  fully parenthesized so the macros expand safely inside larger
//  expressions (e.g. "x / TRANSIT_SLICE" or "TRANSIT_TOTAL % n").

//  Total credit, in bytes, the client grants the server up front
#define TRANSIT_TOTAL (1024 * 1024)
//  Credit, in bytes, the client hands back each time it has consumed a slice
#define TRANSIT_SLICE ((TRANSIT_TOTAL) / 4)
//  Nominal size, in bytes, of one message fragment sent by the server
#define FRAGMENT_SIZE 65536
//  Send high-water mark for the server socket, in messages
#define SERVER_HWM (((TRANSIT_TOTAL) / (FRAGMENT_SIZE)) * 2)
//  Seconds between throughput reports in main
#define TICK_SECONDS 5
// ------------------------------------------------------------------- | |
// client is a client that requests work with credits | |
typedef struct { | |
bool terminated; // client exits if set to true | |
zsock_t *dealer; // dealer socket for communicating to server | |
zsock_t *pipe; // pair socket for commands from parent thread | |
zpoller_t *poller; // poller for polling sockets | |
int64_t expected_seq; // next expected sequence number | |
int64_t current_seq; // currently received sequence number | |
int received; // bytes received | |
} client_t; | |
// ------------------------------------------------------------------ | |
// client_new accepts a pipe from a zactor and returns a new client | |
client_t * | |
client_new (zsock_t *pipe) { | |
client_t *self = ( client_t *) zmalloc (sizeof (client_t)); | |
assert (self); | |
self->expected_seq = 0; | |
self->current_seq = 0; | |
self->received = 0; | |
self->terminated = false; | |
self->pipe = pipe; | |
self->dealer = zsock_new_dealer ("tcp://127.0.0.1:10001"); | |
self->poller = zpoller_new (self->pipe, self->dealer, NULL); | |
return self; | |
} | |
// ------------------------------------------------------------------ | |
// s_client_handle_pipe handles commands from the parent thread | |
static void | |
s_client_handle_pipe (client_t *self) | |
{ | |
char *command = zstr_recv (self->pipe); | |
if (streq (command, "$TERM")) { | |
self->terminated = true; | |
} | |
else { | |
zsys_error ("client: invalid command: %s", command); | |
assert (false); | |
} | |
zstr_free (&command); | |
} | |
// ------------------------------------------------------------------ | |
// s_client_handle_dealer handles messages from a server | |
static void | |
s_client_handle_dealer (client_t *self) | |
{ | |
zframe_t *content = zframe_new_empty (); | |
zsock_brecv (self->dealer, "8f", &self->current_seq, &content); | |
if (self->current_seq != self->expected_seq) { | |
zsys_error ("server dropped %d messages, exit (%d/%d)", | |
(int) (self->current_seq - self->expected_seq), | |
(int) self->current_seq, (int) self->expected_seq); | |
exit (1); | |
} | |
if (DEBUG) | |
zsys_info ("received message: %" PRId64 "", self->current_seq); | |
self->expected_seq++; | |
self->received += zframe_size (content); | |
if (self->received > TRANSIT_SLICE) { | |
self->received -= TRANSIT_SLICE; | |
zsock_bsend (self->dealer, "4", TRANSIT_SLICE); | |
} | |
zframe_destroy (&content); | |
} | |
// ------------------------------------------------------------------ | |
// s_client_destroy cleans up a client before it exits | |
static void | |
s_client_destroy (client_t **self_p) | |
{ | |
assert (self_p); | |
if (*self_p) { | |
client_t *self = *self_p; | |
zpoller_destroy (&self->poller); | |
zsock_destroy (&self->dealer); | |
free (self); | |
*self_p = NULL; | |
} | |
} | |
// ------------------------------------------------------------------ | |
// client_actor is a credit flow control based client. It sends | |
// "credits" that represent bytes it is allowed to receive to a | |
// server, that sends work back in response. the client earns the | |
// right to receive more work by earning credits for completed | |
// work, and sending those credits back to the server | |
void | |
client_actor (zsock_t *pipe, void *args) | |
{ | |
client_t *self = client_new (pipe); | |
zsock_signal (pipe, 0); | |
if (DEBUG) | |
zsys_info ("client sent %" PRId64 " credit", TRANSIT_TOTAL); | |
zsock_bsend (self->dealer, "4", TRANSIT_TOTAL); | |
while (!self->terminated) { | |
zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1); | |
if (zpoller_terminated (self->poller)) | |
break; | |
else | |
if (which == self->pipe) { | |
s_client_handle_pipe (self); | |
} | |
else | |
if (which == self->dealer) { | |
s_client_handle_dealer (self); | |
} | |
} | |
s_client_destroy (&self); | |
} | |
// ------------------------------------------------------------------- | |
// client_ledger is a ledger that tracks the credit that a client | |
// has earned, and tracks the current sequence for that client | |
// that is used to detect dropped messages | |
typedef struct { | |
zframe_t *identity; // the identity frame for the client | |
int credit; // current credit tally for the client | |
int64_t sequence; // current sequence number for the client | |
} client_ledger_t; | |
// ------------------------------------------------------------------- | |
// server is a server that receives credits from clients, and returns | |
// work to them. | |
typedef struct { | |
bool terminated; // server exits if set to true | |
zsock_t *router; // router for the server - receives client request | |
zsock_t *pipe; // pipe receives commands from the parent thread | |
zpoller_t *poller; // poller polls the sockets | |
zlist_t *clients; // list containing client ledgers | |
int64_t bytes_sent; // total bytes sent | |
} server_t; | |
// ------------------------------------------------------------------ | |
// server_new accepts a pipe from a zactor and returns a new server | |
server_t * | |
server_new (zsock_t *pipe) { | |
if (DEBUG) | |
zsys_info ("server_new..."); | |
server_t *self = ( server_t *) zmalloc (sizeof (server_t)); | |
assert (self); | |
self->clients = zlist_new (); | |
self->pipe = pipe; | |
self->router = zsock_new (ZMQ_ROUTER); | |
zsock_set_sndhwm (self->router, SERVER_HWM); | |
zsock_bind (self->router, "tcp://*:10001"); | |
self->poller = zpoller_new (self->pipe, self->router, NULL); | |
return self; | |
} | |
int64_t | |
server_stats (zactor_t *server) { | |
zstr_send (server, "$STATS"); | |
int64_t bytes_sent; | |
zsock_brecv (server, "8", &bytes_sent); | |
return bytes_sent; | |
} | |
// ------------------------------------------------------------------ | |
// s_server_handle_pipe handles commands from the parent thread | |
static void | |
s_server_handle_pipe (server_t *self) | |
{ | |
char *command = zstr_recv (self->pipe); | |
if (streq (command, "$TERM")) { | |
self->terminated = true; | |
} | |
else | |
if (streq (command, "$STATS")) { | |
zsock_bsend (self->pipe, "8", self->bytes_sent); | |
} | |
else { | |
zsys_error ("server: invalid command: %s", command); | |
assert (false); | |
} | |
zstr_free (&command); | |
} | |
// ------------------------------------------------------------------ | |
// s_server_handle_router handles messages from clients | |
static void | |
s_server_handle_router (server_t *self) | |
{ | |
int credit; | |
zframe_t *client_frame = zframe_recv (self->router); | |
zsock_brecv (self->router, "4", &credit); | |
client_ledger_t *client = (client_ledger_t *) zlist_first (self->clients); | |
while (client) { | |
if (zframe_eq (client->identity, client_frame)) | |
break; | |
client = (client_ledger_t *) zlist_next (self->clients); | |
} | |
if (client == NULL) { | |
client = (client_ledger_t *) zmalloc (sizeof (client_ledger_t)); | |
client->identity = client_frame; | |
zlist_append (self->clients, client); | |
} | |
else | |
zframe_destroy (&client_frame); | |
client->credit += credit; | |
client = (client_ledger_t *) zlist_first (self->clients); | |
while (client) { | |
while (client->credit >= FRAGMENT_SIZE) { | |
int msgsize = FRAGMENT_SIZE + randof (1000) - randof (1000); | |
zframe_t *content = zframe_new (NULL, msgsize); | |
zframe_send (&client->identity, self->router, | |
ZFRAME_MORE + ZFRAME_REUSE); | |
zsock_bsend (self->router, "8f", client->sequence, content); | |
client->sequence++; | |
self->bytes_sent += msgsize; | |
client->credit -= msgsize; | |
zframe_destroy (&content); | |
} | |
client = (client_ledger_t *) zlist_next (self->clients); | |
} | |
} | |
// ------------------------------------------------------------------ | |
// s_server_destroy cleans up a client before it exits | |
static void | |
s_server_destroy (server_t **self_p) | |
{ | |
assert (self_p); | |
if (*self_p) { | |
server_t *self = *self_p; | |
zpoller_destroy (&self->poller); | |
zsock_destroy (&self->router); | |
zlist_destroy (&self->clients); | |
free (self); | |
*self_p = NULL; | |
} | |
} | |
// ------------------------------------------------------------------ | |
// server_actor is a credit flow control based server. It receives | |
// "credits" that represent bytes from clients, and sends work | |
// to clients as they earn the right to receive bytes. | |
void | |
server_actor (zsock_t *pipe, void *args) | |
{ | |
server_t *self = server_new (pipe); | |
zsock_signal (self->pipe, 0); | |
while (!zctx_interrupted) { | |
zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1); | |
if (zpoller_terminated (self->poller)) | |
break; | |
else | |
if (which == self->pipe) { | |
s_server_handle_pipe (self); | |
} | |
else | |
if (which == self->router) { | |
s_server_handle_router (self); | |
} | |
} | |
s_server_destroy (&self); | |
} | |
int main (void) | |
{ | |
zsys_info ("starting server..."); | |
zactor_t *server = zactor_new (server_actor, NULL); | |
zsys_info ("starting client..."); | |
zactor_t *client = zactor_new (client_actor, NULL); | |
int64_t previous_bytes_sent = 0; | |
while (!zctx_interrupted) { | |
zclock_sleep (TICK_SECONDS * 1000); | |
int64_t new_bytes_sent = server_stats (server); | |
int64_t interval_bytes_sent = new_bytes_sent - previous_bytes_sent; | |
int64_t bytes_per_second = interval_bytes_sent / TICK_SECONDS; | |
int64_t gigabits_per_second = (bytes_per_second * 8) / 1000000000; | |
zsys_info ("tick! sent: %" PRId64 " bytes in %d seconds ( %" PRId64 " bytes/s : %" PRId64 " gigabits/s)", | |
interval_bytes_sent, TICK_SECONDS, bytes_per_second, gigabits_per_second); | |
previous_bytes_sent = new_bytes_sent; | |
} | |
zactor_destroy (&client); | |
zactor_destroy (&server); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment