Skip to content

Instantly share code, notes, and snippets.

@stormouse
Created Jul 29, 2021
Embed
What would you like to do?
proxy-a-bit-messy-imo
// One proxied connection: couples an "IN" socket (in_sock, supplied to the
// constructor) with an "OUT" socket (out_sock) and relays bytes between them
// through two fixed-size buffers.
// In the handlers visible in this file, "upstream" refers to out_sock and
// "downstream" refers to in_sock.
struct tunnel
{
// epoll descriptor supplied to the constructor.
// NOTE(review): presumably the epoll instance both sockets are registered with - confirm.
int epoll_fd;
// Holds data read from IN that is to be piped to OUT.
fixed_size_buffer in_buf; // buffer data read from IN and will pipe to OUT
// Holds data read from OUT that is to be piped to IN: handle_upstream_read()
// fills it from out_sock.fd and handle_downstream_write() drains it into in_sock.fd.
fixed_size_buffer out_buf; // buffer data read from OUT and pipe to IN
// Current epoll interest for the IN side. handle_downstream_write() tests it
// against epoll_listen_state::IO_WRITE to decide whether downstream writing is blocked.
epoll_listen_state in_epoll_state;
// Counterpart of in_epoll_state for the OUT side (not used by the code visible here).
epoll_listen_state out_epoll_state;
// IN-side socket (the "downstream" end written to by handle_downstream_write()).
stream_socket in_sock;
// OUT-side socket (the "upstream" end read by handle_upstream_read()).
stream_socket out_sock;
// Handle passed to the listen_epoll_*() helpers for the IN side.
// NOTE(review): ownership of these raw pointers is not visible here - confirm who frees them.
tunnel_handle* in_handle;
// Handle for the OUT side; see the note on in_handle.
tunnel_handle* out_handle;
// Lifecycle state of the tunnel (values are defined by tunnel_state, not shown here).
tunnel_state state;
// Reference member: the referenced tunnel_spec must outlive this tunnel.
tunnel_spec& spec;
// NOTE(review): presumably seeded from the constructor's `epoch` argument - confirm.
uint32_t last_epoch;
// Per-instance constant, 4096.
// NOTE(review): presumably the capacity used for in_buf/out_buf - confirm in the constructor.
const int default_buffer_size = 4096;
// NOTE(review): presumably set by destroy() so another thread can reclaim the tunnel later - confirm.
std::atomic_bool marked_for_destroy;
// Creates a tunnel for an already-established IN socket; out_sock is not a constructor argument.
tunnel(uint32_t epoch, int epoll_fd, stream_socket in_sock, tunnel_spec& spec);
// NOTE(review): definition not shown; presumably completes the (possibly asynchronous) connect of out_sock.
void handle_upstream_connect(bool async_finished);
// Reads from out_sock into out_buf, then hands off to handle_downstream_write().
// Destroys the tunnel when the read returns 0 ("upstream closed") or fails with
// an errno other than EAGAIN/EWOULDBLOCK.
void handle_upstream_read();
// Writes the contents of out_buf to in_sock. On EAGAIN/EWOULDBLOCK it calls
// listen_epoll_rw(in_handle, in_epoll_state) and stops; on any other failure it
// destroys the tunnel. If downstream is not write-blocked afterwards it goes
// back to handle_upstream_read().
void handle_downstream_write();
// NOTE(review): definitions of the next two handlers are not shown; presumably
// the mirror path that relays IN -> OUT through in_buf.
void handle_downstream_read();
void handle_upstream_write();
// Update the epoll interest for `handle`, tracking it in `ref_state`.
// handle_downstream_write() uses the _rw variant to wait for EPOLLOUT.
// NOTE(review): exact event masks and how ref_state is updated are not visible here.
void listen_epoll_rw(tunnel_handle* handle, epoll_listen_state& ref_state);
void listen_epoll_read(tunnel_handle* handle, epoll_listen_state& ref_state);
void listen_epoll_write(tunnel_handle* handle, epoll_listen_state& ref_state);
// Tears the tunnel down. The visible handlers return immediately after calling it
// and never touch the tunnel again.
void destroy();
// NOTE(review): returns an int identifier; what it identifies is not visible here.
int get_name();
virtual ~tunnel();
private:
// NOTE(review): presumably scratch storage for epoll_ctl() calls made by listen_epoll_*() - confirm.
epoll_event ev;
};
void tunnel::handle_upstream_read()
{
if (out_buf.is_full())
{
handle_downstream_write();
return;
}
int bytes_read = out_buf.read(out_sock.fd);
if (bytes_read == 0)
{
_logger::instance->info("Tunnel") << "upstream closed" << std::endl;
destroy();
return;
}
else if (bytes_read == -1)
{
int err = errno;
if (err == EAGAIN || err == EWOULDBLOCK)
{
// normal case
}
else
{
_logger::instance->warn("Tunnel") << "upstream reading failed: " << strerror(err) << std::endl;
this->destroy();
return;
}
}
// we have something to push
if (bytes_read > 0 || out_buf.get_data_size() > 0)
{
handle_downstream_write();
}
}
void tunnel::handle_downstream_write()
{
int bytes_written = 0;
int newly_written = 0;
do
{
if (out_buf.is_empty()) break;
newly_written = out_buf.write(in_sock.fd);
if (newly_written > 0)
{
bytes_written += newly_written;
}
else
{
int err = errno;
if (err == EAGAIN || err == EWOULDBLOCK)
{
// the other end won't accept more data for now
// we quit, waiting for the EPOLLOUT will resume writing
listen_epoll_rw(in_handle, in_epoll_state);
break;
}
else
{
destroy();
return;
}
}
} while (out_buf.get_data_size() > 0 && newly_written > 0);
if ((uint8_t)in_epoll_state & (uint8_t)epoll_listen_state::IO_WRITE)
{
// downstream writing blocked
}
else
{
handle_upstream_read();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment