Skip to content

Instantly share code, notes, and snippets.

@roxlu
Last active December 13, 2015 19:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roxlu/e1e968631d34f55abf71 to your computer and use it in GitHub Desktop.
Save roxlu/e1e968631d34f55abf71 to your computer and use it in GitHub Desktop.
VPX stream WIP
#include <webm/VPXStream.h>
// BUFFER
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
VPXBuffer::VPXBuffer()
:read_dx(0)
{
}
VPXBuffer::~VPXBuffer() {
reset();
}
void VPXBuffer::addBytes(char* b, size_t nbytes) {
std::copy(b, b+nbytes, std::back_inserter(data));
}
void VPXBuffer::clear() {
data.clear();
}
void VPXBuffer::reset() {
clear();
read_dx = 0;
}
char* VPXBuffer::ptr() {
return (char*)&data[0];
}
size_t VPXBuffer::size() {
return data.size();
}
void VPXBuffer::flush(int n) {
data.erase(data.begin(), data.begin() + n);
read_dx -= n;
if(read_dx < 0) {
read_dx = 0;
}
}
void VPXBuffer::print() {
unsigned char* p = (unsigned char*) ptr();
for(int i = 0; i < size(); ++i) {
printf("%02X ", *(p+i));
if(i > 0 && i % 40 == 0) {
printf("\n");
}
}
printf("\n");
}
void VPXBuffer::write(char* b, size_t nbytes) {
std::copy(b, b+nbytes, std::back_inserter(data));
}
void VPXBuffer::wu8(uint8_t b) {
data.push_back(b);
}
void VPXBuffer::wu16(uint16_t b) {
wu8(b >> 8);
wu8(b);
}
void VPXBuffer::wu32(uint32_t b) {
wu8(b >> 24);
wu8(b >> 16);
wu8(b >> 8);
wu8(b);
};
uint8_t VPXBuffer::ru8() {
return data[read_dx++];
}
uint16_t VPXBuffer::ru16() {
uint16_t tmp = 0;
memcpy((char*)&tmp, (char*)&data[read_dx], sizeof(tmp));
uint16_t result = (tmp << 8) | (tmp >> 8);
read_dx += 2;
return result;
}
uint32_t VPXBuffer::ru32() {
uint32_t tmp = 0;
memcpy((char*)&tmp, (char*)&data[read_dx], sizeof(tmp));
uint32_t result = ((tmp >> 24) & 0x000000FF) | ((tmp << 24) * 0xFF000000) | ((tmp >> 8) & 0x0000FF00) | ((tmp << 8) & 0x00FF0000);
read_dx += 4;
return result;
}
uint8_t VPXBuffer::pu8(int dx) {
size_t tmp_dx = read_dx;
read_dx += dx;
uint8_t r = ru8();
read_dx = tmp_dx;
return r;
}
uint16_t VPXBuffer::pu16(int dx) {
size_t tmp_dx = read_dx;
read_dx += dx;
uint16_t r = ru16();
read_dx = tmp_dx;
return r;
}
uint32_t VPXBuffer::pu32(int dx) {
size_t tmp_dx = read_dx;
read_dx += dx;
uint32_t r = ru32();
read_dx = tmp_dx;
return r;
}
// IN STREAM
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
VPXInStream::VPXInStream(std::string host, std::string port)
:loop(NULL)
,host(host)
,port(port)
,proto_version(0)
{
RX_VERBOSE(("VPXInStream()"));
loop = uv_default_loop();
sock.data = this;
shutdown_req.data = this;
resolver_req.data = this;
connect_req.data = this;
timer_req.data = this;
}
VPXInStream::~VPXInStream() {
}
bool VPXInStream::connect() {
int r = uv_tcp_init(loop, &sock);
if(r) {
RX_ERROR(("uv_tcp_init failed"));
return false;
}
struct addrinfo hints;
hints.ai_family = PF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = 0;
r = uv_getaddrinfo(loop, &resolver_req, vpx_client_on_resolved,
host.c_str(), port.c_str(), &hints);
if(r) {
RX_ERROR(("cannot uv_tcp_init(): %s", uv_strerror(uv_last_error(loop))));
return false;
}
return true;
}
void VPXInStream::update() {
uv_run(loop, UV_RUN_NOWAIT);
}
void VPXInStream::parseBuffer() {
do {
uint8_t cmd = buffer.pu8();
uint32_t nbytes = buffer.pu32(1);
if(buffer.size() < (nbytes + 5)) {
RX_VERBOSE(("buffer not big enough, we have: %ld but need: %d", buffer.size(), nbytes));
return;
}
buffer.flush(5);
switch(cmd) {
case VPX_CMD_SETTINGS: {
proto_version = buffer.ru8();
settings.in_w = buffer.ru16();
settings.in_h = buffer.ru16();
settings.fps = buffer.ru16();
buffer.flush(nbytes);
RX_VERBOSE(("version: %d, in_w: %d, in_h: %d, fps: %d, bytes left: %ld",
proto_version, settings.in_w, settings.in_h, settings.fps, buffer.size()));
break;
};
case VPX_CMD_FRAME: {
RX_VERBOSE(("GOT FRAME WITH SIZE: %d", nbytes));
buffer.flush(nbytes);
break;
}
default: {
RX_WARNING(("Unhandled command in VPXInstream: %d", cmd));
break;
}
}
} while(buffer.size() > 5);
}
void VPXInStream::reconnect() {
clear();
int r = uv_timer_init(loop, &timer_req);
if(r) {
RX_ERROR(("uv_timer_init() failed. cannot reconnect"));
return;
}
uv_timer_start(&timer_req, vpx_client_on_reconnect_timer, 1000, 0);
}
void VPXInStream::clear() {
buffer.clear();
}
// OUT STREAM
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
VPXOutStream::VPXOutStream(int port)
:port(port)
,loop(NULL)
,is_setup(false)
,num_frames(0)
{
RX_VERBOSE(("VPXOutStream()"));
sock.data = this;
}
VPXOutStream::~VPXOutStream() {
}
bool VPXOutStream::setup(VPXSettings cfg) {
settings = cfg;
if(!grabber.setup(settings, vpx_server_on_grabber_cb, this)) {
RX_ERROR(("cannot setup grabber"));
return false;
}
settings.cb_user = this;
settings.cb_write = vpx_server_on_vpx_write;
if(!encoder.setup(settings)) {
RX_ERROR(("cannot setup encoder"));
return false;
}
if(!encoder.initialize()) {
RX_ERROR(("cannot initailize encoder"));
return false;
}
is_setup = true;
return true;
}
bool VPXOutStream::start() {
if(!is_setup) {
RX_ERROR(("first call setup()"));
return false;
}
loop = uv_default_loop();
if(!loop) {
RX_ERROR(("Cannot create loop"));
return false;
}
int r = 0;
r = uv_tcp_init(loop, &sock);
VPX_UV_ERR(r, 0, "uv_tcp_init failed", false);
struct sockaddr_in baddr = uv_ip4_addr("0.0.0.0", port);
r = uv_tcp_bind(&sock, baddr);
VPX_UV_ERR(r, 0, "uv_tcp_bind failed", false);
r = uv_listen((uv_stream_t*)&sock, 128, vpx_server_on_new_connection);
VPX_UV_ERR(r, 0, "uv_listen failed", false);
return true;
}
void VPXOutStream::update() {
if(!is_setup) {
RX_ERROR(("not setup!"));
return;
}
uv_run(loop, UV_RUN_NOWAIT);
}
void VPXOutStream::grabFrame() {
grabber.grabFrame();
}
void VPXOutStream::removeConnection(VPXConnection* c) {
std::vector<VPXConnection*>::iterator it = std::find(connections.begin(), connections.end(), c);
if(it != connections.end()) {
connections.erase(it);
}
}
void VPXOutStream::writeHeader(VPXConnection* c) {
RX_VERBOSE(("send header to client: in_w: %d, in_h: %d, fps: %d ", settings.out_w, settings.out_h, settings.fps));
VPXBuffer buf;
buf.wu8(1); // version
buf.wu16(settings.out_w);
buf.wu16(settings.out_h);
buf.wu16(settings.fps);
c->writeCommand(VPX_CMD_SETTINGS, buf);
}
void VPXOutStream::writeToAllConnections(VPXCommands c, char* buf, size_t nbytes) {
VPXBuffer tmp;
tmp.write(buf, nbytes);
for(std::vector<VPXConnection*>::iterator it = connections.begin(); it != connections.end(); ++it) {
VPXConnection& con = **it;
con.writeCommand(c, tmp);
}
// void writeToAllConnections(VPXCommands cmd, char* buf, size_t nbytes); /* write a command to all clients/connections */
}
// ScreenGrabber
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
VPXScreenGrabber::VPXScreenGrabber()
:num_bytes(0)
,read_dx(0)
,write_dx(1)
,millis_per_frame(0)
,frame_timeout(0)
,is_setup(false)
,cb_grabber(NULL)
,cb_user(NULL)
{
}
VPXScreenGrabber::~VPXScreenGrabber() {
is_setup = false;
read_dx = 0;
write_dx = 1;
millis_per_frame = 0;
num_bytes = 0;
frame_timeout = 0;
cb_grabber = NULL;
cb_user = NULL;
}
bool VPXScreenGrabber::setup(VPXSettings cfg, vpx_screengrabber_cb grabCB, void* grabUser) {
cb_grabber = grabCB;
cb_user = grabUser;
settings = cfg;
millis_per_frame = (1.0f/settings.fps) * 1000;
num_bytes = settings.in_w * settings.in_h * 4; // we grab in BGRA, gpu default
if(num_bytes == 0) {
RX_ERROR(("Unsupported image format"));
return false;
}
glGenBuffers(VPX_SCREENGRABBER_FBOS, pbos);
for(int i = 0; i < VPX_SCREENGRABBER_FBOS; ++i) {
glBindBuffer(GL_PIXEL_PACK_BUFFER, pbos[i]);
glBufferData(GL_PIXEL_PACK_BUFFER, num_bytes, NULL, GL_STREAM_READ);
}
glBindBuffer(GL_PIXEL_PACK_BUFFER, 0);
is_setup = true;
return true;
}
void VPXScreenGrabber::grabFrame() {
if(!is_setup) {
RX_ERROR(("not setup, cannot grab"));
return;
}
if(!cb_grabber) {
return;
}
if(!mustGrabFrame()) {
return;
}
read_dx = (read_dx + 1) % VPX_SCREENGRABBER_FBOS;
write_dx = (read_dx + (VPX_SCREENGRABBER_FBOS-2)) % VPX_SCREENGRABBER_FBOS;
glBindBuffer(GL_PIXEL_PACK_BUFFER, pbos[write_dx]);
glReadPixels(0,0,settings.in_w, settings.in_h, GL_BGRA, GL_UNSIGNED_BYTE, 0);
glBindBuffer(GL_PIXEL_PACK_BUFFER, pbos[read_dx]);
unsigned char* ptr = (unsigned char*)glMapBuffer(GL_PIXEL_PACK_BUFFER, GL_READ_ONLY);
if(ptr) {
cb_grabber(ptr, num_bytes, cb_user);
}
glUnmapBuffer(GL_PIXEL_PACK_BUFFER);
glBindBuffer(GL_PIXEL_PACK_BUFFER, 0);
}
bool VPXScreenGrabber::mustGrabFrame() {
int64_t now = uv_hrtime();
if(!frame_timeout) {
frame_timeout = now + (millis_per_frame * 1000000);
return true;
}
else if(now > frame_timeout) {
frame_timeout = now + (millis_per_frame * 1000000);
return true;
}
else {
return false;
}
}
// Connection
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
VPXConnection::VPXConnection(VPXOutStream& os)
:out_stream(os)
{
sock.data = this;
shutdown_req.data = this;
}
VPXConnection::~VPXConnection() {
}
void VPXConnection::parseBuffer() {
RX_VERBOSE(("Parse connection buffer"));
// printf("----------------------\n");
// for(int i = 0; i < buffer.size();++i) { printf("%c", buffer[i]); }
// printf("----------------------\n");
}
void VPXConnection::writeCommand(VPXCommands cmd, VPXBuffer& buf) {
/* the command */
char c = cmd;
write(&c, 1);
/* data size */
VPXBuffer tmp;
tmp.wu32(buf.size());
write(tmp.ptr(), tmp.size());
tmp.print();
RX_VERBOSE(("SENDING: %ld bytes", buf.size() + 5));
/* the data */
write(buf.ptr(), buf.size());
}
void VPXConnection::write(char* data, size_t nbytes) {
uv_buf_t buf = uv_buf_init(data, nbytes);
uv_write_t* req = new uv_write_t();
req->data = this;
int r = uv_write(req, (uv_stream_t*)&sock, &buf, 1, vpx_connection_on_write);
if(!r) {
RX_ERROR(("cannot uv_write(): %s", uv_strerror(uv_last_error(sock.loop))));
}
}
// GRABBER
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
void vpx_server_on_grabber_cb(unsigned char* pixels, size_t nbytes, void* user) {
VPXOutStream* s = static_cast<VPXOutStream*>(user);
s->num_frames++;
s->encoder.encode(pixels, s->num_frames);
}
// VPX (VPXOutStream)
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
void vpx_server_on_vpx_write(const vpx_codec_cx_pkt_t* pkt, int64_t pts, void* user) {
// RX_VERBOSE((" - / - >> SEND OVER NETWORK"));
VPXOutStream* s = static_cast<VPXOutStream*>(user);
s->writeToAllConnections(VPX_CMD_FRAME, (char*)pkt->data.frame.buf, pkt->data.frame.sz);
}
// UV CONNECTION
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
void vpx_connection_on_write(uv_write_t* req, int status) {
VPXConnection* c = static_cast<VPXConnection*>(req->data);
delete req;
if(status == -1) {
RX_ERROR(("error while trying to uv_write: %s", uv_strerror(uv_last_error(c->sock.loop))));
}
}
// UV SERVER
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
void vpx_server_on_new_connection(uv_stream_t* sock, int status) {
VPXOutStream* s = static_cast<VPXOutStream*>(sock->data);
if(status == -1) {
RX_ERROR(("cannot handle connection (?)"));
return;
}
VPXConnection* con = new VPXConnection(*s);
int r = 0;
r = uv_tcp_init(s->loop, &con->sock);
if(r) {
RX_ERROR(("uv_tcp_init failed for new connection."));
delete con;
con = NULL;
return;
}
r = uv_accept(sock, (uv_stream_t*)&con->sock);
if(r) {
RX_ERROR(("uv_accept failed for new connection."));
delete con;
con = NULL;
return;
}
r = uv_read_start((uv_stream_t*)&con->sock, vpx_server_on_alloc, vpx_server_on_read);
if(r) {
RX_ERROR(("uv_read_start failed for new connection."));
delete con;
con = NULL;
return;
}
s->writeHeader(con);
s->connections.push_back(con);
}
uv_buf_t vpx_server_on_alloc(uv_handle_t* handle, size_t nbytes) {
char* buf = new char[nbytes];
if(!buf) {
RX_ERROR(("ERROR - OUT OF MEMORY IN VPX STREAM"));
}
return uv_buf_init(buf, nbytes);
}
void vpx_server_on_read(uv_stream_t* sock, ssize_t nread, uv_buf_t buf) {
VPXConnection* c = static_cast<VPXConnection*>(sock->data);
/* disconnected */
if(nread < 0) {
uv_err_t err = uv_last_error(sock->loop);
if(err.code != UV_EOF) {
c->out_stream.removeConnection(c);
delete c;
c = NULL;
return;
}
if(buf.base) {
delete[] buf.base;
buf.base = NULL;
}
int r = uv_read_stop(sock);
if(r) {
RX_ERROR(("failed to uv_read_stop() after client disconnected: %s", uv_strerror(uv_last_error(sock->loop))));
}
r = uv_shutdown(&c->shutdown_req, sock, vpx_server_on_shutdown);
if(r) {
RX_ERROR(("failed to uv_shutdown() after client disconnected: %s", uv_strerror(uv_last_error(sock->loop))));
delete c;
c = NULL;
return;
}
return;
}
//std::copy(buf.base, buf.base+nread, std::back_inserter(c->buffer));
c->buffer.addBytes(buf.base, nread);
c->parseBuffer();
if(buf.base) {
delete[] buf.base;
buf.base = NULL;
}
}
void vpx_server_on_shutdown(uv_shutdown_t* req, int status) {
RX_VERBOSE(("-- SHUTDOWN ON SERVER -- "));
VPXConnection* c = static_cast<VPXConnection*>(req->data);
uv_close((uv_handle_t*)&c->sock, vpx_server_on_close);
}
void vpx_server_on_close(uv_handle_t* handle) {
RX_VERBOSE(("on close ... \n"));
VPXConnection* c = static_cast<VPXConnection*>(handle->data);
c->out_stream.removeConnection(c);
}
// UV CLIENT
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
void vpx_client_on_resolved(uv_getaddrinfo_t* req, int status, struct addrinfo* res) {
VPXInStream* s = static_cast<VPXInStream*>(req->data);
if(status == -1) {
RX_ERROR(("cannot resolve(): %s RECONNECT!", uv_strerror(uv_last_error(s->loop))));
s->reconnect();
return;
}
char ip[17] = {0};
uv_ip4_name((struct sockaddr_in*)res->ai_addr, ip, 16);
RX_VERBOSE(("resolved server: %s", ip));
int r = uv_tcp_connect(&s->connect_req, &s->sock,
*(struct sockaddr_in*)res->ai_addr,
vpx_client_on_connect);
uv_freeaddrinfo(res);
}
void vpx_client_on_connect(uv_connect_t* req, int status) {
RX_VERBOSE(("connected"));
VPXInStream* s = static_cast<VPXInStream*>(req->data);
if(status == -1) {
RX_ERROR(("cannot connect: %s RECONNECT!", uv_strerror(uv_last_error(s->loop))));
s->reconnect();
return;
}
int r = uv_read_start((uv_stream_t*)&s->sock, vpx_client_on_alloc, vpx_client_on_read);
if(r) {
RX_ERROR(("uv_read_start() failed %s", uv_strerror(uv_last_error(s->loop))));
return;
}
}
void vpx_client_on_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
RX_VERBOSE(("Received data from server, :%ld bytes", nread));
VPXInStream* s = static_cast<VPXInStream*>(handle->data);
if(nread < 0) {
uv_err_t err = uv_last_error(handle->loop);
if(err.code != UV_EOF) {
RX_ERROR(("disconnected from server, but not correctly!"));
return;
}
if(buf.base) {
delete[] buf.base;
buf.base = NULL;
}
int r = uv_shutdown(&s->shutdown_req, handle, vpx_client_on_shutdown);
if(r) {
RX_ERROR(("error shutting down client. %s", uv_strerror(uv_last_error(handle->loop))));
delete s;
s = NULL;
return;
}
RX_ERROR(("------- NEED TO RECONNECT TO THE SERVER ------------- "));
return;
}
//std::copy(buf.base, buf.base+nread, std::back_inserter(s->buffer));
s->buffer.addBytes(buf.base, nread);
s->parseBuffer();
if(buf.base) {
delete[] buf.base;
buf.base = NULL;
}
}
void vpx_client_on_write(uv_write_t* req, int status) {
RX_ERROR(("- need to implement + free req "));
}
void vpx_client_on_shutdown(uv_shutdown_t* req, int status) {
VPXInStream* s = static_cast<VPXInStream*>(req->data);
uv_close((uv_handle_t*)&s->sock, vpx_client_on_close);
}
void vpx_client_on_close(uv_handle_t* handle) {
VPXInStream* s = static_cast<VPXInStream*>(handle->data);
s->clear();
s->reconnect();
}
void vpx_client_on_reconnect_timer(uv_timer_t* handle, int status) {
VPXInStream* s = static_cast<VPXInStream*>(handle->data);
if(status == -1) {
RX_ERROR(("error in reconnect timer. %s", uv_strerror(uv_last_error(handle->loop))));
return;
}
RX_VERBOSE(("reconnecting"));
s->connect();
}
uv_buf_t vpx_client_on_alloc(uv_handle_t* handle, size_t nbytes) {
char* buf = new char[nbytes];
if(!buf) {
RX_ERROR(("Cannot allocate memory for uv_buf_t, client stream"));
}
return uv_buf_init(buf, nbytes);
}
/*
Protocol description
--------------------
We use a very simply protocol where we basically send only VPXCommands which all
have this format: COMMAND u(8) + DATA_SIZE u(32) + DATA u(8 * DATA_SIZE). All
integers are stored in big endian format.
Stream flow
----------
(1) Client connects ----->
<------- Server sends most important part of VPXSettings (1)
<------- Server start sending lots of VPX_CMD_FRAME commands (*)
(1) Client disconnects ---->
<------- Server removes connection
*/
#ifndef ROXLU_VPX_STREAM_H
#define ROXLU_VPX_STREAM_H
extern "C" {
# include <uv.h>
}
#include <stdio.h>
#include <roxlu/core/Log.h>
#include <roxlu/opengl/OpenGLInit.h>
#include <webm/VPXEncoder.h>
#include <algorithm>
#include <vector>
#include <string>
#define VPX_UV_ERR(r, okval, msg, ret) \
if(r != okval) { \
RX_ERROR((msg)); \
RX_ERROR(("uv error: %s", uv_strerror(uv_last_error(uv_default_loop())))) \
return ret; \
}
/* server networking callbacks */
void vpx_server_on_new_connection(uv_stream_t* sock, int status);
void vpx_server_on_read(uv_stream_t* sock, ssize_t nread, uv_buf_t buf);
void vpx_server_on_shutdown(uv_shutdown_t* req, int status);
void vpx_server_on_close(uv_handle_t* handle);
uv_buf_t vpx_server_on_alloc(uv_handle_t* handle, size_t nbytes);
/* connections on server */
void vpx_connection_on_write(uv_write_t* req, int status);
/* client networking callback */
void vpx_client_on_resolved(uv_getaddrinfo_t* req, int status, struct addrinfo* res);
void vpx_client_on_connect(uv_connect_t* req, int status);
void vpx_client_on_read(uv_stream_t* handle, ssize_t read, uv_buf_t buf);
void vpx_client_on_write(uv_write_t* req, int status);
void vpx_client_on_shutdown(uv_shutdown_t* req, int status);
void vpx_client_on_close(uv_handle_t* handle);
void vpx_client_on_reconnect_timer(uv_timer_t* handle, int status);
uv_buf_t vpx_client_on_alloc(uv_handle_t* handle, size_t nbytes);
struct VPXBuffer {
VPXBuffer();
~VPXBuffer();
void reset(); /* resets the data vector and read dx */
void clear(); /* clears the buffer */
void addBytes(char* b, size_t nbytes); /* copy bytes to our bufer */
size_t size();
char* ptr(); /* get ptr to the start of the buffer */
void print();
void write(char* buf, size_t nbytes); /* store arbitrary number of bytes */
void wu8(uint8_t byte); /* store u8 */
void wu16(uint16_t byte); /* store u16, BE */
void wu32(uint32_t byte); /* store u32, BE */
uint8_t ru8(); /* read u8, move read index */
uint16_t ru16(); /* read u16, move read index */
uint32_t ru32(); /* read u32, move read index */
uint8_t pu8(int dx = 0); /* peek u8 */
uint16_t pu16(int dx = 0); /* peek u16 */
uint32_t pu32(int dx = 0); /* peek u32 */
void flush(int nbytes); /* erase data from buffer */
std::vector<uint8_t> data;
int read_dx;
};
/* grabber callback */
void vpx_server_on_grabber_cb(unsigned char* ptr, size_t nbytes, void* user);
/* vpx codec callbacks */
void vpx_server_on_vpx_write(const vpx_codec_cx_pkt_t* pkt, int64_t pts, void* user);
#define VPX_SCREENGRABBER_FBOS 4
typedef void(*vpx_screengrabber_cb)(unsigned char* ptr, size_t nbytes, void* user);
class VPXScreenGrabber { /* helper class that we use to grab pixels from the gpu in a fast way */
public:
VPXScreenGrabber();
~VPXScreenGrabber();
bool setup(VPXSettings cfg, vpx_screengrabber_cb grabCB, void* grabUser);
void grabFrame(); /* call this after each draw() you want to record. we will call the set callback when we actually grab data (based on the set fps) */
private:
bool mustGrabFrame(); /* internally used to check if we need to grab a new frame to keep up with the set fps */
private:
bool is_setup; /* you must call setup; this is used to check if setup is called */
VPXSettings settings; /* settings that we use for encoding and grabbing pixels from the screen */
int64_t frame_timeout; /* when we timeout we grab a new frame */
int millis_per_frame; /* millis per frame, based on the vpxsettings */
int num_bytes; /* number of bytes in one raw frame */
GLuint pbos[VPX_SCREENGRABBER_FBOS]; /* our pbos to boost pixel transfers */
int read_dx; /* we read from another pbo, in such a way that the gpu can sync up we glMapBuffer doesnt stall */
int write_dx; /* "" */
vpx_screengrabber_cb cb_grabber; /* we call this function when we've data to be processed; note that you're in the same thread as GL! */
void* cb_user; /* passed through to the callback */
};
enum VPXCommands {
VPX_CMD_SETTINGS, /* when a client connects the server automatically sends the used VPX settings */
VPX_CMD_FRAME /* every time we have a new encoded VPX frame on the server (outstream), we send it with this command */
};
struct VPXCommand { /* basic structure we used in communication: */
VPXCommand(unsigned char cmd); /* - byte 1: the command name */
unsigned char cmd; /* - byte 2,3,4,5: unsigned int, BE, size of data */
VPXBuffer buffer; /* - # bytes: the data for the command */
};
struct VPXOutStream;
struct VPXConnection { /* a VPXConnection represents a connection that is handled by the server (VPXOutStream) */
VPXConnection(VPXOutStream& os);
~VPXConnection();
void parseBuffer(); /* parsed the incoming buffer (commands from a client) */
void write(char* data, size_t nbytes); /* write to socket */
void writeCommand(VPXCommands cmd, VPXBuffer& buf);
uv_tcp_t sock; /* the socket, connected to the server */
uv_shutdown_t shutdown_req; /* context, used to shutdown the connection */
VPXOutStream& out_stream; /* the server to which the VPXConnection is connected */
VPXBuffer buffer; /* input buffer, received form this socket */
};
struct VPXInStream {
VPXInStream(std::string host, std::string port);
~VPXInStream();
bool connect();
void update();
void parseBuffer(); /* (do not call), used to parse the incoming bitstream, is called for you */
void clear(); /* resets all allocated data */
void reconnect(); /* when we get disconnected we automatically try to reconnect */
std::string host;
std::string port;
VPXBuffer buffer; /* the received data (encoded VPX) */
VPXSettings settings; /* is created after receiving the header from the server */
int proto_version; /* protocol version, started with '1' */
uv_loop_t* loop;
uv_tcp_t sock;
uv_shutdown_t shutdown_req;
uv_getaddrinfo_t resolver_req;
uv_connect_t connect_req;
uv_timer_t timer_req;
};
struct VPXOutStream { /* this is our 'server', clients can connect and get video data frome it */
public:
VPXOutStream(int port); /* pass the port of the server to the ctor. clients can connect to this port */
~VPXOutStream();
bool setup(VPXSettings settings);
bool start(); /* call start when you want to start the server and accept clients */
void grabFrame();
void update(); /* call update() as much as possible; this processing the socket buffers */
void removeConnection(VPXConnection* c); /* removes a connection; you don't need to call this */
/* privates, but public for callbacks */
void writeHeader(VPXConnection* c); /* sends the necessary settings to encode the video */
void writeToAllConnections(VPXCommands cmd, char* buf, size_t nbytes); /* write a command to all clients/connections */
public:
bool is_setup;
VPXScreenGrabber grabber;
VPXEncoder encoder;
VPXSettings settings;
std::vector<VPXConnection*> connections;
uv_tcp_t sock;
uv_loop_t* loop;
int port;
unsigned int num_frames;
};
#endif
@roxlu
Copy link
Author

roxlu commented Feb 15, 2013

Server log:

[verbose] [void VPXConnection::writeCommand(VPXCommands, VPXBuffer &) L453] - SENDING: 493 bytes
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
00 00 00 43
[verbose] [void VPXConnection::writeCommand(VPXCommands, VPXBuffer &) L453] - SENDING: 72 bytes
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
00 00 00 39
[verbose] [void VPXConnection::writeCommand(VPXCommands, VPXBuffer &) L453] - SENDING: 62 bytes
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
00 00 00 32
[verbose] [void VPXConnection::writeCommand(VPXCommands, VPXBuffer &) L453] - SENDING: 55 bytes
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
00 00 00 31
[verbose] [void VPXConnection::writeCommand(VPXCommands, VPXBuffer &) L453] - SENDING: 54 bytes
[error] [void VPXConnection::write(char *, size_t) L466] - cannot uv_write(): success
[error] [void vpx_connection_on_write(uv_write_t *, int) L494] - error while trying to uv_write: broken pipe
[error] [void vpx_connection_on_write(uv_write_t *, int) L494] - error while trying to uv_write: broken pipe
Assertion failed: (!uv__io_active(&stream->io_watcher, UV__POLLIN) && "stream->read_cb(status=-1) did not call uv_close()"), function uv__read, file src/unix/stream.c, line 995.

@roxlu
Copy link
Author

roxlu commented Feb 15, 2013

----> send to client
----> send to client
<---- disconnect
-----> send to client
-----> read -1 from client
- disconnected
- shutdown
-----> send to client
-----> shutdown callback
-----> close callback

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment