Skip to content

Instantly share code, notes, and snippets.

@Asmod4n
Last active August 29, 2015 14:08
Show Gist options
  • Save Asmod4n/03afadfbf3b45c250334 to your computer and use it in GitHub Desktop.
Save Asmod4n/03afadfbf3b45c250334 to your computer and use it in GitHub Desktop.
czmq http_parser wslay ruby websockets
[submodule "picohttpparser"]
path = picohttpparser
url = https://github.com/h2o/picohttpparser.git
users
hallo
passwd = "$7$C6..../....m/68Yww6iu2cN.h/dijVZagz/K2avROoj/C6SRKzgH7$SZesp4Y0nCX39RFMrxGEUxuxZ6afiLMC8HKTuqkPHM8"
source 'https://rubygems.org'
gem 'ffi-czmq', github: 'Asmod4n/ruby-ffi-czmq'
gem 'http_parser.rb'
gem 'ffi-wslay', github: 'Asmod4n/ruby-ffi-wslay'
gem 'hitimes'
gem 'pry'
GIT
remote: git://github.com/Asmod4n/ruby-ffi-czmq.git
revision: f15a3a1a689ea61cd3c4222c615ce90e2873f9bf
specs:
ffi-czmq (0.1.1.pre)
ffi (>= 1.9.6)
GIT
remote: git://github.com/Asmod4n/ruby-ffi-wslay.git
revision: b3fffd24b74a94404b1242604af34c1fb4a0c119
specs:
ffi-wslay (0.0.1)
ffi (>= 1.9.6)
rake
GEM
remote: https://rubygems.org/
specs:
coderay (1.1.0)
ffi (1.9.6)
hitimes (1.2.2)
http_parser.rb (0.6.0)
method_source (0.8.2)
pry (0.10.1)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
slop (~> 3.4)
rake (10.4.1)
slop (3.6.0)
PLATFORMS
ruby
DEPENDENCIES
ffi-czmq!
ffi-wslay!
hitimes
http_parser.rb
pry
/*
hallo
*/
#include <czmq.h>
#include <sodium.h>
#include "picohttpparser/picohttpparser.h"
#if defined (__UTYPE_LINUX)
#include <sys/inotify.h>
#define INOTIFY_EVENT_SIZE (sizeof (struct inotify_event) + NAME_MAX + 1)
#define INOTIFY_BUFF_SIZE (1000 * INOTIFY_EVENT_SIZE)
#endif
#define S_SELF_TAG 0x2342cafe
#define MAX_HEADERS 100
#define MALFORMED_REQUEST "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n"
#define METHOD_NOT_ALLOWED "HTTP/1.1 405 Method not allowed\r\nConnection: close\r\n\r\n"
#define SERVICE_UNAVAILABLE "HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\n\r\n"
#define NOT_AUTHORIZED "HTTP/1.1 401 Not Authorized\r\nWWW-Authenticate: Basic realm=\"%s\"\r\nConnection: close\r\n\r\n"
#define OKAY "HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n"
static bool
strrcmp(const char* s, size_t l, const char* t)
{
return strlen(t) == l && memcmp(s, t, l) == 0;
}
typedef struct {
uint32_t tag;
zsock_t *pipe;
zconfig_t *users;
char *not_authed;
zarmour_t *base64;
zloop_t *reactor;
#if defined (__UTYPE_LINUX)
int fd;
int wd;
#endif
bool verbose;
zsock_t *webserver;
int port;
} self_t;
static bool
s_self_is (void *self)
{
assert (self);
return ((self_t *) self)->tag == S_SELF_TAG;
}
static void
s_self_destroy (self_t **self_p)
{
assert (self_p);
if (*self_p) {
self_t *self = *self_p;
assert (s_self_is (self));
self->tag = 0xDeadBeef;
zconfig_destroy (&self->users);
zstr_free (&self->not_authed);
zarmour_destroy (&self->base64);
zloop_destroy (&self->reactor);
#if defined (__UTYPE_LINUX)
if (self->fd > 0 && self->wd > 0)
self->wd = inotify_rm_watch (self->fd, self->wd);
if (self->fd > 0)
self->fd = close (self->fd);
#endif
zsock_destroy (&self->webserver);
self->port = 0;
zsock_set_sndtimeo (self->pipe, 0);
(void) zsock_signal (self->pipe, 0);
free (self);
*self_p = NULL;
}
}
static int
s_auth_basic (self_t *self, const char *auth)
{
assert (self);
assert (s_self_is (self));
assert (auth);
int rc = -1;
size_t decoded_size;
char *decoded = (char *) zarmour_decode (self->base64, auth, &decoded_size);
if (decoded) {
char *user = strsep (&decoded, ":");
char *passwd = strsep (&decoded, ":");
if (user && passwd) {
char *path = zsys_sprintf ("/users/%s/passwd", user);
if (path) {
char *str = zconfig_resolve (self->users, path, NULL);
zstr_free (&path);
if (str)
rc = crypto_pwhash_scryptsalsa208sha256_str_verify(
str, passwd, strlen (passwd));
}
}
zstr_free (&decoded);
}
return rc;
}
static int
s_self_handle_webserver (zloop_t *reactor, zsock_t *webserver, void *arg)
{
self_t *self = (self_t *) arg;
assert (self);
assert (s_self_is (self));
zframe_t *peer_frame = zframe_recv (webserver);
if (!peer_frame)
return -1;
zframe_t *request = zframe_recv (webserver);
if (!request) {
zframe_destroy (&peer_frame);
return -1;
}
if (zframe_size (request) == 0) {
zframe_destroy (&peer_frame);
zframe_destroy (&request);
return 0;
}
const char* method;
size_t method_len;
const char* path;
size_t path_len;
int minor_version;
struct phr_header headers[MAX_HEADERS];
size_t num_headers = MAX_HEADERS;
int rc = phr_parse_request ((const char *) zframe_data (request),
zframe_size (request), &method, &method_len,
&path, &path_len, &minor_version,
headers, &num_headers, 0);
if (rc > 0) {
if (strrcmp (method, method_len, "GET")) {
char *auth_header = NULL;
bool has_auth_header = false;
for (int i = 0; i < num_headers; ++i) {
if (strrcmp (headers[i].name, headers[i].name_len, "Authorization") &&
headers[i].value_len >= 10) {
auth_header = strndup (headers[i].value, headers[i].value_len);
has_auth_header = true;
}
}
if (has_auth_header) {
if (auth_header) {
if (memcmp (auth_header, "Basic ", 6) == 0) {
if (s_auth_basic (self, auth_header + 6) == 0)
zframe_reset (request, OKAY, strlen (OKAY));
else
zframe_reset (request, self->not_authed, strlen (self->not_authed));
}
else
zframe_reset (request, self->not_authed, strlen (self->not_authed));
zstr_free (&auth_header);
}
else
zframe_reset (request, SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE));
}
else
zframe_reset (request, self->not_authed, strlen (self->not_authed));
}
else
zframe_reset (request, METHOD_NOT_ALLOWED, strlen (METHOD_NOT_ALLOWED));
}
else
zframe_reset (request, MALFORMED_REQUEST, strlen (MALFORMED_REQUEST));
rc = zframe_send (&peer_frame, webserver, ZFRAME_MORE + ZFRAME_REUSE);
rc = zframe_send (&request, webserver, ZFRAME_MORE);
int sndtimeo = zsock_sndtimeo (webserver);
zsock_set_sndtimeo (webserver, 0);
rc = zframe_send (&peer_frame, webserver, ZFRAME_MORE);
rc = zstr_sendm (webserver, "");
zsock_set_sndtimeo (webserver, sndtimeo);
return 0;
}
static int
s_self_bind (self_t *self, zmsg_t *request)
{
int rc = -1;
if (!self->webserver) {
self->webserver = zsock_new (ZMQ_STREAM);
if (self->webserver) {
rc = zloop_reader ( self->reactor,
self->webserver,
s_self_handle_webserver,
self);
}
}
else
rc = 0;
if (rc == 0) {
char *endpoint = zmsg_popstr (request);
if (endpoint) {
rc = zsock_bind (self->webserver, "%s", endpoint);
if (rc != -1) {
self->port = rc;
rc = 0;
}
zstr_free (&endpoint);
}
else
rc = -1;
}
return rc;
}
static int
s_self_handle_pipe (zloop_t *reactor, zsock_t *pipe, void *arg)
{
self_t *self = (self_t *) arg;
assert (self);
assert (s_self_is (self));
zmsg_t *request = zmsg_recv (pipe);
if (!request)
return -1;
int rc = -1;
char *command = zmsg_popstr (request);
if (command) {
if (self->verbose)
zsys_info ("API command=%s", command);
if (streq (command, "$TERM")) {
s_self_destroy (&self);
rc = 0;
}
else
if (streq (command, "VERBOSE")) {
self->verbose = true;
zloop_set_verbose (self->reactor, true);
rc = 0;
}
else
if (streq (command, "QUITE")) {
self->verbose = false;
zloop_set_verbose (self->reactor, false);
rc = 0;
}
else
if (streq (command, "REALM")) {
char *realm = zmsg_popstr (request);
if (realm) {
zstr_free (&self->not_authed);
self->not_authed = zsys_sprintf (NOT_AUTHORIZED, realm);
if (self->not_authed)
rc = 0;
zstr_free (&realm);
}
}
else
if (streq (command, "BIND"))
rc = s_self_bind (self, request);
else
if (streq (command, "UNBIND")) {
if (self->webserver) {
char *endpoint = zmsg_popstr (request);
if (endpoint) {
rc = zsock_unbind (self->webserver, "%s", endpoint);
zstr_free (&endpoint);
}
}
}
else
if (streq (command, "PORT"))
rc = zstr_sendf (pipe, "%d", self->port);
else
if (streq (command, "LAST_ENDPOINT")) {
char *endpoint = zsock_last_endpoint (self->webserver);
if (endpoint) {
rc = zstr_send (pipe, endpoint);
zstr_free (&endpoint);
}
else {
rc = zstr_send (pipe, "");
rc = -1;
}
}
else {
zsys_error ("invalid command: %s", command);
assert (false);
}
zstr_free (&command);
}
zmsg_destroy (&request);
return zsock_signal (pipe, rc);
}
#if defined (__UTYPE_LINUX)
static int
s_self_handle_inotify (zloop_t *loop, zmq_pollitem_t *item, void *arg)
{
if ((item->revents & ZMQ_POLLERR) > 0)
return -1;
self_t *self = (self_t *) arg;
assert (self);
assert (s_self_is (self));
char buffer[INOTIFY_BUFF_SIZE];
ssize_t length = read (item->fd, buffer, INOTIFY_BUFF_SIZE);
if (length <= 0)
return -1;
return zconfig_reload (&self->users);
}
#endif
static self_t *
s_self_new (zsock_t *pipe, const char *filename)
{
assert (pipe);
assert (zsock_is (pipe));
assert (filename);
self_t *self = (self_t *) zmalloc (sizeof (self_t));
if (!self)
return NULL;
int rc = -1;
self->tag = S_SELF_TAG;
self->pipe = pipe;
self->users = zconfig_load (filename);
if (self->users)
self->not_authed = zsys_sprintf (NOT_AUTHORIZED, "DeadBeef");
if (self->not_authed)
self->base64 = zarmour_new ();
if (self->base64)
self->reactor = zloop_new ();
if (self->reactor)
rc = zloop_reader (self->reactor, pipe, s_self_handle_pipe, self);
#if defined (__UTYPE_LINUX)
if (rc != -1)
rc = self->fd = inotify_init1 (IN_NONBLOCK);
if (rc != -1)
rc = self->wd = inotify_add_watch (self->fd, zconfig_filename (self->users), IN_CREATE | IN_MODIFY);
if (rc != -1) {
zmq_pollitem_t pollitem = { NULL, self->fd, ZMQ_POLLIN, 0 };
rc = zloop_poller (self->reactor, &pollitem, s_self_handle_inotify, self);
}
#endif
if (rc != -1) {
self->verbose = false;
self->webserver = NULL;
self->port = 0;
}
else
s_self_destroy(&self);
return self;
}
void
http_auth (zsock_t *pipe, void *filename)
{
assert (pipe);
assert (zsock_is (pipe));
assert (filename);
int rc = sodium_init ();
if (rc != -1) {
self_t *self = s_self_new (pipe, (const char *) filename);
if (self) {
zsock_signal (pipe, 0);
zloop_start (self->reactor);
s_self_destroy (&self);
}
}
}
int main (void)
{
int rc = -1;
zactor_t *auth = zactor_new (http_auth, "foo.txt");
if (auth) {
rc = zstr_send (auth, "VERBOSE");
if (rc == 0)
rc = zsock_wait (auth);
if (rc == 0)
rc = zstr_sendx (auth, "BIND", "tcp://127.0.0.1:8087", NULL);
if (rc == 0)
rc = zsock_wait (auth);
if (rc == -1)
return rc;
void *sock = zactor_resolve (auth);
while (!zsys_interrupted) {
if (zsocket_poll (sock, -1)) {
zmsg_t *msg = zmsg_recv (auth);
if (msg)
zmsg_destroy (&msg);
else
break;
}
else
break;
}
zactor_destroy (&auth);
}
return rc;
}
#if defined (__UTYPE_LINUX)
#undef INOTIFY_EVENT_SIZE
#undef INOTIFY_BUFF_SIZE
#endif
#undef S_SELF_TAG
#undef MAX_HEADERS
#undef MALFORMED_REQUEST
#undef METHOD_NOT_ALLOWED
#undef SERVICE_UNAVAILABLE
#undef NOT_AUTHORIZED
#undef OKAY
/*
hallo
*/
#ifdef __cplusplus
extern "C" {
#endif
CZMQ_EXPORT void
http_auth (zsock_t *pipe, void *filename);
#ifdef __cplusplus
}
#endif
/*
hallo
*/
#include <czmq.h>
#include "picohttpparser/picohttpparser.h"
#include <wslay/wslay.h>
#define S_SELF_TAG 0x2314cafe
#define S_PEER_TAG 0x2342cafe
#define MALFORMED_REQUEST "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n"
#define METHOD_NOT_ALLOWED "HTTP/1.1 405 Method not allowed\r\nConnection: close\r\n\r\n"
#define SERVICE_UNAVAILABLE "HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\n\r\n"
#define UPGRADE_REQUIRED "HTTP/1.1 426 Upgrade required\r\nUpgrade: websocket\r\nConnection: Upgrade\r\n\r\n"
#define WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
#define WS_UPGRADE "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %s\r\n\r\n"
#define MAX_HEADERS 20
static bool
strrcmp(const char* s, size_t l, const char* t)
{
return strlen(t) == l && memcmp(s, t, l) == 0;
}
typedef struct {
uint32_t tag;
bool verbose;
bool running;
zsock_t *pipe;
zloop_t *reactor;
zsock_t *webserver;
int port;
zhashx_t *peers;
struct wslay_event_callbacks wslay_cbs;
} self_t;
typedef struct {
uint32_t tag;
zframe_t *peer_frame;
char *peer_id_hex;
bool accepted;
int64_t latency;
wslay_event_context_ptr wslay_ctx;
self_t *self;
void *ticket;
byte *buffer;
size_t buffer_size;
} peer_t;
static bool
s_self_is (void *self)
{
assert (self);
return ((self_t *) self)->tag == S_SELF_TAG;
}
static void
s_self_destroy (self_t **self_p)
{
assert (self_p);
if (*self_p) {
self_t *self = *self_p;
assert (s_self_is (self));
zhashx_destroy (&self->peers);
zloop_destroy (&self->reactor);
zsock_destroy (&self->webserver);
self->tag = 0xDeadBeef;
zsock_signal (self->pipe, 0);
free (self);
*self_p = NULL;
}
}
static bool
s_peer_is (void *peer)
{
assert (peer);
return ((peer_t *) peer)->tag == S_PEER_TAG;
}
static void
s_peer_destroy (peer_t **peer_p)
{
assert (peer_p);
if (*peer_p) {
peer_t *peer = *peer_p;
assert (s_peer_is (peer));
self_t *self = peer->self;
assert (self);
assert (s_self_is (self));
zframe_destroy (&peer->peer_frame);
zstr_free (&peer->peer_id_hex);
if (peer->ticket) {
zloop_ticket_delete (self->reactor, peer->ticket);
peer->ticket = NULL;
}
if (peer->wslay_ctx) {
wslay_event_context_free (peer->wslay_ctx);
peer->wslay_ctx = NULL;
}
peer->buffer = NULL;
peer->buffer_size = 0;
peer->tag = 0xDeadBeef;
free (peer);
*peer_p = NULL;
}
}
static void
s_peer_disconnect (peer_t *peer)
{
assert (peer);
assert (s_peer_is (peer));
self_t *self = peer->self;
assert (self);
assert (s_self_is (self));
int sndtimeo = zsock_sndtimeo (self->webserver);
zsock_set_sndtimeo (self->webserver, 0);
(void) zframe_send (&peer->peer_frame, self->webserver, ZFRAME_MORE);
(void) zstr_sendm (self->webserver, "");
zsock_set_sndtimeo (self->webserver, sndtimeo);
zhashx_delete (self->peers, peer->peer_id_hex);
}
static peer_t *
s_peer_new (self_t *self, zframe_t *peer_frame)
{
assert (self);
assert (s_self_is (self));
assert (peer_frame);
assert (zframe_is (peer_frame));
peer_t *peer = (peer_t *) zmalloc (sizeof (peer_t));
if (peer) {
peer->peer_frame = zframe_dup (peer_frame);
if (peer->peer_frame) {
peer->peer_id_hex = zframe_strhex (peer->peer_frame);
if (peer->peer_id_hex) {
peer->accepted = false;
peer->self = self;
peer->tag = S_PEER_TAG;
}
}
if (peer->tag != S_PEER_TAG)
s_peer_destroy (&peer);
}
return peer;
}
static int
s_peer_read (peer_t *peer, zframe_t *request)
{
assert (peer);
assert (s_peer_is (peer));
assert (request);
assert (zframe_is (request));
peer->buffer = zframe_data (request);
peer->buffer_size = zframe_size (request);
int rc = 0;
if (wslay_event_want_read (peer->wslay_ctx))
rc = wslay_event_recv (peer->wslay_ctx);
return rc;
}
static int
s_peer_write (peer_t *peer)
{
assert (peer);
assert (s_peer_is (peer));
int rc = 0;
while (wslay_event_want_write (peer->wslay_ctx)) {
rc = wslay_event_send (peer->wslay_ctx);
if (rc != 0)
break;
}
return rc;
}
static int
s_peer_timeout (zloop_t *loop, int timer_id, void *arg)
{
peer_t *peer = (peer_t *) arg;
assert (peer);
assert (s_peer_is (peer));
s_peer_disconnect (peer);
return 0;
}
static int
s_write (peer_t *peer, const uint8_t *data, size_t len)
{
assert (peer);
assert (s_peer_is (peer));
assert (data);
self_t *self = peer->self;
assert (self);
assert (s_self_is (self));
int rc = zframe_send (&peer->peer_frame, self->webserver, ZFRAME_MORE | ZFRAME_REUSE);
if (rc == 0) {
zframe_t *msg = zframe_new (data, len);
if (msg)
rc = zframe_send (&msg, self->webserver, ZFRAME_MORE);
else
rc = -1;
}
return rc;
}
static ssize_t
s_wslay_recv_cb (wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data)
{
peer_t *peer = (peer_t *) user_data;
assert (peer);
assert (s_peer_is (peer));
if (peer->buffer) {
if (peer->buffer_size > len) {
memmove (buf, peer->buffer, len);
peer->buffer_size -= len;
peer->buffer += len;
return len;
}
else {
memmove (buf, peer->buffer, peer->buffer_size);
int buffer_size = peer->buffer_size;
peer->buffer = NULL;
peer->buffer_size = 0;
return buffer_size;
}
}
else {
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
return -1;
}
}
static ssize_t
s_wslay_send_cb (wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data)
{
peer_t *peer = (peer_t *) user_data;
assert (peer);
assert (s_peer_is (peer));
int rc = s_write (peer, data, len);
if (rc == -1) {
if (zmq_errno () == EAGAIN)
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
else
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
return rc;
}
return len;
}
static void
s_wslay_on_msg_recv_cb (wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data)
{
peer_t *peer = (peer_t *) user_data;
assert (peer);
assert (s_peer_is (peer));
self_t *self = peer->self;
assert (self);
assert (s_self_is (self));
zloop_ticket_reset (self->reactor, peer->ticket);
if (wslay_is_ctrl_frame (arg->opcode)) {
if (arg->opcode == WSLAY_PONG)
peer->latency = zclock_usecs () - peer->latency;
}
else {
struct wslay_event_msg msgarg = {
arg->opcode, arg->msg, arg->msg_length
};
wslay_event_queue_msg (ctx, &msgarg);
(void) zsock_bsend (self->pipe, "s811S82", peer->peer_id_hex,
peer->latency, arg->rsv,
arg->opcode, arg->msg,
arg->msg_length,
arg->status_code);
}
}
static int
create_accept_key(char *dst, const char *peer_key)
{
assert (dst);
assert (peer_key);
int rc = -1;
byte key_src [60];
memmove (key_src, peer_key, 24);
memmove (key_src+24, WS_GUID, 36);
zdigest_t *digest = zdigest_new ();
if (digest) {
zdigest_update (digest, key_src, sizeof (key_src));
zarmour_t *zarmour = zarmour_new ();
if (zarmour) {
char *encoded = zarmour_encode (zarmour, zdigest_data (digest), zdigest_size (digest));
zarmour_destroy (&zarmour);
if (encoded) {
strcpy (dst, encoded);
zstr_free (&encoded);
rc = 0;
}
}
zdigest_destroy (&digest);
}
return rc;
}
static int
s_websocket_handshake (peer_t *peer, zframe_t *request)
{
assert (peer);
assert (s_peer_is (peer));
assert (request);
assert (zframe_is (request));
const char* method;
size_t method_len;
const char* path;
size_t path_len;
int minor_version;
struct phr_header headers[MAX_HEADERS];
size_t num_headers = MAX_HEADERS;
int rc = phr_parse_request ((const char *) zframe_data (request),
zframe_size (request), &method, &method_len,
&path, &path_len, &minor_version,
headers, &num_headers, 0);
if (rc > 0) {
if (strrcmp (method, method_len, "GET")) {
bool is_websocket = false;
bool has_ws_ver = false;
const char *secWebSocketKey = NULL;
for (int i = 0; i < num_headers; ++i) {
if (strrcmp (headers[i].name, headers[i].name_len, "Upgrade") &&
(strrcmp (headers[i].value, headers[i].value_len, "websocket") ||
strrcmp (headers[i].value, headers[i].value_len, "WebSocket")))
is_websocket = true;
if (strrcmp (headers[i].name, headers[i].name_len, "Sec-WebSocket-Version") &&
strrcmp (headers[i].value, headers[i].value_len, "13"))
has_ws_ver = true;
if (strrcmp (headers[i].name, headers[i].name_len, "Sec-WebSocket-Key") &&
headers[i].value_len == 24)
secWebSocketKey = headers[i].value;
}
if (is_websocket && has_ws_ver && secWebSocketKey) {
char accept_key [29];
rc = create_accept_key (accept_key, secWebSocketKey);
if (rc == 0) {
char *handshake = zsys_sprintf (WS_UPGRADE, accept_key);
if (handshake) {
rc = s_write (peer, (const unsigned char *) handshake, strlen (handshake));
zstr_free (&handshake);
if (rc == 0) {
self_t *self = peer->self;
assert (self);
assert (s_self_is (self));
rc = wslay_event_context_server_init(&peer->wslay_ctx, &self->wslay_cbs, peer);
if (rc == 0)
peer->ticket = zloop_ticket (self->reactor, (zloop_timer_fn *) s_peer_timeout, peer);
if (peer->ticket)
peer->accepted = true;
else {
rc = s_write (peer, (const unsigned char *) SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE));
rc = -1;
}
}
}
else {
rc = s_write (peer, (const unsigned char *) SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE));
rc = -1;
}
}
else {
rc = s_write (peer, (const unsigned char *) SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE));
rc = -1;
}
}
else {
rc = s_write (peer, (const unsigned char *) UPGRADE_REQUIRED, strlen (UPGRADE_REQUIRED));
rc = -1;
}
}
else {
rc = s_write (peer, (const unsigned char *) METHOD_NOT_ALLOWED, strlen (METHOD_NOT_ALLOWED));
rc = -1;
}
}
else {
rc = s_write (peer, (const unsigned char *) MALFORMED_REQUEST, strlen (MALFORMED_REQUEST));
rc = -1;
}
return rc;
}
static int
s_self_latency (zloop_t *loop, int timer_id, void *arg)
{
self_t *self = (self_t *) arg;
assert (self);
assert (s_self_is (self));
peer_t *peer = (peer_t *) zhashx_first (self->peers);
struct wslay_event_msg msgarg = {
WSLAY_PING, NULL, 0
};
int rc = 0;
while (peer && peer->wslay_ctx) {
assert (s_peer_is (peer));
rc = wslay_event_queue_msg (peer->wslay_ctx, &msgarg);
if (rc == 0) {
rc = s_peer_write (peer);
if (rc == 0)
peer->latency = zclock_usecs ();
}
peer = (peer_t *) zhashx_next (self->peers);
}
return 0;
}
static int
s_self_handle_webserver (zloop_t *reactor, zsock_t *webserver, void *arg)
{
zframe_t *peer_frame = zframe_recv (webserver);
if (!peer_frame)
return -1;
zframe_t *request = zframe_recv (webserver);
if (!request) {
zframe_destroy (&peer_frame);
return -1;
}
char *peer_id_hex = zframe_strhex (peer_frame);
if (peer_id_hex) {
self_t *self = (self_t *) arg;
assert (self);
assert (s_self_is (self));
peer_t *peer = NULL;
int rc = -1;
if (zframe_size (request) == 0) {
peer = (peer_t *) zhashx_lookup (self->peers, peer_id_hex);
if (peer) {
zhashx_delete (self->peers, peer_id_hex);
rc = 0;
}
else {
peer = s_peer_new (self, peer_frame);
if (peer)
rc = zhashx_insert (self->peers, peer_id_hex, peer);
else {
int sndtimeo = zsock_sndtimeo (webserver);
zsock_set_sndtimeo (webserver, 0);
(void) zframe_send (&peer_frame, webserver, ZFRAME_MORE);
(void) zstr_sendm (webserver, "");
zsock_set_sndtimeo (webserver, sndtimeo);
}
}
}
else {
peer = (peer_t *) zhashx_lookup (self->peers, peer_id_hex);
assert (peer);
assert (s_peer_is (peer));
if (peer->accepted) {
rc = s_peer_read (peer, request);
if (rc == 0) {
if (wslay_event_get_close_received (peer->wslay_ctx))
s_peer_disconnect (peer);
else {
rc = s_peer_write (peer);
if (rc == 0) {
if (wslay_event_get_close_sent (peer->wslay_ctx))
s_peer_disconnect (peer);
}
}
}
}
else
rc = s_websocket_handshake (peer, request);
}
if (rc == -1)
s_peer_disconnect (peer);
zstr_free (&peer_id_hex);
}
else {
int sndtimeo = zsock_sndtimeo (webserver);
zsock_set_sndtimeo (webserver, 0);
(void) zframe_send (&peer_frame, webserver, ZFRAME_MORE);
(void) zstr_sendm (webserver, "");
zsock_set_sndtimeo (webserver, sndtimeo);
}
zframe_destroy (&request);
zframe_destroy (&peer_frame);
return 0;
}
static int
s_self_start (self_t *self)
{
assert (self);
assert (s_self_is (self));
int rc = 0;
if (!self->running) {
rc = zloop_reader (self->reactor, self->webserver, (zloop_reader_fn *) s_self_handle_webserver, self);
if (rc != -1)
rc = zloop_timer (self->reactor, 4999, 0, (zloop_timer_fn *) s_self_latency, self);
if (rc != -1) {
rc = 0;
self->running = true;
}
else
zloop_reader_end (self->reactor, self->webserver);
}
return rc;
}
static int
s_self_bind (self_t *self, const char *endpoint)
{
assert (self);
assert (s_self_is (self));
assert (endpoint);
int port = zsock_bind (self->webserver, "%s", endpoint);
if (port != -1) {
self->port = port;
return 0;
}
return port;
}
static int
s_self_unbind (self_t *self, const char *endpoint)
{
assert (self);
assert (s_self_is (self));
assert (endpoint);
return zsock_unbind (self->webserver, "%s", endpoint);
}
static int
s_self_handle_pipe (zloop_t *reactor, zsock_t *pipe, void *arg)
{
zmsg_t *request = zmsg_recv (pipe);
if (!request)
return -1;
int rc = -1;
char *command = zmsg_popstr (request);
if (command) {
self_t *self = (self_t *) arg;
assert (self);
assert (s_self_is (self));
if (self->verbose)
zsys_info ("API command=%s", command);
if (streq (command, "$TERM")) {
s_self_destroy (&self);
rc = 0;
}
else
if (streq (command, "START"))
rc = s_self_start (self);
else
if (streq (command, "VERBOSE")) {
self->verbose = true;
zloop_set_verbose (reactor, true);
rc = 0;
}
else
if (streq (command, "BIND")) {
char *endpoint = zmsg_popstr (request);
if (endpoint) {
rc = s_self_bind (self, endpoint);
zstr_free (&endpoint);
}
}
else
if (streq (command, "UNBIND")) {
char *endpoint = zmsg_popstr (request);
if (endpoint) {
rc = s_self_unbind (self, endpoint);
zstr_free (&endpoint);
}
}
else
if (streq (command, "SEND_STR")) {
char *peer_id_hex = zmsg_popstr (request);
if (peer_id_hex) {
peer_t *peer = (peer_t *) zhashx_lookup (self->peers, peer_id_hex);
zstr_free (&peer_id_hex);
assert (peer);
assert (s_peer_is (peer));
zframe_t *msg = zmsg_pop (request);
struct wslay_event_msg msgarg = {
WSLAY_TEXT_FRAME, zframe_data (msg), zframe_size (msg)
};
zframe_destroy (&msg);
rc = wslay_event_queue_msg (peer->wslay_ctx, &msgarg);
if (rc == 0) {
rc = s_peer_write (peer);
if (rc != 0)
s_peer_disconnect (peer);
}
else
s_peer_disconnect (peer);
}
}
else {
zsys_error ("invalid command: %s", command);
assert (false);
}
zstr_free (&command);
}
zmsg_destroy (&request);
return zsock_signal (pipe, rc);
}
static self_t *
s_self_new (zsock_t *pipe)
{
assert (zsock_is (pipe));
self_t *self = (self_t *) zmalloc (sizeof (self_t));
if (!self)
return NULL;
self->verbose = false;
self->running = false;
self->pipe = pipe;
self->reactor = zloop_new ();
if (self->reactor)
self->webserver = zsock_new (ZMQ_STREAM);
self->port = 0;
if (self->webserver)
self->peers = zhashx_new ();
if (!self->peers) {
s_self_destroy (&self);
return NULL;
}
zhashx_set_destructor (self->peers, (czmq_destructor *) s_peer_destroy);
int rc = zloop_reader (self->reactor, self->pipe, (zloop_reader_fn *) s_self_handle_pipe, self);
if (rc == -1) {
s_self_destroy (&self);
return NULL;
}
zloop_set_ticket_delay (self->reactor, 45000);
self->wslay_cbs.recv_callback = (wslay_event_recv_callback) s_wslay_recv_cb;
self->wslay_cbs.send_callback = (wslay_event_send_callback) s_wslay_send_cb;
self->wslay_cbs.on_msg_recv_callback = (wslay_event_on_msg_recv_callback) s_wslay_on_msg_recv_cb;
self->tag = S_SELF_TAG;
return self;
}
void
zwebsock (zsock_t *pipe, void *unused)
{
assert (pipe);
assert (zsock_is (pipe));
assert (unused == NULL);
self_t *self = s_self_new (pipe);
if (self) {
zsock_signal (self->pipe, 0);
zloop_start (self->reactor);
s_self_destroy (&self);
}
}
int main(void)
{
int rc = -1;
zsys_set_ipv6 (1);
zsys_set_logident ("zwebsock");
zactor_t *websock = zactor_new (zwebsock, NULL);
assert (websock);
rc = zstr_send (websock, "VERBOSE");
if (rc == 0)
rc = zsock_wait (websock);
if (rc == 0)
rc = zstr_sendx (websock, "BIND", "tcp://127.0.0.1:9001", NULL);
if (rc == 0)
rc = zsock_wait (websock);
if (rc == 0)
rc = zstr_send (websock, "START");
if (rc == 0)
rc = zsock_wait (websock);
if (rc == -1)
goto fail;
void *sock = zactor_resolve (websock);
char *peer_id_hex;
int64_t latency;
uint8_t rsv;
uint8_t opcode;
uint8_t *msg;
size_t msg_length;
uint16_t status_code;
while (!zsys_interrupted) {
if (zsocket_poll (sock, -1)) {
rc = zsock_brecv (websock, "s811S82", &peer_id_hex, &latency, &rsv, &opcode, &msg, &msg_length, &status_code);
zsys_info ("recv: peer_id_hex: %s latency: %jd rsv: %u opcode: %u msg: %s length: %zu status_code: %u", peer_id_hex, latency, rsv, opcode, msg, msg_length, status_code);
free (msg);
}
else
break;
}
fail:
zactor_destroy (&websock);
return rc;
}
#undef S_SELF_TAG
#undef S_PEER_TAG
#undef MALFORMED_REQUEST
#undef METHOD_NOT_ALLOWED
#undef SERVICE_UNAVAILABLE
#undef UPGRADE_REQUIRED
#undef WS_GUID
#undef WS_UPGRADE
#undef MAX_HEADERS
require 'bundler/setup'
require 'czmq'
require 'http/parser'
require 'digest/sha1'
require 'base64'
require 'wslay'
require 'hitimes'
module Zwebsock
module Helpers
module_function
def destroy_zframe(zframe)
FFI::MemoryPointer.new(:pointer) do |zframe_ptr|
zframe_ptr.write_pointer(zframe)
CZMQ::Zframe.destructor(zframe_ptr)
end
end
def read_zframe(zframe)
CZMQ::Zframe.data(zframe).read_bytes(CZMQ::Zframe.size(zframe))
end
def recv_zframe(src)
if (zframe = CZMQ::Zframe.recv_zframe(src))
zframe
else
raise IOError, CZMQ::Utils.error, caller
end
end
def send_zframe(zframe, dest, flags)
FFI::MemoryPointer.new(:pointer) do |zframe_ptr|
zframe_ptr.write_pointer(zframe)
unless (CZMQ::Zframe.send_zframe(zframe_ptr, dest, flags)).zero?
unless CZMQ::Zsys.interrupted
raise IOError, CZMQ::Utils.error, caller
end
end
end
end
def new_zframe(data, len)
if (zframe = CZMQ::Zframe.constructor(data, len))
zframe
else
raise NoMemoryError, CZMQ::Utils.error, caller
end
end
def hex_zframe(zframe)
if (hex_frame = CZMQ::Zframe.strhex(zframe))
CZMQ::Zstr.read_string(hex_frame)
else
raise NoMemoryError, CZMQ::Utils.error, caller
end
end
end
end
module Zwebsock
class WslayError < StandardError; end
end
module Zwebsock
class Peer
attr_reader :peer_id, :peer_id_hex
attr_accessor :accepted, :latency, :wslay_ctx, :buffer, :buffer_size
def initialize(peer_id, peer_id_hex)
@peer_id = peer_id
@peer_id_hex = peer_id_hex
end
def tell(data)
case data
when String
case data.encoding
when Encoding::UTF_8
FFI::MemoryPointer.new(:uint8, data.bytesize) do |msg|
msg.write_bytes(data)
send_data(Wslay::OpCode[:text_frame], msg, msg.size)
end
when Encoding::ASCII_8BIT
FFI::MemoryPointer.new(:uint8, data.bytesize) do |msg|
msg.write_bytes(data)
send_data(Wslay::OpCode[:binary_frame], msg, msg.size)
end
else
fail ArgumentError, "Data String must be in UTF-8 or binary encoding"
end
when Array
FFI::MemoryPointer.new(:uint8, data.size) do |msg|
msg.write_array_of_bytes(data)
send_data(Wslay::OpCode[:binary_frame], msg, msg.size)
end
when FFI::Pointer
send_data(Wslay::OpCode[:binary_frame], data, data.size)
else
fail ArgumentError, "Data is neither a String, Array, or FFI::Pointer"
end
self
end
alias_method :<<, :tell
def send_data(opcode, msg, msg_length)
response = Wslay::Event::Msg.new
response[:opcode] = opcode
response[:msg] = msg
response[:msg_length] = msg_length
if (rc = Wslay::Event.queue_msg(wslay_ctx, response)) == 0
write
else
if rc == :nomem
raise NoMemoryError
else
raise WslayError, rc.to_s
end
end
end
def ping
msg = Wslay::Event::Msg.new
msg[:opcode] = Wslay::OpCode[:ping]
if (rc = Wslay::Event.queue_msg(wslay_ctx, msg)) == 0
write
latency.start
0
else
if rc == :nomem
raise NoMemoryError
else
raise WslayError, rc.to_s
end
end
end
def read(request)
rc = 0
if Wslay::Event.want_read(wslay_ctx)
self.buffer = CZMQ::Zframe.data(request)
self.buffer_size = CZMQ::Zframe.size(request)
unless (rc = Wslay::Event.read(wslay_ctx)) == 0
if rc == :nomem
raise NoMemoryError
else
raise WslayError, rc.to_s
end
end
end
rc
end
def write
rc = 0
while Wslay::Event.want_write(wslay_ctx)
unless (rc = Wslay::Event.write(wslay_ctx)) == 0
if rc == :nomem
raise NoMemoryError
else
raise WslayError, rc.to_s
end
end
end
rc
end
end
class Server
include Helpers
include CZMQ
extend Forwardable
GET = 'GET'.freeze
UPGRADE = 'Upgrade'.freeze
WEBSOCKET = 'websocket'.freeze
SEC_WEBSOCKET_KEY = 'Sec-WebSocket-Key'.freeze
SEC_WEBSOCKET_VERSION = 'Sec-WebSocket-Version'.freeze
CRLF = "\r\n".freeze
HANDSHAKE = "HTTP/1.1 101 Switching Protocols#{CRLF}#{UPGRADE}: #{WEBSOCKET}#{CRLF}Connection: #{UPGRADE}#{CRLF}Sec-WebSocket-Accept: %s#{CRLF * 2}".freeze
UPGRADE_REQUIRED = FFI::MemoryPointer.from_string("HTTP/1.1 426 #{UPGRADE} Required#{CRLF}#{UPGRADE}: #{WEBSOCKET}#{CRLF}Connection: #{UPGRADE}#{CRLF * 2}").freeze
EMPTY_STR = ''.freeze
def_delegators :@parent_pipe, :<<, :tell, :recv, :wait, :signal, :destructor
def initialize(endpoint, &callback)
@endpoint = endpoint
@callback = callback
@parent_pipe = Zactor.new_actor(&method(:run))
at_exit { destructor }
end
private
def write(peer, data, len)
send_zframe(new_zframe(peer.peer_id, peer.peer_id.size), @server.to_zsock, Zframe::MORE)
send_zframe(new_zframe(data, len), @server.to_zsock, Zframe::MORE)
rescue IOError => e
if Utils.errno == Errno::EHOSTUNREACH::Errno
delete_peer(peer)
else
raise e
end
end
def disconnect(peer)
@server.set_sndtimeo(0)
send_zframe(new_zframe(peer.peer_id, peer.peer_id.size), @server.to_zsock, Zframe::MORE)
Zstr.sendm(@server.to_zsock, EMPTY_STR)
@server.set_sndtimeo(-1)
rescue IOError
nil
ensure
delete_peer(peer)
end
def delete_peer(peer)
if peer.wslay_ctx
peer.wslay_ctx = Wslay::Event::Context.free(peer.wslay_ctx)
end
@peers.delete(peer.peer_id_hex)
end
def run(child_pipe)
Thread.current.abort_on_exception = true
@child_pipe = child_pipe
@peers = {}
@reactor = Zloop.new
@reactor.add_reader(@child_pipe, &method(:handle_pipe))
@server = Zsock.new(:stream)
@server.set_maxmsgsize(1024 * 1024 + 8192)
@port = @server.bind(@endpoint)
if Utils.version[:zmq][:major] >= 4 && Utils.version[:zmq][:minor] >= 1
zloop_reader_fn = FFI::Function.new(:int, [:pointer, :pointer, :pointer], blocking: true) do |zloop_t, zsock_t, args|
handle_server_4_1(zsock_t)
end
else
zloop_reader_fn = FFI::Function.new(:int, [:pointer, :pointer, :pointer], blocking: true) do |zloop_t, zsock_t, args|
handle_server(zsock_t)
end
end
Zloop.reader(@reactor.to_zloop, @server.to_zsock, zloop_reader_fn, nil)
@reactor.add_timer(4999, 0, &method(:handle_pings))
@wslay_callbacks = Wslay::Event::Callbacks.new
@wslay_callbacks.recv_callback(&method(:recv_callback))
@wslay_callbacks.send_callback(&method(:send_callback))
@wslay_callbacks.on_msg_recv_callback(&method(:on_msg_recv_callback))
@parser = HTTP::Parser.new
@child_pipe.signal(0)
Zsys.info "zwebsock: - started on #{@endpoint}, port: #{@port}"
@reactor.start
rescue => e
Zsys.error(format_exception(e))
@child_pipe.signal(-1)
end
def handle_pipe(zsock)
msg = zsock.recv
case msg.first.to_str
when '$TERM'
@peers.each_value do |peer|
disconnect(peer)
end
0
when 'PORT'
Zstr.send_zstr(zsock.to_zsock, @port.to_s)
end
rescue => e
Zsys.error(format_exception(e))
zsock.signal(-1)
-1
end
def handle_server(zsock)
peer_frame = nil
request = nil
peer = nil
peer_frame = recv_zframe(zsock)
request = recv_zframe(zsock)
unless @peers.key?(peer_id_hex = hex_zframe(peer_frame))
peer_id = FFI::MemoryPointer.new(:uchar, Zframe.size(peer_frame))
peer_id.__copy_from__(Zframe.data(peer_frame), peer_id.size)
peer = @peers[peer_id_hex] = Peer.new(peer_id, peer_id_hex)
end
peer = @peers[peer_id_hex]
if peer.accepted
peer.read(request)
if Wslay::Event.get_close_received(peer.wslay_ctx)
disconnect(peer)
else
peer.write
if Wslay::Event.get_close_sent(peer.wslay_ctx)
disconnect(peer)
end
end
else
websocket_handshake(peer, request)
end
0
rescue WslayError
disconnect(peer) if peer
0
rescue => e
Zsys.error(format_exception(e))
@child_pipe.signal(-1)
-1
ensure
destroy_zframe(request) if request
destroy_zframe(peer_frame) if peer_frame
end
def handle_server_4_1(zsock)
peer_frame = nil
request = nil
peer = nil
peer_frame = recv_zframe(zsock)
peer_id_hex = hex_zframe(peer_frame)
request = recv_zframe(zsock)
if Zframe.size(request).zero?
if @peers.key?(peer_id_hex)
delete_peer(@peers[peer_id_hex])
else
peer_id = FFI::MemoryPointer.new(:uchar, Zframe.size(peer_frame))
peer_id.__copy_from__(Zframe.data(peer_frame), peer_id.size)
peer = @peers[peer_id_hex] = Peer.new(peer_id, peer_id_hex)
end
else
peer = @peers[peer_id_hex]
if peer.accepted
peer.read(request)
if Wslay::Event.get_close_received(peer.wslay_ctx)
disconnect(peer)
else
peer.write
if Wslay::Event.get_close_sent(peer.wslay_ctx)
disconnect(peer)
end
end
else
websocket_handshake(peer, request)
end
end
0
rescue WslayError
disconnect(peer) if peer
0
rescue => e
Zsys.error(format_exception(e))
@child_pipe.signal(-1)
-1
ensure
destroy_zframe(request) if request
destroy_zframe(peer_frame) if peer_frame
end
def handle_pings(timer_id)
peer = nil
@peers.each_value do |peer_handler|
peer = peer_handler
peer.ping
end
0
rescue WslayError
disconnect(peer) if peer
0
rescue => e
Zsys.error(format_exception(e))
@child_pipe.signal(-1)
-1
end
def websocket_handshake(peer, request)
@parser.reset!
@parser.on_headers_complete = proc do |headers|
if (@parser.http_method.casecmp(GET).zero? &&
@parser.upgrade? &&
headers[UPGRADE].casecmp(WEBSOCKET).zero? &&
headers.key?(SEC_WEBSOCKET_KEY) &&
headers[SEC_WEBSOCKET_KEY].bytesize == 24 &&
headers[SEC_WEBSOCKET_VERSION].to_i == 13)
handshake = format(HANDSHAKE, create_accept_key(headers[SEC_WEBSOCKET_KEY]))
peer.latency = Hitimes::TimedMetric.new(:latency)
FFI::MemoryPointer.new(:pointer) do |wslay_ctx_ptr|
if (rc = Wslay::Event::Context.server_init(wslay_ctx_ptr, @wslay_callbacks, peer.peer_id_hex)) == 0
wslay_ctx = wslay_ctx_ptr.read_pointer
Wslay::Event::Config.set_max_recv_msg_length(wslay_ctx, 1024 * 1024)
peer.wslay_ctx = wslay_ctx
write(peer, handshake, handshake.bytesize)
peer.accepted = true
else
if rc == :nomem
raise NoMemoryError
else
raise WslayError, rc.to_s
end
end
end
else
write(peer, UPGRADE_REQUIRED, UPGRADE_REQUIRED.size - 1)
disconnect(peer)
end
:stop
end
@parser << read_zframe(request)
end
def recv_callback(ctx, buf, len, flags, user_data)
peer = @peers[user_data.get_string(0)]
if (peer.buffer)
if peer.buffer_size > len
buf.__copy_from__(peer.buffer, len)
peer.buffer_size -= len
peer.buffer += len
len
else
buf.__copy_from__(peer.buffer, peer.buffer_size)
peer.buffer = nil
buffer_size = peer.buffer_size
peer.buffer_size = 0
buffer_size
end
else
Wslay::Event.set_error(ctx, :would_block)
-1
end
rescue => e
Zsys.error(format_exception(e))
Wslay::Event.set_error(ctx, :callback_failure)
-1
end
def send_callback(ctx, data, len, flags, user_data)
peer = @peers[user_data.get_string(0)]
write(peer, data, len)
len
rescue => e
if Utils.errno == Errno::EAGAIN::Errno
Wslay::Event.set_error(ctx, :would_block)
else
Zsys.error(format_exception(e))
Wslay::Event.set_error(ctx, :callback_failure)
end
-1
end
def on_msg_recv_callback(ctx, arg, user_data)
peer = @peers[user_data.get_string(0)]
if Wslay.is_ctrl_frame(arg[:opcode])
case Wslay::OpCode[arg[:opcode]]
when :pong
Zsys.info("#{peer.peer_id_hex}: latency: #{peer.latency.stop}")
end
else
if arg[:msg_length] > 0
case Wslay::OpCode[arg[:opcode]]
when :text_frame
msg = arg[:msg].read_bytes(arg[:msg_length]).force_encoding(Encoding::UTF_8)
when :binary_frame
msg = arg[:msg].read_bytes(arg[:msg_length])
else
fail TypeError
end
if (response = @callback.call(peer.peer_id_hex, peer.latency, msg))
if response.is_a?(Symbol)
if (rc = Wslay::Event.queue_close(ctx, response, nil, 0)) == 0
peer.write
else
if rc == :nomem
raise NoMemoryError
else
raise WslayError, rc.to_s
end
end
else
peer.tell(response)
end
end
end
end
rescue => e
Zsys.error(format_exception(e))
Wslay::Event.queue_close(ctx, :internal_server_error, nil, 0)
peer.write
end
def format_exception(e)
exception = "#{e}"
exception << "\n" << e.backtrace.join("\n\t") if e.backtrace
end
def create_accept_key(client_key)
Base64.strict_encode64(Digest::SHA1.digest("#{client_key}#{Wslay::WS_GUID}"))
end
end
end
server = Zwebsock::Server.new('tcp://*:9001') do |peer_id, latency, msg|
puts "#{peer_id}: recv #{msg}"
msg
end
server.wait
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment