Last active
February 7, 2017 20:55
-
-
Save ochaton/335eb9f0a929e3ba40c4dd711831eeb5 to your computer and use it in GitHub Desktop.
Class Tube on C++11 uses ev++ and some benefits of C++11
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Compile with: g++ -std=c++11 -Werror -O2 tube.cpp -o tube -lev | |
* | |
*/ | |
#include <ev++.h> | |
#include <functional> | |
#include <iostream> | |
#include <stdexcept> | |
#include <memory> | |
#include <unistd.h> | |
#include <fcntl.h> | |
#include <errno.h> | |
enum { BUF_SIZE = 1024 }; | |
class Tube { | |
ev::io ww, rw; | |
int fdin, fdout; | |
void io_cb(ev::io &w, int revents); | |
void io_read_cb (ev::io & w); | |
void io_write_cb(ev::io & w); | |
std::function<bool(Tube *, size_t)> on_read_cb; | |
std::function<void(Tube *, int)> on_error_cb; | |
std::function<void(Tube *)> on_finish_cb; | |
public: | |
bool drain; | |
char buf[BUF_SIZE]; | |
char *pwstart, *prend, *pend; | |
std::shared_ptr<Tube> self; | |
void destroy(); | |
Tube (int in, int out, | |
std::function<bool(Tube *, size_t)>on_read, | |
std::function<void(Tube *, int)>on_error, | |
std::function<void(Tube *)>on_finish); | |
void capture(std::shared_ptr<Tube>_self); | |
~Tube(); | |
}; | |
/*************************************************************/ | |
/* Implementation goes here: */ | |
void Tube::io_cb(ev::io &w, int revents) { | |
if (revents & EV_ERROR) { | |
std::cerr << "ev_cb. got invalid event" << strerror(errno) << std::endl; | |
this->destroy(); | |
on_error_cb(this, errno); | |
return; | |
} | |
if (revents & EV_READ) { | |
io_read_cb(w); | |
return; | |
} | |
if (revents & EV_WRITE) { | |
io_write_cb(w); | |
return; | |
} | |
} | |
void Tube::io_read_cb (ev::io & w) { | |
size_t bytes = read(w.fd, prend, pend - prend); | |
if (bytes == -1) { | |
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { | |
// That's ok. EV-loop will call us again | |
return; | |
} else { | |
// Stop both watchers | |
this->destroy(); | |
on_error_cb(this, errno); | |
return; | |
} | |
} | |
if (on_read_cb(this, bytes)) { | |
this->destroy(); | |
return; | |
} | |
prend += bytes; | |
if (pwstart < prend) { | |
ww.start(fdout, ev::WRITE); | |
} | |
if (bytes == 0) { // EOF: | |
rw.stop(); | |
std::cerr << "DRAIN!\n"; | |
drain = true; | |
return; | |
} | |
if (prend == pend) { | |
rw.stop(); | |
} | |
} | |
void Tube::io_write_cb(ev::io & w) { | |
size_t bytes = write(w.fd, pwstart, prend - pwstart); | |
if (bytes == -1) { | |
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { | |
// That's ok. EV-loop will call us again | |
return; | |
} else { | |
// Stop both watchers | |
this->destroy(); | |
on_error_cb(this, errno); | |
return; | |
} | |
} | |
pwstart += bytes; | |
if (pwstart == prend) { // All data is writed | |
ww.stop(); | |
if (drain) { | |
self->destroy(); | |
return on_finish_cb(this); | |
} | |
if (prend == pend) { // End of buffer achived | |
pwstart = prend = buf; | |
} | |
rw.start(fdin, ev::READ); | |
} else { | |
// Continue writing | |
return; | |
} | |
} | |
void Tube::destroy() { | |
ww.stop(); | |
rw.stop(); | |
self = nullptr; | |
} | |
void Tube::capture(std::shared_ptr<Tube>pself) { | |
self = pself; | |
} | |
Tube::Tube (int in, int out, | |
std::function<bool(Tube *, size_t)>on_read, | |
std::function<void(Tube *, int)>on_error, | |
std::function<void(Tube *)>on_finish) | |
: fdin(in), fdout(out), on_read_cb(on_read), on_error_cb(on_error), on_finish_cb(on_finish) | |
{ | |
if (fcntl(fdin, F_GETFD) == -1 || errno == EBADF) { | |
throw std::invalid_argument ( "Input is not openned fd" ); | |
} | |
if (fcntl(fdout, F_GETFD) == -1 || errno == EBADF) { | |
throw std::invalid_argument ( "Output is non-openned fd" ); | |
} | |
fcntl(fdin, F_SETFL, fcntl(fdin, F_GETFL, 0) | O_NONBLOCK); | |
fcntl(fdout, F_SETFL, fcntl(fdout, F_GETFL, 0) | O_NONBLOCK); | |
this->pwstart = this->prend = buf; | |
this->pend = buf + BUF_SIZE; | |
drain = false; | |
ww.set<Tube, &Tube::io_cb>(this); | |
rw.set<Tube, &Tube::io_cb>(this); | |
rw.start(fdin, ev::READ); | |
} | |
Tube::~Tube() { | |
ww.stop(); | |
rw.stop(); | |
std::cerr << "DESTROY!\n"; | |
} | |
int new_tube(int in, int out) { | |
try { | |
std::shared_ptr<Tube> p = std::make_shared<Tube>( | |
in, out, | |
[](Tube * self, size_t bytes) -> bool { // on_read | |
std::cerr << "Readed: " << bytes << std::endl; | |
// return true; --> stop | |
// noreturn --> continue | |
}, | |
[](Tube * self, int msg) { // on_error | |
std::cerr << "Error on pipe: " << strerror(msg) << std::endl; | |
}, | |
[](Tube * self) { // on_final | |
std::cerr << "Final!\n"; | |
}); | |
p->capture(p); | |
} catch ( std::exception & e ) { | |
std::cerr << e.what() << std::endl; | |
} | |
} | |
int main(int argc, char const *argv[]) | |
{ | |
int in = open("tube.cpp", O_RDONLY); | |
int out = open("tube.hpp", O_WRONLY | O_CREAT | O_TRUNC , 0666 ); | |
new_tube(in, out); | |
ev_run(EV_DEFAULT, 0); | |
return 0; | |
} | |
/* | |
* USAGE: You should create Tube via shred_ptr to avoid memory leaks. | |
on_read callback: | |
[](Tube * this, size_t bytes) -> bool { | |
return true; // this will destroy tube | |
} | |
on_error and on_finish callbacks destroys tube before call. | |
* | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment