Skip to content

Instantly share code, notes, and snippets.

@mausvt
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mausvt/9d601360519dd4e3bdc6 to your computer and use it in GitHub Desktop.
Save mausvt/9d601360519dd4e3bdc6 to your computer and use it in GitHub Desktop.
Ugly mess uv_tcp_connect do not give callback
#include <iostream>
#include <assert.h>
//#include <netdb.h>
#include <errno.h>
#include <QList>
#include "comm.h"
#include "uiconf.h"
using namespace std;
UDPthread::UDPthread(QList<UiElement*> *elements, int port)
: m_stop(false),
m_port(port),
m_elements(elements)
{
start();
}
UDPthread::~UDPthread()
{
m_stop = true;
wait();
}
void UDPthread::run()
{
char packet[65507];
SqComm::PacketHdr *hdr = (SqComm::PacketHdr*)packet;
QUdpSocket sock;
char *p;
sock.bind(m_port);
printf("sss %d \n", m_port);
while(!m_stop)
{ printf("_\n");
if(!sock.waitForReadyRead(100))
continue;
printf(".");
int s = sock.pendingDatagramSize();
assert(s <= (int)sizeof(packet));
sock.readDatagram(packet, s);
if(hdr->sig != SIGNATURE)
{
cerr << "Bad signature of UDP packet" << endl;
continue;
}
while(s - sizeof(SqComm::PacketHdr) < hdr->len)
{ printf("`");
if(!sock.waitForReadyRead(3000))
continue;
assert(s + sock.pendingDatagramSize() <= sizeof(packet));
s += sock.readDatagram(packet+s, sock.pendingDatagramSize());
}
printf("#\n");
p = packet+sizeof(SqComm::PacketHdr);
if(m_elements)
{
foreach(UiElement *el, *m_elements)
{
switch(el->type())
{
case Element::Signal:
{
SqComm::Digital *dd = (SqComm::Digital*)(p+el->offset());
((UiSignal*)el)->arrived(dd->valid, dd->value);
break;
}
case Element::Control:
{
SqComm::Digital *dd = (SqComm::Digital*)(p+el->offset());
((UiControl*)el)->arrived(dd->valid, dd->value);
break;
}
case Element::Sensor:
{
SqComm::Analog *ad = (SqComm::Analog*)(p+el->offset());
((UiSensor*)el)->arrived(ad->valid, ad->value, ad->rawValue);
break;
}
case Element::Setpoint:
{
SqComm::Analog *ad = (SqComm::Analog*)(p+el->offset());
((UiSetpoint*)el)->arrived(ad->valid, ad->value, ad->rawValue);
break;
}
case Element::Bool:
{
bool val = *(bool*)(p+el->offset());
((UiBool*)el)->arrived(val);
break;
}
case Element::Lock:
{
bool val = *(bool*)(p+el->offset());
((UiLock*)el)->arrived(val);
break;
}
case Element::Number:
{
float val = *(float*)(p+el->offset());
((UiNumber*)el)->arrived(val);
break;
}
case Element::String:
case Element::Namespace:
case Element::Function:
case Element::Procedure:
case Element::Trap:
break;
case Element::Unknown:
assert(false);
break;
}
}
}
}
if(m_elements)
{
foreach(UiElement *el, *m_elements)
{
el->reset();
}
}
}
TCPthread::TCPthread()
: SqLink(),
m_stop(false),
m_sock(-1),
m_udp(0)
{}
TCPthread::~TCPthread()
{
disconnect();
}
bool TCPthread::connect(const QString & host, int tcpPort, int udpPort, QList<UiElement*> *elements)
{
/*
if(m_sock >= 0)
return false;
struct hostent *he = gethostbyname(host.toStdString().c_str());
if(!he)
{
cerr << "gethostbyname() error: " << strerror(h_errno) << endl;
return false;
}
m_sock = socket(PF_INET, SOCK_STREAM, 0);
if(m_sock < 0)
{
cerr << "socket() error: " << strerror(errno) << endl;
return false;
}
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(tcpPort);
sin.sin_addr.s_addr = *(int*)(he->h_addr);
if(::connect(m_sock, (struct sockaddr*)&sin, sizeof(struct sockaddr)))
{
// cerr << "connect() error: " << strerror(errno) << endl;
close(m_sock);
m_sock = -1;
return false;
}
QString cmd = QString("SUBSCRIBE PORT %1 DELAY 49 ALL").arg(udpPort);
m_udp = new UDPthread(elements, udpPort);
*/
m_stop = false;
m_elements=elements;
m_tcp=tcpPort;
m_udp=udpPort;
m_host=host.toStdString();
start();
//command(cmd);
emit connected();
return true;
}
bool TCPthread::isConnected()
{
return (m_sock != -1);
}
bool TCPthread::disconnect()
{
if(m_sock < 0)
return false;
m_stop = true;
if(!wait())
return false;
return true;
}
bool TCPthread::command(const QString & cmd)
{
/* if(m_sock < 0)
return false;
QByteArray data = cmd.toUtf8();
SqComm::PacketHdr hdr;
hdr.sig = SIGNATURE;
hdr.len = data.size();
if(send(m_sock, (char*)&hdr, sizeof(SqComm::PacketHdr), 0) < 0)
return false;
if(send(m_sock, data.constData(), data.size(), 0) < 0)
return false;
*/
SqLink::command(cmd.toStdString());
return true;
}
void TCPthread::datum(const char *p, size_t size)
{ m_sock=1;
if (m_stop) shutout();
if(m_elements)
{
foreach(UiElement *el, *m_elements)
{
switch(el->type())
{
case Element::Signal:
{
SqComm::Digital *dd = (SqComm::Digital*)(p+el->offset());
((UiSignal*)el)->arrived(dd->valid, dd->value);
break;
}
case Element::Control:
{
SqComm::Digital *dd = (SqComm::Digital*)(p+el->offset());
((UiControl*)el)->arrived(dd->valid, dd->value);
break;
}
case Element::Sensor:
{
SqComm::Analog *ad = (SqComm::Analog*)(p+el->offset());
((UiSensor*)el)->arrived(ad->valid, ad->value, ad->rawValue);
break;
}
case Element::Setpoint:
{
SqComm::Analog *ad = (SqComm::Analog*)(p+el->offset());
((UiSetpoint*)el)->arrived(ad->valid, ad->value, ad->rawValue);
break;
}
case Element::Bool:
{
bool val = *(bool*)(p+el->offset());
((UiBool*)el)->arrived(val);
break;
}
case Element::Lock:
{
bool val = *(bool*)(p+el->offset());
((UiLock*)el)->arrived(val);
break;
}
case Element::Number:
{
float val = *(float*)(p+el->offset());
((UiNumber*)el)->arrived(val);
break;
}
case Element::String:
case Element::Namespace:
case Element::Function:
case Element::Procedure:
case Element::Trap:
break;
case Element::Unknown:
assert(false);
break;
}
}
}
}
void TCPthread::run()
{
fprintf(stderr,"tcpthreadi\n");
SqLink::connect(m_host, m_tcp, m_udp);
fprintf(stderr,"tcpthreadk\n");
uv_run(loop,UV_RUN_DEFAULT);
/*int r;
SqComm::PacketHdr hdr;
char *body = 0;
// cout << "TCPthread started" << endl;
while(!m_stop)
{
fd_set rdfs;
FD_ZERO(&rdfs);
FD_SET(m_sock, &rdfs);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100000;
r = select(m_sock+1, &rdfs, 0, 0, &tv);
if(!r)
continue;
if(r < 0)
{
cerr << "select() error: " << strerror(errno) << endl;
break;
}
r = recv(m_sock, &hdr, sizeof(SqComm::PacketHdr), 0);
if(!r)
break;
if(r < 0)
{
cerr << "recv() error: " << strerror(errno) << endl;
break;
}
if(hdr.sig != SIGNATURE)
{
cerr << "Bad packet signature" << endl;
break;
}
body = new char[hdr.len+1];
int len = 0;
while((len < hdr.len) && (r >=0 ) && !m_stop)
{
do
{
FD_ZERO(&rdfs);
FD_SET(m_sock, &rdfs);
tv.tv_sec = 0;
tv.tv_usec = 100000;
r = select(m_sock+1, &rdfs, 0, 0, &tv);
} while(!r && !m_stop);
if(m_stop)
break;
if(r < 0)
{
cerr << "select() error: " << strerror(errno) << endl;
break;
}
r = recv(m_sock, body+len, hdr.len-len, 0);
if(!r)
break;
if(r < 0)
{
cerr << "recv() error: " << strerror(errno) << endl;
break;
}
len += r;
}
body[len] = 0;
emit message(body);
// cout << "GOT: " << body << endl;
delete[] body;
body = 0;
}
if(body)
delete[] body;
// cout << "TCPthread finished" << endl;
close(m_sock);
m_sock = -1;
delete m_udp;
*/
emit disconnected();
}
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = (char*)malloc(suggested_size);
buf->len = suggested_size;
}
void SqLink::connect(const string &host, int tcpPort, int udpPort)
{
struct addrinfo hints;
hints.ai_family = PF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = 0;
resolver.data=this;
tcp_port=tcpPort;
udp_port=udpPort;
int r = uv_getaddrinfo(loop, &resolver, resolved, host.c_str(), NULL, &hints);
if (r) {
fprintf(stderr, "getaddrinfo call error %s\n", uv_err_name(r));
return;// 1;
}
uv_udp_init(loop, &reciver);
addrlen=0;
reciver.data=this;
struct sockaddr_in recv_addr;
assert(uv_ip4_addr("0.0.0.0", udp_port, &recv_addr) == 0);
r=uv_udp_bind(&reciver, (const struct sockaddr *)&recv_addr, 0);
if (r) {
fprintf(stderr, "udp_bind %s\n", uv_err_name(r));
return;// 1;
}
r=uv_udp_recv_start(&reciver, alloc_buffer, readon);
if (r) {
fprintf(stderr, "udp_recv_start %s\n", uv_err_name(r));
return;// 1;
}
uv_async_init(loop, &sender, post_message);
}
void SqLink::resolved(uv_getaddrinfo_t *resolver, int status, struct addrinfo *res) {
SqLink *self=(SqLink*)resolver->data;
if (status < 0) {
fprintf(stderr, "getaddrinfo callback error %s\n", uv_err_name(status));
return;
}
char addr[17] = {'\0'};
uv_ip4_name((struct sockaddr_in*) res->ai_addr, addr, 16);
fprintf(stderr, "%s\n", addr);
((struct sockaddr_in*)res->ai_addr)->sin_port=htons(self->tcp_port);
self->socket.data=(void*)self;
uv_tcp_init(self->loop, &(self->socket));
self->connect_req.data = (void*) self;
int r;
r=uv_tcp_connect(&(self->connect_req), &(self->socket), (const struct sockaddr*) res->ai_addr,SqLink::connected);
fprintf(stderr,"try con %d\n",r);
uv_freeaddrinfo(res);
}
SqLink::SqLink(){
fprintf(stderr,"sqlink\n");
loop=uv_loop_new();
uv_async_init(loop, &sender, post_message);
sender.data=this;
uv_sem_init(&sem,1);
addrlen=0;
need_header=sizeof(SqComm::PacketHdr);
need_to_command=0;
fold="";
};
SqLink::~SqLink(){
uv_sem_wait(&sem);
uv_sem_destroy(&sem);
uv_loop_close(loop);
};
void SqLink::shutout(){
uv_close((uv_handle_t*) &(sender), done);
}
void SqLink::done(uv_handle_t* client) {
SqLink* self=static_cast<SqLink*>(client->data);
assert(self);
if (!uv_is_closing((uv_handle_t*)&(self->sender)))
{
uv_close((uv_handle_t*) &(self->sender), done);
return;
}
if (!uv_is_closing((uv_handle_t*)&(self->socket)))
{
uv_close((uv_handle_t*) &(self->socket), done);
return;
}
if (!uv_is_closing((uv_handle_t*)&(self->reciver)))
{
uv_close((uv_handle_t*) &(self->reciver), done);
return;
}
if (!uv_is_closing((uv_handle_t*)&(self->resolver)))
{
uv_close((uv_handle_t*) &(self->resolver), done);
return;
}
}
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
void SqLink::writen(uv_write_t *req, int status) {
write_req_t* wr;
wr = (write_req_t*) req;
if (status) {
fprintf(stderr, "async write %s\n", uv_err_name(status));
uv_close((uv_handle_t*) &req->data, done);
};
assert(wr->req.type == UV_WRITE);
/* Free the read/write buffer and the request */
free(wr->buf.base);
free(wr);
}
void SqLink::send_message(const string &msg){
write_req_t *wr = (write_req_t*) malloc(sizeof(write_req_t));
alloc_buffer((uv_handle_t*)&socket, sizeof(SqComm::PacketHdr)+ msg.length() ,&(wr->buf));
SqComm::PacketHdr *hdr=(SqComm::PacketHdr *)(wr->buf).base;
hdr->sig = SIGNATURE;
hdr->len = msg.length();
memcpy((wr->buf).base+sizeof(SqComm::PacketHdr),msg.c_str(),msg.length());
wr->req.data=&socket;
uv_write(&wr->req, (uv_stream_t*) &socket, &wr->buf, 1/*nbufs*/, writen);
}
void SqLink::command(const string &msg){
uv_sem_wait(&sem);
pipe.push_back(msg);
uv_sem_post(&sem);
uv_async_send(&sender);
}
void SqLink::post_message(uv_async_t *handle) {
SqLink* self=(SqLink*)handle->data;
for(;;){
uv_sem_wait(&(self->sem));
if (self->pipe.empty() || !self->addrlen) {
uv_sem_post(&(self->sem));
return;
}
string msg=self->pipe.front();
self->pipe.pop_front();
uv_sem_post(&(self->sem));
self->send_message(msg);
}
}
void SqLink::connected(uv_connect_t *req, int status) {
SqLink* self=(SqLink*)req->data;
if (status < 0) {
fprintf(stderr, "connect failed error %s\n", uv_err_name(status));
uv_close((uv_handle_t*)&(self->sender),done);
return;
}
fprintf(stderr, "connect \n");
self->addrlen=sizeof(sock_addr);
uv_tcp_getpeername(&self->socket,&self->sock_addr,&self->addrlen);
uv_read_start((uv_stream_t*) &(self->socket), alloc_buffer, readin);
char ss[100];
snprintf(ss, sizeof(ss), "SUBSCRIBE PORT %d DELAY %d ", self->udp_port, 50);
self->send_message(ss + string("ALL"));
}
void SqLink::readin(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
SqLink* self = (SqLink*)client->data;
if (nread < 0) {
if (nread != UV_EOF)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) client, done);
return;
}
self->packet(buf->base, nread);
free(buf->base);
}
void SqLink::packet(char* base, int len){
if (len<=0) return;
if (need_to_command) {
if (need_to_command < len) {
message(fold+string(base, need_to_command));
fold="";
base+=need_to_command;
len-=need_to_command;
need_to_command=0;
need_header=sizeof(hdr);
packet(base,len);
} else {
need_to_command-=len;
if (!need_to_command) {
message(fold+string(base, len));
fold="";
need_header=sizeof(hdr);
} else {
fold=fold+string(base,len);
}
}
} else if (need_header) {
if (need_header > len) {
memcpy((((char*)&hdr)+(sizeof(hdr) - need_header)), base, len);
need_header-=len; len=0;
} else {
memcpy((((char*)&hdr)+(sizeof(hdr) - need_header)), base, need_header);
len-=need_header;
base=base+need_header;
need_header=0;
};
if (!need_header) {
if (hdr.sig==SIGNATURE) {
need_to_command=hdr.len;
packet(base,len);
} else {
printf("peer proto fail\n");
uv_close((uv_handle_t*) &(this->socket), done);
}
}
}
}
void SqLink::readon(uv_udp_t *req, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
if (nread < 0) {
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) req, done);
free(buf->base);
return;
}
SqLink* self=(SqLink*)req->data;
if (addr) {
if (addr->sa_family!=self->sock_addr.sa_family){
free(buf->base);
return;
} else {
switch(addr->sa_family)
{
case AF_INET:
if (memcmp(&(((sockaddr_in*)&(self->sock_addr))->sin_addr),
&(((sockaddr_in*)addr)->sin_addr),
sizeof(sockaddr_in::sin_addr))!=0)
{
free(buf->base);
return;
}
break;
case AF_INET6:
if (memcmp(&(((sockaddr_in6*)&(self->sock_addr))->sin6_addr),
&(((sockaddr_in6*)addr)->sin6_addr),
sizeof(sockaddr_in6::sin6_addr))!=0)
{
free(buf->base);
return;
}
break;
default:
free(buf->base);
return;
}
}
char sender[17] = { 0 };
uv_ip4_name((const struct sockaddr_in*) addr, sender, 16);
} else {
if (!nread) {
free(buf->base);
return;
}
}
if (((SqComm::PacketHdr*)buf->base)->sig!=SIGNATURE) {
free(buf->base);
return;
} else {
if ((((SqComm::PacketHdr*)buf->base)->len+sizeof(SqComm::PacketHdr))!=(size_t)nread)
{
free(buf->base);
return;
}
}
self->datum(buf->base+sizeof(SqComm::PacketHdr),buf->len);
free(buf->base);
// uv_udp_recv_stop(req);
}
#ifndef COMM_H
#define COMM_H
#include <QThread>
#include <QtNetwork>
using namespace std;
#include <string.h>
#include <string>
#include <uv.h>
#define TCP_PORT 0x1441
#define SIGNATURE 0x5153
namespace SqComm{
#pragma pack(push, 1)
typedef struct
{
unsigned short sig;
unsigned short len;
} PacketHdr;
typedef struct
{
char valid;
float value;
float rawValue;
} Analog;
typedef struct
{
char valid;
char value;
} Digital;
#pragma pack(pop)
}
class SqLink {
protected:
int tcp_port;
int udp_port;
uv_loop_t *loop;
private:
uv_getaddrinfo_t resolver;
uv_connect_t connect_req;
int addrlen;
struct sockaddr sock_addr;
uv_tcp_t socket;
uv_udp_t reciver;
uv_async_t sender;
uv_sem_t sem;
list<string> pipe;
int need_to_command;
int need_header;
string fold;
SqComm::PacketHdr hdr;
void packet(char* base, int len);
void send_message(const string &msg);
static void post_message(uv_async_t *handle);
static void done(uv_handle_t* client);
static void writen(uv_write_t *req, int status);
static void resolved(uv_getaddrinfo_t *resolver, int status, struct addrinfo *res);
static void connected(uv_connect_t *req, int status);
static void readin(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static void readon(uv_udp_t *req, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags);
public:
SqLink();
virtual ~SqLink();
virtual void message(const string &msg){};
virtual void datum(const char* data, size_t size){};
void connect(const string &host, int tcpPort, int udpPort);
void command(const string &msg);
void shutout();
};
class UiElement;
class UDPthread : public QThread
{
bool m_stop;
int m_port;
QList<UiElement*> *m_elements;
public:
UDPthread(QList<UiElement*> *elements, int port);
~UDPthread();
void run();
};
class TCPthread : public QThread, public SqLink
{
Q_OBJECT
bool m_stop;
int m_sock;
// UDPthread *m_udp;
QList<UiElement*> *m_elements;
int m_tcp;
int m_udp;
string m_host;
public:
TCPthread();
~TCPthread();
bool connect(const QString & host, int tcpPort, int udpPort, QList<UiElement*> *elements);
bool disconnect();
bool isConnected();
bool command(const QString & cmd);
void run();
void message(const string &msg){ emit message(QString::fromUtf8(msg.c_str(),msg.length())); };
void datum(const char* data, size_t size);
signals:
void connected();
void disconnected();
void message(const QString & str);
};
#endif // COMM_H
@mausvt
Copy link
Author

mausvt commented Dec 26, 2014

check up uv_async_init

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment