Skip to content

Instantly share code, notes, and snippets.

@ochaton
Last active February 7, 2017 20:55
Show Gist options
  • Save ochaton/335eb9f0a929e3ba40c4dd711831eeb5 to your computer and use it in GitHub Desktop.
Save ochaton/335eb9f0a929e3ba40c4dd711831eeb5 to your computer and use it in GitHub Desktop.
Class Tube on C++11 uses ev++ and some benefits of C++11
/*
* Compile with: g++ -std=c++11 -Werror -O2 tube.cpp -o tube -lev
*
*/
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <utility>

#include <ev++.h>
// Size, in bytes, of the fixed transfer buffer embedded in every Tube.
enum { BUF_SIZE = 1024 };
/**
 * Event-driven pump that copies bytes from one file descriptor to another
 * through a fixed-size buffer, driven by two libev I/O watchers.
 *
 * Lifetime: after capture() the object holds a shared_ptr to itself, so it
 * stays alive until destroy() releases that reference.
 */
class Tube {
    // Watchers: `ww` is started on the output fd for ev::WRITE,
    // `rw` on the input fd for ev::READ.
    ev::io ww;
    ev::io rw;

    // Source and destination descriptors.
    int fdin;
    int fdout;

    // Watcher entry point; dispatches on `revents` to the handlers below.
    void io_cb(ev::io &watcher, int revents);
    void io_read_cb(ev::io &watcher);
    void io_write_cb(ev::io &watcher);

    // User callbacks.
    //  - on_read_cb:   called after each read with the byte count; returning
    //                  true destroys the tube.
    //  - on_error_cb:  called with the errno value after a fatal I/O error.
    //  - on_finish_cb: called once input hit EOF and the buffer was flushed.
    std::function<bool(Tube *, size_t)> on_read_cb;
    std::function<void(Tube *, int)> on_error_cb;
    std::function<void(Tube *)> on_finish_cb;

public:
    // Set once read() reported EOF; remaining buffered data is still written.
    bool drain;

    // Transfer buffer and its cursors:
    //   pwstart - next byte to write out,
    //   prend   - one past the last byte read in,
    //   pend    - one past the end of `buf`.
    char buf[BUF_SIZE];
    char *pwstart;
    char *prend;
    char *pend;

    // Self-reference installed by capture(); cleared by destroy().
    std::shared_ptr<Tube> self;

    // Stops both watchers and drops the self-reference.
    void destroy();

    // Validates both descriptors, switches them to non-blocking mode and
    // starts the read watcher. Throws std::invalid_argument on a bad fd.
    Tube(int in, int out,
         std::function<bool(Tube *, size_t)> on_read,
         std::function<void(Tube *, int)> on_error,
         std::function<void(Tube *)> on_finish);

    // Stores `owner` as the self-reference.
    void capture(std::shared_ptr<Tube> owner);

    ~Tube();
};
/*************************************************************/
/* Implementation goes here: */
/**
 * Common libev callback for both watchers.
 *
 * On EV_ERROR the tube is destroyed and on_error_cb is invoked with the errno
 * value observed on entry; otherwise the event is forwarded to the read or
 * write handler.
 *
 * @param w        watcher that fired.
 * @param revents  libev event mask for this invocation.
 */
void Tube::io_cb(ev::io &w, int revents) {
    if (revents & EV_ERROR) {
        // Snapshot errno first: the stream output below may overwrite it.
        const int saved_errno = errno;
        std::cerr << "ev_cb. got invalid event" << strerror(saved_errno) << std::endl;

        // destroy() drops `self`, which may be the last owner of this object.
        // Hold an extra reference so the callback member is still alive when
        // it is invoked; the object is released when this scope ends.
        const std::shared_ptr<Tube> keep_alive = self;
        this->destroy();
        if (on_error_cb) {
            on_error_cb(this, saved_errno);
        }
        return;
    }
    if (revents & EV_READ) {
        io_read_cb(w);
        return;
    }
    if (revents & EV_WRITE) {
        io_write_cb(w);
        return;
    }
}
/**
 * Read handler: pulls as many bytes as fit into the free tail of `buf`.
 *
 * - Transient errors (EAGAIN / EINTR / EWOULDBLOCK) are ignored; the event
 *   loop will call again.
 * - Any other error destroys the tube and reports errno via on_error_cb.
 * - on_read_cb is told how many bytes arrived (0 at EOF); if it returns true
 *   the tube is destroyed.
 * - At EOF the tube switches to drain mode; if nothing is left to write it
 *   finishes immediately.
 *
 * @param w  the read watcher that fired.
 */
void Tube::io_read_cb(ev::io &w) {
    if (prend == pend) {
        // No free space: a zero-length read would be mistaken for EOF.
        rw.stop();
        return;
    }

    // read() returns ssize_t; keeping it signed makes the error check exact.
    const ssize_t bytes = read(w.fd, prend, static_cast<size_t>(pend - prend));
    if (bytes < 0) {
        const int saved_errno = errno;
        if (saved_errno == EAGAIN || saved_errno == EINTR || saved_errno == EWOULDBLOCK) {
            // That's ok. EV-loop will call us again
            return;
        }
        // destroy() may release the last owner; keep the object alive until
        // the error callback has run.
        const std::shared_ptr<Tube> keep_alive = self;
        this->destroy();
        if (on_error_cb) {
            on_error_cb(this, saved_errno);
        }
        return;
    }

    if (on_read_cb && on_read_cb(this, static_cast<size_t>(bytes))) {
        // Nothing below touches members after destroy(), so no guard needed.
        this->destroy();
        return;
    }

    prend += bytes;

    if (bytes == 0) { // EOF
        rw.stop();
        std::cerr << "DRAIN!\n";
        drain = true;
        if (pwstart < prend) {
            // Buffered data remains: the write handler finishes the tube.
            ww.start(fdout, ev::WRITE);
        } else {
            // Everything was already written, so no write event will ever
            // arrive to finish the tube; finish it here.
            const std::shared_ptr<Tube> keep_alive = self;
            this->destroy();
            if (on_finish_cb) {
                on_finish_cb(this);
            }
        }
        return;
    }

    // bytes > 0, so there is unwritten data in the buffer.
    ww.start(fdout, ev::WRITE);

    if (prend == pend) {
        // Buffer full: pause reading until the writer frees space.
        rw.stop();
    }
}
/**
 * Write handler: flushes the pending region [pwstart, prend) of `buf`.
 *
 * - Transient errors (EAGAIN / EINTR / EWOULDBLOCK) are ignored; the event
 *   loop will call again.
 * - Any other error destroys the tube and reports errno via on_error_cb.
 * - Once everything is written: in drain mode the tube is destroyed and
 *   on_finish_cb is invoked; otherwise the buffer is rewound and reading
 *   resumes.
 *
 * @param w  the write watcher that fired.
 */
void Tube::io_write_cb(ev::io &w) {
    // write() returns ssize_t; keeping it signed makes the error check exact.
    const ssize_t bytes = write(w.fd, pwstart, static_cast<size_t>(prend - pwstart));
    if (bytes < 0) {
        const int saved_errno = errno;
        if (saved_errno == EAGAIN || saved_errno == EINTR || saved_errno == EWOULDBLOCK) {
            // That's ok. EV-loop will call us again
            return;
        }
        // destroy() may release the last owner; keep the object alive until
        // the error callback has run.
        const std::shared_ptr<Tube> keep_alive = self;
        this->destroy();
        if (on_error_cb) {
            on_error_cb(this, saved_errno);
        }
        return;
    }

    pwstart += bytes;
    if (pwstart != prend) {
        // Partial write: stay armed and continue on the next event.
        return;
    }

    // All buffered data has been written.
    ww.stop();

    if (drain) {
        // Input is exhausted and the buffer is flushed: we are done.
        // Call destroy() on `this` (not through `self`, which is null when
        // capture() was never called) and keep the object alive for the
        // callback.
        const std::shared_ptr<Tube> keep_alive = self;
        this->destroy();
        if (on_finish_cb) {
            on_finish_cb(this);
        }
        return;
    }

    // The buffer holds no unwritten data, so rewind both cursors to reclaim
    // the whole buffer for the next read.
    pwstart = prend = buf;
    rw.start(fdin, ev::READ);
}
/**
 * Shuts the tube down: stops both watchers and releases the self-reference.
 * If `self` was the last shared_ptr owning this object, the object is
 * deleted as part of this call.
 */
void Tube::destroy() {
    rw.stop();
    ww.stop();
    self.reset();
}
void Tube::capture(std::shared_ptr<Tube>pself) {
self = pself;
}
// Switches `fd` to non-blocking mode while preserving its other status flags.
// Throws std::runtime_error if the flags cannot be read or updated.
static void tube_set_nonblocking(int fd) {
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        throw std::runtime_error("Cannot switch fd to non-blocking mode");
    }
}

/**
 * Builds a tube that copies from `in` to `out` and starts watching `in`
 * for readability.
 *
 * @param in         open descriptor to read from.
 * @param out        open descriptor to write to.
 * @param on_read    called after every read with the byte count; returning
 *                   true destroys the tube.
 * @param on_error   called with errno after a fatal I/O error.
 * @param on_finish  called once EOF was seen and all data was written.
 *
 * @throws std::invalid_argument  if `in` or `out` is not an open descriptor.
 * @throws std::runtime_error     if a descriptor cannot be made non-blocking.
 */
Tube::Tube(int in, int out,
           std::function<bool(Tube *, size_t)> on_read,
           std::function<void(Tube *, int)> on_error,
           std::function<void(Tube *)> on_finish)
    : fdin(in),
      fdout(out),
      on_read_cb(std::move(on_read)),
      on_error_cb(std::move(on_error)),
      on_finish_cb(std::move(on_finish)),
      drain(false),
      pwstart(buf),
      prend(buf),
      pend(buf + BUF_SIZE)
{
    // Only the return value is meaningful here: errno is not reset on
    // success, so a stale EBADF must not reject a valid descriptor.
    if (fcntl(fdin, F_GETFD) == -1) {
        throw std::invalid_argument("Input is not openned fd");
    }
    if (fcntl(fdout, F_GETFD) == -1) {
        throw std::invalid_argument("Output is non-openned fd");
    }

    tube_set_nonblocking(fdin);
    tube_set_nonblocking(fdout);

    // Both watchers funnel into io_cb; only the reader starts now, the
    // writer is armed once there is data to flush.
    ww.set<Tube, &Tube::io_cb>(this);
    rw.set<Tube, &Tube::io_cb>(this);
    rw.start(fdin, ev::READ);
}
/**
 * Stops both watchers so the event loop no longer refers to this object,
 * then logs the destruction to stderr.
 */
Tube::~Tube() {
    rw.stop();
    ww.stop();
    std::cerr << "DESTROY!\n";
}
/**
 * Creates a self-owning Tube that copies `in` to `out` and logs its progress
 * to stderr.
 *
 * @param in   open descriptor to read from.
 * @param out  open descriptor to write to.
 * @return 0 when the tube was created, -1 when construction threw (the
 *         exception message is printed to stderr).
 */
int new_tube(int in, int out) {
    try {
        std::shared_ptr<Tube> tube = std::make_shared<Tube>(
            in, out,
            [](Tube *, size_t bytes) -> bool { // on_read
                std::cerr << "Readed: " << bytes << std::endl;
                // true would stop the tube; false keeps it running.
                return false;
            },
            [](Tube *, int msg) { // on_error
                std::cerr << "Error on pipe: " << strerror(msg) << std::endl;
            },
            [](Tube *) { // on_final
                std::cerr << "Final!\n";
            });
        // Hand the tube a reference to itself so it outlives this scope.
        tube->capture(tube);
        return 0;
    } catch (const std::exception &e) {
        std::cerr << e.what() << std::endl;
        return -1;
    }
}
/**
 * Demo entry point: copies "tube.cpp" into "tube.hpp" through a Tube and runs
 * the default libev loop until no watcher is active.
 *
 * @return 0 on success, 1 if either file could not be opened.
 */
int main(int argc, char const *argv[])
{
    (void)argc;
    (void)argv;

    const int in = open("tube.cpp", O_RDONLY);
    if (in == -1) {
        const int saved_errno = errno; // stream output may clobber errno
        std::cerr << "open(tube.cpp): " << strerror(saved_errno) << std::endl;
        return 1;
    }

    const int out = open("tube.hpp", O_WRONLY | O_CREAT | O_TRUNC, 0666);
    if (out == -1) {
        const int saved_errno = errno;
        std::cerr << "open(tube.hpp): " << strerror(saved_errno) << std::endl;
        close(in);
        return 1;
    }

    new_tube(in, out);
    ev_run(EV_DEFAULT, 0);

    // The loop has returned, so no watcher is using the descriptors any more.
    close(in);
    close(out);
    return 0;
}
/*
* USAGE: Create the Tube via std::shared_ptr and pass that pointer to capture() to avoid memory leaks.
on_read callback:
[](Tube * self, size_t bytes) -> bool {
return true; // this will destroy the tube; return false to keep it running
}
destroy() is called on the tube before the on_error and on_finish callbacks are invoked.
*
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment