Skip to content

Instantly share code, notes, and snippets.

@zhangyuchi
Last active August 29, 2015 14:11
Show Gist options
  • Save zhangyuchi/7690953ad6bb5dc513df to your computer and use it in GitHub Desktop.
Save zhangyuchi/7690953ad6bb5dc513df to your computer and use it in GitHub Desktop.
friend template class, shared_from_this, and libev
#ifndef EVSERVER_HPP
#define EVSERVER_HPP
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <ev++.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <resolv.h>
#include <errno.h>
#include <string>
#include <list>
#include <memory>
#include <map>
#include <functional>
#include "slice.h"
#include "mpmgr.h"
using namespace std::placeholders;
namespace qhmedia{
//template<class Handler, class Parser, class Factory> class EvTcpServer;
/**
 * One accepted TCP connection, driven by a libev I/O watcher.
 *
 * The instance takes over the socket descriptor passed to the constructor
 * (it is closed in the destructor) and forwards readiness events to the
 * per-connection Handler. The Handler hands out the buffers to read into or
 * write from as std::shared_ptr<Slice> and is told how many bytes were moved.
 *
 * Lifetime: the watcher keeps a raw pointer to this object, so the object
 * must stay alive, at a fixed address, for as long as the watcher is started.
 * The destructor stops the watcher before the descriptor is closed.
 *
 * @tparam Handler type providing handle_error, handle_readable,
 *         handle_readdone, handle_writeable, handle_writedone and
 *         handle_close.
 */
template<class Handler>
class EvTcpInstance : public std::enable_shared_from_this<EvTcpInstance<Handler>>
{
    // The server needs access to sfd and io_ to track and dispose connections.
    template <class H, class P, class F> friend class EvTcpServer;

private:
    ev::io io_;                        // readiness watcher for sfd
    static int total_clients;          // number of live instances for this Handler type
    int sfd;                           // connected socket, closed in the destructor
    std::shared_ptr<Handler> handler_; // per-connection protocol handler

public:
    /**
     * Switches the socket to non-blocking mode and starts watching it for
     * readability on the given loop.
     *
     * @param s        connected socket descriptor; closed by the destructor
     * @param handler  handler that receives the connection's events
     * @param EV_P     libev loop the watcher is attached to
     */
    EvTcpInstance(int s, const std::shared_ptr<Handler>& handler, EV_P)
        : io_(EV_A), sfd(s), handler_(handler)
    {
        // Only touch the flags if they could be read; OR-ing into -1 would
        // request every flag bit at once.
        const int flags = fcntl(s, F_GETFL, 0);
        if (flags != -1) {
            fcntl(s, F_SETFL, flags | O_NONBLOCK);
        }
        printf("Got connection\n");
        total_clients++;
        // The libev C++ binding stores a plain object pointer. A shared_ptr is
        // not accepted, and shared_from_this() is not usable inside a
        // constructor anyway because no shared_ptr owns the object yet.
        io_.set<EvTcpInstance, &EvTcpInstance::callback>(this);
        io_.start(sfd, ev::READ);
    }

    // The instance owns a descriptor and a registered watcher: never copy it.
    EvTcpInstance(const EvTcpInstance&) = delete;
    EvTcpInstance& operator=(const EvTcpInstance&) = delete;

    ~EvTcpInstance() {
        // Unregister the watcher before closing the descriptor it refers to.
        io_.stop();
        close(sfd);
        printf("%d client(s) connected.\n", --total_clients);
    }

    /**
     * Watcher entry point: dispatches to read_cb / write_cb.
     *
     * @param watcher  the watcher that fired
     * @param revents  libev event mask
     */
    void callback(ev::io &watcher, int revents) {
        if (EV_ERROR & revents) {
            handler_->handle_error("got invalid event");
            return;
        }
        if (revents & EV_READ)
            read_cb(watcher);
        // read_cb stops the watcher on close/error; do not write after that.
        if ((revents & EV_WRITE) && io_.is_active())
            write_cb(watcher);
    }

    /**
     * Socket is writable: sends the slice offered by the handler, or falls
     * back to watching for reads only when the handler has nothing to send.
     *
     * @param watcher  the watcher whose descriptor is written to
     */
    void write_cb(ev::io &watcher) {
        std::shared_ptr<Slice> slice = handler_->handle_writeable();
        if (!slice) {
            // Nothing pending: stop asking for write readiness.
            io_.set(ev::READ);
            return;
        }
        const ssize_t written = write(watcher.fd, slice->data(), slice->size());
        if (written < 0) {
            // Transient conditions: keep the watcher and retry on the next event.
            if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS) {
                return;
            }
            handler_->handle_error("write error");
            io_.stop();
            return;
        }
        handler_->handle_writedone(written);
    }

    /**
     * Socket is readable: receives into the slice offered by the handler.
     * A zero-length read is reported as a close; a hard error is reported
     * through handle_error. In both cases the watcher is stopped.
     *
     * @param watcher  the watcher whose descriptor is read from
     */
    void read_cb(ev::io &watcher) {
        std::shared_ptr<Slice> slice = handler_->handle_readable();
        if (!slice) {
            return;
        }
        const ssize_t nread = recv(watcher.fd, slice->data(), slice->size(), 0);
        if (nread < 0) {
            // Transient conditions: keep the watcher and retry on the next event.
            if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS) {
                return;
            }
            handler_->handle_error("read error");
            io_.stop();
            return;
        }
        if (nread == 0) {
            // Peer closed the connection. Only the watcher is stopped here;
            // the object itself is not destroyed from inside its own callback.
            handler_->handle_close();
            io_.stop();
            return;
        }
        // Report how many bytes landed in the slice.
        handler_->handle_readdone(nread);
    }
};
template<class Handler, class Parser, class Factory>
class EvTcpServer {
private:
struct ev_loop* loop_;
ev::io io_;
int s;
Handler handler_;
Factory factory_;
Parser parser_;
MPMgr* const mpmgr_;
std::map<int, std::weak_ptr<EvTcpInstance<Handler>>> tcp_instances_;
public:
EvTcpServer(const EvTcpServer&) = delete;
EvTcpServer& operator=(const EvTcpServer&) = delete;
EvTcpServer(const EvTcpServer&&) = delete;
EvTcpServer& operator=(const EvTcpServer&&) = delete;
void remove(EvTcpInstance<Handler>* tcp_instance) {
handler_.handle_remove(tcp_instance->sfd);
tcp_instances_.erase(tcp_instance->sfd);
delete tcp_instance;
}
void io_accept(ev::io &watcher, int revents) {
if (EV_ERROR & revents) {
handler_.handle_error("got invalid event");
return;
}
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
if (client_sd < 0) {
handler_.handle_error("accept error");
return;
}
fcntl(s, F_SETFL, fcntl(client_sd, F_GETFL, 0) | O_NONBLOCK);
std::shared_ptr<Handler> conn_handler = std::make_shared<Handler>(io_, &parser_, mpmgr_);
std::shared_ptr<EvTcpInstance<Handler>> tcp_instance(new EvTcpInstance<Handler>(client_sd, conn_handler, loop_),
std::bind(&EvTcpServer<Handler,Parser,Factory>::remove, this, _1));
tcp_instances_.insert(std::make_pair(client_sd, tcp_instance));
handler_.handle_accept(conn_handler);
}
EvTcpServer(short port, MPMgr* mgr, EV_P)
:loop_(EV_A), io_(EV_A), handler_(io_), factory_(), parser_(&factory_), mpmgr_(mgr)
{
printf("Listening on port %d\n", port);
struct sockaddr_in addr;
s = socket(PF_INET, SOCK_STREAM, 0);
int one=1;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*) &one,
(socklen_t)sizeof(one));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
handler_.handle_error("bind error");
}
fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
listen(s, 5);
io_.set<EvTcpServer, &EvTcpServer::io_accept>(this);
io_.start(s, ev::READ);
}
virtual ~EvTcpServer() {
shutdown(s, SHUT_RDWR);
close(s);
}
};
// Out-of-class definition of the live-connection counter; every
// EvTcpInstance<Handler> instantiation gets its own counter, starting at zero.
template<typename Handler>
int EvTcpInstance<Handler>::total_clients{0};
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment