Skip to content

Instantly share code, notes, and snippets.

@pyos
Last active July 3, 2018 12:51
Show Gist options
  • Save pyos/052ef69e20d22521dc5275fae237d00d to your computer and use it in GitHub Desktop.
Save pyos/052ef69e20d22521dc5275fae237d00d to your computer and use it in GitHub Desktop.
libcno and coroutine-based echo server
#pragma once
#include "cone.h"
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <memory>
#include <vector>
#include <unordered_map>
#include <cno/core.h>
// Start of the mun error-code range used for errors that originated in libcno:
// cno2mun() reports a libcno error `e` under the code `mun_errno_cno + e.code`.
static constexpr int mun_errno_cno = mun_errno_custom + 19000;
// libcno-side error code meaning "the real error is stored in mun"; cno2mun()
// skips translation when it sees this code.
// NOTE(review): presumably picked up by CNO_ERROR(MUN, ...) through token pasting — confirm.
static constexpr int CNO_ERRNO_MUN = mun_errno_cno;
namespace cno {
// A bounded in-memory byte queue shared between a producing and a consuming
// coroutine. Writers wait on `less` while the queue is full, readers wait on
// `more` while it is empty and has not been closed.
struct buffer {
    // `limit` is the maximum number of bytes kept queued at any one time.
    buffer(size_t limit = 8192) : limit(limit) {}

    // Waits for data (or for close()), then moves everything queued into `out`
    // and wakes pending writers. After close() with nothing queued, `out` ends
    // up empty. Returns false if waiting or waking failed.
    mun::status read(std::string &out) {
        std::string *ready = nullptr;
        if (!read(ready))
            return false;
        out = std::move(data);
        // A moved-from std::string is only "valid but unspecified"; reset it
        // explicitly instead of trusting its size to be zero.
        data.clear();
        return less.wake().raise(HERE);
    }

    // Zero-copy variant: waits for data (or for close()), then points `out` at
    // the internal storage. The caller reports how much it used via consume().
    mun::status read(std::string *&out) {
        while (data.empty() && !eof)
            if (!more.wait().raise(HERE))
                return false;
        out = &data;
        return true;
    }

    // Drops the first `n` queued bytes and wakes writers waiting for room.
    mun::status consume(size_t n) {
        data.erase(0, n);
        return less.wake().raise(HERE);
    }

    // Appends all of `v`, at most `limit` queued bytes at a time. Readers are
    // woken after every chunk; when a chunk did not fit completely the writer
    // waits for a consumer to make room before continuing.
    mun::status write(mun::stringview v) {
        size_t offset = 0;
        while (offset < v.size) {
            const size_t remaining = v.size - offset;
            const size_t chunk = std::min(remaining, limit - data.size());
            data.append(v.base + offset, chunk);
            if (!more.wake().raise(HERE))
                return false;
            if (chunk < remaining && !less.wait().raise(HERE))
                return false;
            offset += chunk;
        }
        return true;
    }

    // Marks the end of the stream and wakes readers so they stop waiting.
    mun::status close() {
        eof = true;
        return more.wake().raise(HERE);
    }

private:
    size_t limit;
    bool eof = false;
    std::string data;
    cone::event more;  // signalled when data was appended or the buffer was closed
    cone::event less;  // signalled when data was consumed
};
// A cno_header_t that can be built from two string views, so header lists can
// be written as brace-enclosed pairs. The views are not copied: the referenced
// characters must stay alive for as long as the header is used.
struct header : cno_header_t {
    header(mun::stringview k, mun::stringview v)
        : cno_header_t{
              {k.base, k.size},
              {v.base, v.size},
              0,  // NOTE(review): presumably the header flags field — confirm against libcno
          } {}
};
struct message : cno::buffer {
int code;
std::string method;
std::string path;
std::unordered_multimap<std::string, std::string> headers;
message(const struct cno_message_t &m) {
code = m.code;
method = std::string{m.method.data, m.method.size};
path = std::string{m.path.data, m.path.size};
for (auto *h = m.headers; h != &m.headers[m.headers_len]; h++)
headers.emplace(std::string{h->name.data, h->name.size}, std::string{h->value.data, h->value.size});
}
};
// Translates a mun status into a libcno result: success becomes CNO_OK, failure
// becomes a libcno "MUN" error whose text points at mun::error::last().
static mun::status mun2cno(mun::status c = false) {
    if (c)
        return CNO_OK;
    return CNO_ERROR(MUN, "see mun::error::last() for details");
}
// Translates a libcno result into a mun status. On failure the libcno error is
// re-raised as a mun error with code `mun_errno_cno + code`, followed by one
// frame per remaining traceback entry. Nothing is raised when the libcno error
// is CNO_ERRNO_MUN, i.e. when it merely wraps an error already held by mun.
static mun::status cno2mun(mun::status c = false) {
    auto &err = *cno_error();
    if (c || err.code == CNO_ERRNO_MUN)
        return c;

    // libcno records no function names, hence the "?" placeholder.
    const mun::error::frame origin{err.traceback->file, "?", static_cast<unsigned>(err.traceback->line)};
    c.raise(origin, mun_errno_cno + err.code, "cno", "%d: %s", err.code, err.text);

    for (auto *entry = &err.traceback[1]; entry != err.traceback_end; ++entry) {
        const mun::error::frame caller{entry->file, "?", static_cast<unsigned>(entry->line)};
        c.raise(caller);
    }
    return c;
}
struct stream {
using handler = std::function<mun::status(message &, stream &)>;
stream(struct cno_connection_t &conn, uint32_t id) : conn(conn), id(id) {}
mun::status head(int code, const std::vector<cno::header> &hs) {
while (messages.front().c.wait().raise(HERE)) {}
if (mun::error::last().code != mun_errno_deadlock)
return false;
struct cno_message_t cm = {code, {}, {}, (struct cno_header_t *)&hs[0], hs.size()};
return cno2mun(cno_write_message(&conn, id, &cm, 0)).raise(HERE);
}
mun::status push(mun::stringview method, mun::stringview path, const std::vector<header> &hs) {
struct cno_message_t cm = {0, {(char *) method.base, method.size}, {(char *) path.base, path.size}, (struct cno_header_t *)&hs[0], hs.size()};
return cno2mun(cno_write_push(&conn, id, &cm)).raise(HERE);
}
mun::status write(mun::stringview buf) {
return wbuffer.append(buf.base, buf.size).size() == buf.size ? on_flow() : mun::status{true};
}
mun::status reset(enum CNO_RST_STREAM_CODE code = CNO_RST_CANCEL) {
return cno2mun(cno_write_reset(&conn, id, code)).raise(HERE);
}
mun::status on_message(const struct cno_message_t &msg, cno::stream::handler &h) {
return messages.emplace_back(this, &h, msg), true;
}
mun::status on_data(mun::stringview data) {
return messages.empty() || messages.back().m.write(data);
}
mun::status on_end() {
return messages.empty() || messages.back().m.close();
}
mun::status on_flow() {
for (ssize_t written; wbuffer.size(); wbuffer.erase(0, written)) {
if (!cno2mun((written = cno_write_data(&conn, id, wbuffer.data(), wbuffer.size(), 0)) >= 0).raise(HERE))
return false;
if (written == 0)
return true;
}
return flushed.wake().raise(HERE);
}
private:
struct task {
cno::stream *s;
cno::stream::handler *h;
cno::message m;
struct cone c;
task(cno::stream *s, cno::stream::handler *h, const struct cno_message_t &m) : s(s), h(h), m(m), c(*this) {}
mun::status operator()() {
if (!(*h)(m, *s).raise(HERE) || (!s->wbuffer.empty() && !s->flushed.wait().raise(HERE)))
return mun::error::last().code == mun_errno_cno + CNO_ERRNO_DISCONNECT;
c.detach();
cno::stream &sr = *s;
s->messages.erase(s->messages.begin()); // destroys `this`
return cno2mun(cno_write_data(&sr.conn, sr.id, "", 0, 1)).raise(HERE); // may destroy `sr`
}
};
struct cno_connection_t &conn;
const uint32_t id;
cone::event flushed;
std::string wbuffer;
std::vector<task> messages;
};
// Server side of one libcno connection. Derives from cno_connection_t so that
// `this` can be handed directly to the cno_connection_* functions, and keeps a
// cno::stream for every stream libcno reports as started.
struct server : cno_connection_t {
// Serves the connection on `fd`: bytes read from `fd` are fed to libcno, and
// `handler` is run for each incoming message. Returns the result of
// cno_connection_lost() once read() reports end of file, and false on a read
// or protocol error.
static mun::status run(mun::fd fd, cno::stream::handler handler) {
cno::server server{handler};
// Writer coroutine: drains `server.buffer` (filled by _on_write) into `fd`.
// It is destroyed, and therefore cancelled, when run() returns.
struct cone writer{[&server, &fd]() -> mun::status {
ssize_t w;
for (std::string *s = nullptr; server.buffer.read(s).raise(HERE); server.buffer.consume(w))
if (!mun::status{(w = ::write(fd, &(*s)[0], s->size())) >= 0}.os(HERE))
// A write failing with EPIPE is treated as a normal way to finish.
return errno == EPIPE;
return false;
}};
if (!cno2mun(cno_connection_made(&server, CNO_HTTP1)).raise(HERE))
return false;
char buf[8192];
for (ssize_t rd; mun::status{(rd = ::read(fd, buf, sizeof(buf))) >= 0}.os(HERE);) {
// read() returning 0 means the peer closed the connection.
if (rd == 0)
return cno2mun(cno_connection_lost(&server)).raise(HERE);
if (!cno2mun(cno_connection_data_received(&server, buf, (size_t)rd)).raise(HERE))
return false;
}
return false;
}
private:
// Bytes libcno wants sent to the peer, waiting for the writer coroutine.
cno::buffer buffer;
// Held by reference: the referenced handler must outlive the server.
cno::stream::handler &handler;
std::unordered_map<uint32_t, std::unique_ptr<stream>> streams;
// Initializes the libcno connection in server mode and registers the static
// member functions below as its callbacks, with `this` as the callback data.
server(cno::stream::handler &h) noexcept : handler(h) {
cno_connection_init(this, CNO_SERVER);
cb_data = this;
// The callbacks take `server *` as their first parameter, so each one is
// cast to the function pointer type of the field it is assigned to.
on_write = (decltype(on_write)) &_on_write;
on_stream_start = (decltype(on_stream_start)) &_on_stream;
on_stream_end = (decltype(on_stream_end)) &_on_stream_end;
on_message_start = (decltype(on_message_start)) &_on_message;
on_message_data = (decltype(on_message_data)) &_on_message_data;
on_message_end = (decltype(on_message_end)) &_on_message_end;
on_flow_increase = (decltype(on_flow_increase)) &_on_flow;
}
~server() {
cno_connection_reset(this);
}
// libcno has output for the peer: queue it for the writer coroutine.
static int _on_write(server *p, const char *data, size_t size) {
return mun2cno(p->buffer.write({data, size}).raise(HERE));
}
// A stream was started: create its cno::stream, replacing any previous entry for `id`.
static int _on_stream(server *p, uint32_t id) {
return p->streams[id] = std::make_unique<cno::stream>(*p, id), CNO_OK;
}
// A stream ended: destroy its cno::stream.
static int _on_stream_end(server *p, uint32_t id) {
return p->streams.erase(id), CNO_OK;
}
// NOTE(review): the callbacks below use streams[id], which inserts a null
// entry and then dereferences it when `id` is unknown. They rely on libcno
// only passing ids it announced through on_stream_start — confirm.
static int _on_message(server *p, uint32_t id, const struct cno_message_t *msg) {
return mun2cno(p->streams[id]->on_message(*msg, p->handler).raise(HERE));
}
static int _on_message_data(server *p, uint32_t id, const char *data, size_t size) {
return mun2cno(p->streams[id]->on_data({data, size}).raise(HERE));
}
static int _on_message_end(server *p, uint32_t id) {
return mun2cno(p->streams[id]->on_end().raise(HERE));
}
// More data may be sent: a non-zero `id` flushes that stream only, while
// `id == 0` flushes every known stream.
static int _on_flow(server *p, uint32_t id) {
if (id)
return mun2cno(p->streams[id]->on_flow().raise(HERE));
for (auto &it : p->streams)
if (!it.second->on_flow().raise(HERE))
return mun2cno();
return CNO_OK;
}
};
};
#pragma once
#include <unistd.h>
#include <sys/socket.h>
#include <atomic>
#include <memory>
#include <string>
#include <thread>
extern "C" {
#define _Atomic(T) std::atomic<T>
#include <cone/cone.h>
#undef _Atomic
#undef cone
}
namespace mun {
// Shorthand passed wherever a mun::error::frame describing the call site is
// expected (status::raise, status::os).
// NOTE(review): MUN_CURRENT_FRAME comes from the cone/mun C headers and
// presumably captures the current file, function and line — confirm.
#define HERE MUN_CURRENT_FRAME
// C++ view of the C library's mun_error; adds no data members of its own.
struct error : mun_error {
    using frame = struct mun_stackframe;

    // The error object returned by mun_last_error(), viewed through this wrapper.
    static error &last() noexcept {
        auto *raw = mun_last_error();
        return *reinterpret_cast<error *>(raw);
    }

    // Prints this error through mun_error_show(), tagged with `prefix`.
    void show(const char *prefix = "runtime") const noexcept {
        mun_error_show(prefix, this);
    }
};
// Success/failure flag that converts implicitly from and to the two C
// conventions used by the wrapped libraries: an `int` that is negative on
// failure, and a `bool` that is false on failure.
struct status {
    constexpr status(int i) noexcept : ok(i >= 0) {}
    constexpr status(bool b) noexcept : ok(b) {}
    constexpr operator int() const noexcept { return ok ? 0 : -1; }
    constexpr operator bool() const noexcept { return ok; }

    // On failure, records a new mun error with the given code, name and
    // printf-style message (`args` starts with the format string) at frame `f`.
    // Does nothing on success.
    template <typename... Args>
    mun::status raise(mun::error::frame f, int code, const char *name, Args&&... args) const noexcept {
        if (ok)
            return 0;
        const int rc = mun_error_at(code, name, f, args...);
        return rc;
    }

    // On failure, records the current `errno` (negated) as a mun error at frame `f`.
    mun::status os(mun::error::frame f) const noexcept {
        if (ok)
            return 0;
        const int rc = mun_error_at(-errno, "errno", f, "OS error");
        return rc;
    }

    // On failure, passes frame `f` to mun_error_up() for the error already recorded.
    mun::status raise(mun::error::frame f) const noexcept {
        if (ok)
            return 0;
        const int rc = mun_error_up(f);
        return rc;
    }

private:
    const bool ok;
};
// Non-owning view of a character range: a pointer plus a length. The viewed
// characters must outlive the view.
struct stringview {
    const char *base; size_t size;

    // Views a NUL-terminated C string. A null pointer is treated as the empty
    // string, so `base` is always safe to read from and to pass to strlen().
    stringview(const char *b = "") noexcept : base(b ? b : ""), size(strlen(base)) {}

    // Views exactly `s` bytes starting at `b`; embedded NULs are allowed.
    stringview(const char *b, size_t s) noexcept : base(b), size(s) {}

    // Views the contents of `s`, which must not be modified or destroyed
    // while the view is in use.
    stringview(const std::string &s) noexcept : base(s.data()), size(s.size()) {}
};
// Move-only owner of a file descriptor; a negative value means "no descriptor".
// The descriptor is closed when the owner is destroyed.
struct fd {
    fd(int n = -1) noexcept : n(n) {}

    // Takes over the descriptor of `x`, leaving `x` empty.
    fd(fd &&x) noexcept : n(-1) { std::swap(n, x.n); }
    fd(const fd &) = delete;

    // Exchanges descriptors with `x`; the one previously held here is closed
    // when `x` is destroyed.
    fd &operator=(fd &&x) noexcept {
        std::swap(n, x.n);
        return *this;
    }
    fd &operator=(const fd &) = delete;

    ~fd() {
        if (n < 0)
            return;
        ::close(n);
    }

    // The raw descriptor, for passing to system calls.
    operator int() const noexcept { return n; }
    // True when a descriptor is held.
    operator bool() const noexcept { return !(n < 0); }

    // Hands the descriptor to cone_unblock() and reports its result.
    mun::status unblock() noexcept {
        return cone_unblock(n);
    }

private:
    int n;
};
}
struct cone {
cone() = default;
cone(cone&&) = default;
cone &operator=(cone&&) = default;
template <typename F>
explicit cone(F &f, size_t stack = CONE_DEFAULT_STACK) {
ptr.reset(cone_spawn(stack, cone_bind((&body<F, F*>), &f)));
}
template <typename F>
explicit cone(F &&f, size_t stack = CONE_DEFAULT_STACK) {
auto g = std::make_unique<F>(std::forward<F>(f));
if (ptr.reset(cone_spawn(stack, cone_bind((&body<std::remove_reference_t<F>, decltype(g)>), g.get()))), ptr)
g.release();
}
~cone() {
if (ptr && !(cancel() && wait()))
mun::error::last().show("cone destructor");
}
operator bool() const noexcept {
return !!ptr;
}
void detach() noexcept {
ptr = nullptr;
}
mun::status wait(bool rethrow = false) const noexcept {
return cone_cowait(ptr.get(), rethrow ? 0 : CONE_NORETHROW);
}
mun::status cancel() const noexcept {
return cone_cancel(ptr.get());
}
private:
std::unique_ptr<struct cone, int(*)(struct cone *) noexcept> ptr{nullptr, cone_drop};
template <typename F, typename Fw>
static int body(F *f) noexcept {
return (*Fw{f})();
}
public:
struct event : private cone_event {
event() noexcept : cone_event() {}
event(event &&e) { std::swap(*this, e); }
event(const event &) = delete;
event &operator=(event &&e) { return std::swap(*this, e), *this; }
event &operator=(const event &) = delete;
~event() {
mun_vec_fini(this);
}
mun::status wait(const std::atomic<unsigned> &a, unsigned expect) noexcept {
return cone_wait(this, &a, expect);
}
mun::status wait() noexcept {
return wait(std::atomic<unsigned>{0}, 0);
}
mun::status wake(size_t n = std::numeric_limits<size_t>::max()) noexcept {
return cone_wake(this, n);
}
};
struct thread {
thread() noexcept = default;
thread(thread &&) = default;
thread &operator=(thread &&) = default;
template <typename F>
thread(F &&f, size_t stack = CONE_DEFAULT_STACK) {
int p[2];
::socketpair(SOCK_STREAM, AF_UNIX, 0, p);
signal = mun::fd{p[0]};
signal.unblock();
handle = std::thread(&body<std::remove_reference_t<F>>, std::pair<std::remove_reference_t<F>, mun::fd>{std::forward<F>(f), p[1]}, stack);
}
~thread() {
if (signal && !(cancel() && wait()))
mun::error::last().show("thread destructor");
}
mun::status wait() {
for (char buf; !mun::status{::read(signal, &buf, 1) >= 0}.os(HERE);)
return false;
return handle.join(), true;
}
mun::status cancel() noexcept {
return mun::status{::write(signal, "", 1) == 1}.os(HERE);
}
private:
mun::fd signal;
std::thread handle;
template <typename F>
static void body(std::pair<F, mun::fd> f, size_t stack) noexcept {
if (cone_loop(stack, cone_bind(corobody<F>, &f)))
mun::error::last().show("worker thread");
}
template <typename F>
static int corobody(std::pair<F, mun::fd> *f) noexcept {
struct cone sighandler{[f, c = ::cone]() -> mun::status {
for (char buf; !mun::status{f->second.unblock() && ::read(f->second, &buf, 1) >= 0}.os(HERE);)
return false;
return cone_cancel(c);
}};
return cone::body<F, F*>(&f->first);
}
};
};
// git clone https://github.com/pyos/libcno
// git clone https://github.com/pyos/cone
// make -C libcno
// make -C cone CFLAGS=-O3
// g++ -O3 -Wall -Wextra -Ilibcno -I. -Llibcno/obj -std=c++14 server.cc cone/obj/{cone,cold,mun}.o -lcno -o server
#include <iostream>
#include "cno.h"
#include <errno.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
// Serves one client connection with an HTTP echo handler: the request body is
// collected and sent back with status 200. A request whose collected body would
// reach 100000 bytes is answered with an empty 400 instead.
static mun::status echo(mun::fd client) {
    auto handler = [](cno::message &msg, cno::stream &stream) -> mun::status {
        std::string payload;
        std::string part;
        while (msg.read(part).raise(HERE)) {
            // An empty read means the request body is complete.
            if (part.empty()) {
                const bool head_sent = stream.head(200, {{"server", "libcno/1.0"}, {"cache-control", "no-cache"}, {"content-length", std::to_string(payload.size())}});
                // HEAD requests get the headers only.
                return head_sent && (msg.method == "HEAD" || stream.write(payload));
            }
            if (payload.size() + part.size() >= 100000)
                return stream.head(400, {{"content-length", "0"}});
            payload += part;
        }
        return false;
    };
    return cno::server::run(std::move(client), handler);
}
namespace io {
// The value 1 with an address, for setsockopt() options that are enabled by
// passing a pointer to a non-zero int.
static constexpr int ONE = 1;
// Creates an IPv6 TCP socket bound to address `iface` (16 raw bytes) and
// `port`, and starts listening with the given backlog. SO_REUSEADDR is set and
// the descriptor is passed through mun::fd::unblock(). On any failure the OS
// error is recorded through mun and an empty mun::fd is returned.
static mun::fd ipv6_server(unsigned char iface[16], uint32_t port, int backlog = 127) {
    mun::fd sock = ::socket(AF_INET6, SOCK_STREAM, 0);
    const bool configured = sock && sock.unblock() && setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &ONE, sizeof(int)) == 0;
    if (!mun::status{configured}.os(HERE))
        return -1;

    struct sockaddr_in6 addr = {};
    addr.sin6_family = AF_INET6;
    addr.sin6_port = htons(port);
    memcpy(addr.sin6_addr.s6_addr, iface, 16);

    const bool listening = bind(sock, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)) == 0 && listen(sock, backlog) == 0;
    if (!mun::status{listening}.os(HERE))
        return -1;
    return sock;
}
// Accept loop: every connection accepted on `fd` gets TCP_NODELAY, is passed
// through mun::fd::unblock(), and is handed to `handler` in a new detached
// coroutine. Returns a failed status as soon as accepting or configuring a
// connection fails; it never returns successfully.
template <typename F>
static mun::status tcp_server(mun::fd &fd, F &&handler) {
    for (;;) {
        mun::fd child = accept(fd, nullptr, nullptr);
        const bool ready = child && child.unblock() && setsockopt(child, IPPROTO_TCP, TCP_NODELAY, &ONE, sizeof(int)) == 0;
        if (!mun::status{ready}.os(HERE))
            return -1;
        // The inner bind stores the descriptor and yields it as an rvalue when
        // the coroutine starts, moving ownership into the handler.
        struct cone worker{std::bind(handler, std::bind(std::move<mun::fd&>, std::move(child)))};
        worker.detach();
    }
}
}
// Pipe used to forward signal numbers from sighandle() to main():
// `first` is the read end, `second` the write end. Created during static
// initialization; throws std::runtime_error if pipe() fails.
static auto sigpipe = []() -> std::pair<mun::fd, mun::fd> {
    int ends[2];
    if (::pipe(ends) != 0)
        throw std::runtime_error("could not create a signal pipe");
    return {ends[0], ends[1]};
}();
// Signal handler: forwards the signal number to main() through `sigpipe`.
static void sighandle(int num) {
    // The handler may interrupt code that is about to inspect errno, so the
    // value is saved here and restored before returning.
    const int saved_errno = errno;
    // Best effort: there is nothing useful to do from a signal handler if the
    // write fails.
    const ssize_t rc = ::write(sigpipe.second, &num, sizeof(int));
    (void) rc;
    errno = saved_errno;
}
int main() {
signal(SIGINT, &sighandle);
signal(SIGPIPE, SIG_IGN);
unsigned char all[16] = {};
if (mun::fd server = io::ipv6_server(all, 8000)) {
std::vector<cone::thread> workers;
for (int i = 0; i < 4; i++)
workers.emplace_back([&server]() { return io::tcp_server(server, echo); });
sigpipe.first.unblock();
for (int num; ::read(sigpipe.first, &num, sizeof(int)) > 0;)
if (num == SIGINT)
break;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment