Skip to content

Instantly share code, notes, and snippets.

Created May 28, 2013 06:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/5660853 to your computer and use it in GitHub Desktop.
Save anonymous/5660853 to your computer and use it in GitHub Desktop.
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <signal.h>
// Size in bytes of the buffer StreamHandler reads from the capture device;
// every chunk written to the recording file and to clients is this size.
#define MEMBUF_SIZE 4096
// Timeout handed to poll() in StreamHandler (poll() takes milliseconds).
#define POLL_DELAY 2500
// TCP port that main() binds the listening socket to.
#define PORT 1101
// Thread entry points.
// SocketHandler: started by main() once per accepted connection; its argument
// is a heap-allocated struct data* which the handler frees before returning.
void* SocketHandler(void*);
// StreamHandler: started once by main(); its argument is the struct shared*
// that all threads share.
void* StreamHandler(void*);
// State shared between main(), the single StreamHandler thread and every
// SocketHandler thread. Accesses are meant to happen with sharedlock held.
struct shared {
// Mutex the threads lock around reads/writes of the fields below.
pthread_mutex_t sharedlock;
// Number of entries of outfds currently in use. StreamHandler also treats a
// non-zero value as "relay enabled" and only opens the device while it is set.
int datarelayenabled;
// Client sockets registered by "GET /video" requests; StreamHandler writes
// each chunk to these and uses -1 to mark an entry whose write failed.
int outfds[128];
// Not referenced by any code visible in this file.
char buffer[80];
// Set to 1 by "/startrec" and back to 0 by "/stoprec"; while non-zero
// StreamHandler also writes every chunk to outfd.
int recording;
// File descriptor of the recording file; meaningful only while recording is 1.
int outfd;
};
// Per-connection argument block. main() mallocs one per accept() and hands it
// to a SocketHandler thread, which frees it when it is done.
struct data {
// Socket returned by accept() for this client.
int csock;
struct shared *data; //pointer to shared data.
};
/*
 * Program entry point.
 *
 * Sets up the shared state, opens a listening TCP socket on PORT, starts the
 * single StreamHandler thread and then accepts connections forever, handing
 * each one to a detached SocketHandler thread.
 *
 * The accept loop never terminates on its own, so the only way to reach the
 * FINISH label is a setup failure; in that case the process exits with
 * EXIT_FAILURE. Command line arguments are ignored.
 */
int main(int argc, char **argv) {
    int hsock = -1;
    int optval = 1;
    int err;
    size_t i;
    pthread_t thread_id;
    struct sockaddr_in my_addr;
    struct shared *global;

    (void)argc;
    (void)argv;

    // A client disconnecting mid-stream must not kill the whole server;
    // writes to a closed socket then fail with an error instead.
    signal(SIGPIPE, SIG_IGN);

    //init the global data.
    global = calloc(1, sizeof *global);
    if (global == NULL) {
        fprintf(stderr, "Not enough memory\n");
        return EXIT_FAILURE;
    }
    global->datarelayenabled = 0;
    global->recording = 0;
    global->outfd = -1;
    for (i = 0; i < sizeof global->outfds / sizeof global->outfds[0]; i++) {
        global->outfds[i] = -1;
    }
    pthread_mutex_init(&(global->sharedlock), NULL);

    hsock = socket(AF_INET, SOCK_STREAM, 0);
    if (hsock == -1) {
        fprintf(stderr, "Error initializing socket %d\n", errno);
        goto FINISH;
    }

    if (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) == -1 ||
        setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof optval) == -1) {
        fprintf(stderr, "Error setting options %d\n", errno);
        goto FINISH;
    }

    memset(&my_addr, 0, sizeof my_addr);
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(PORT);
    my_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(hsock, (struct sockaddr *)&my_addr, sizeof my_addr) == -1) {
        fprintf(stderr, "Error binding to socket, make sure nothing else is listening on this port %d\n", errno);
        goto FINISH;
    }
    if (listen(hsock, 10) == -1) {
        fprintf(stderr, "Error listening %d\n", errno);
        goto FINISH;
    }

    // Start the streaming thread only once the socket is ready, so that the
    // failure paths above can free `global` without a thread still using it.
    err = pthread_create(&thread_id, NULL, &StreamHandler, global);
    if (err != 0) {
        fprintf(stderr, "Error starting stream thread %d\n", err);
        goto FINISH;
    }
    pthread_detach(thread_id);

    //Now lets do the server stuff
    while (1) {
        struct sockaddr_in sadr;
        // accept() overwrites this with the actual address length, so it has
        // to be reset before every call.
        socklen_t addr_size = sizeof sadr;
        struct data *datainst;
        int csock;

        printf("waiting for a connection\n");
        csock = accept(hsock, (struct sockaddr *)&sadr, &addr_size);
        if (csock == -1) {
            fprintf(stderr, "Error accepting %d\n", errno);
            continue;
        }

        //allocate the block we'll send to the thread (the thread frees it)
        datainst = malloc(sizeof *datainst);
        if (datainst == NULL) {
            fprintf(stderr, "Not enough memory\n");
            close(csock);
            continue;
        }
        datainst->csock = csock;
        datainst->data = global;

        pthread_mutex_lock(&(global->sharedlock));
        printf("-%d--------------------\nReceived connection from %s\n", global->datarelayenabled, inet_ntoa(sadr.sin_addr));
        pthread_mutex_unlock(&(global->sharedlock));

        err = pthread_create(&thread_id, NULL, &SocketHandler, datainst);
        if (err != 0) {
            // No thread took ownership, so release the connection here.
            fprintf(stderr, "Error starting connection thread %d\n", err);
            close(csock);
            free(datainst);
            continue;
        }
        pthread_detach(thread_id);
    }

FINISH:
    // Only reached on a setup failure, before any worker thread exists.
    if (hsock != -1) {
        close(hsock);
    }
    pthread_mutex_destroy(&(global->sharedlock));
    free(global);
    return EXIT_FAILURE;
}
/* Write the current local time as "YYYY-MM-DD HH:MM:SS" into out.
 * Uses localtime_r because SocketHandler threads format times concurrently
 * and localtime() shares one static buffer. On failure out becomes "". */
static void stream_timestamp(char *out, size_t outlen) {
    time_t now = time(NULL);
    struct tm parts;

    if (outlen == 0) {
        return;
    }
    if (localtime_r(&now, &parts) == NULL ||
        strftime(out, outlen, "%Y-%m-%d %H:%M:%S", &parts) == 0) {
        out[0] = '\0';
    }
}

/* Return the current value of datarelayenabled, read under the shared lock. */
static int stream_relay_enabled(struct shared *global) {
    int enabled;

    pthread_mutex_lock(&(global->sharedlock));
    enabled = global->datarelayenabled;
    pthread_mutex_unlock(&(global->sharedlock));
    return enabled;
}

/* Deliver one chunk: to the recording file if recording is on, and to every
 * registered client socket. Clients whose write fails are closed and removed,
 * and the surviving descriptors are packed to the front of outfds so that
 * datarelayenabled is again the number of live entries. */
static void stream_relay_chunk(struct shared *global, const void *chunk, size_t len) {
    int i;
    int total;
    int live = 0;

    pthread_mutex_lock(&(global->sharedlock));

    //if recording.. write out to outfd.
    if (global->recording) {
        if (write(global->outfd, chunk, len) == -1) {
            perror("Error writing to file.");
        }
    }

    total = global->datarelayenabled;
    for (i = 0; i < total; i++) {
        int fd = global->outfds[i];

        if (fd == -1) {
            continue;
        }
        if (write(fd, chunk, len) == -1) {
            // The client is gone: close the socket so the descriptor is not
            // leaked, and drop it from the table.
            close(fd);
            continue;
        }
        global->outfds[live++] = fd; // live <= i, so nothing unread is overwritten
    }
    for (i = live; i < total; i++) {
        global->outfds[i] = -1;
    }
    global->datarelayenabled = live;

    pthread_mutex_unlock(&(global->sharedlock));
}

/*
 * Thread body that relays data from /dev/video0 to the registered outputs.
 *
 * lp is the struct shared* created by main(). The thread sleeps until
 * datarelayenabled becomes non-zero, opens the device, and then repeatedly
 * polls it, reading MEMBUF_SIZE-byte chunks and passing each to
 * stream_relay_chunk(). If poll() times out the device is closed and
 * reopened. When the last client goes away the device is closed and the
 * thread goes back to waiting.
 *
 * Unrecoverable device errors (open failure, unexpected read error, short
 * read, POLLERR) terminate the whole process with EXIT_FAILURE. The function
 * does not return in practice.
 */
void* StreamHandler(void* lp) {
    struct shared *global = (struct shared *)lp;
    const char *ifname = "/dev/video0";
    struct pollfd fds[2];
    char stamp[80];
    void *membuf;

    membuf = malloc(MEMBUF_SIZE);
    if (membuf == NULL) {
        printf("Not enough memory to allocate buffer\n");
        fprintf(stderr, "Not enough memory\n");
        exit(EXIT_FAILURE);
    }

    while (1) {
        int devfd;
        int retval;
        int countdown = 0;
        ssize_t nbytes;

        // Idle until someone enables the data relay.
        while (!stream_relay_enabled(global)) {
            sleep(1);
        }

        //someone enabled the datarelay, we'd better setup & start reading data.
        /** open the device **/
        devfd = open(ifname, O_RDWR | O_NONBLOCK);
        if (devfd == -1) {
            perror("Unable to open device");
            exit(EXIT_FAILURE);
        }
        usleep(5000);

        /** setup descriptors for event polling **/
        fds[0].fd = STDIN_FILENO;
        fds[0].events = POLLIN;
        fds[1].fd = devfd;
        fds[1].events = POLLIN;

        /** start the recording loop **/
        while (stream_relay_enabled(global)) {
            // After a device restart, give it up to `countdown` poll timeouts
            // to start producing data before declaring the signal lost again.
            while (countdown > 0) {
                retval = poll(fds, 2, POLL_DELAY);
                if (retval == 0) {
                    stream_timestamp(stamp, sizeof stamp);
                    fprintf(stderr, "%s Waiting for ready (%d)...\n", stamp, countdown);
                    usleep(300);
                    countdown--;
                } else {
                    countdown = 0;
                }
            }

            retval = poll(fds, 2, POLL_DELAY);
            if (retval == 0) {
                stream_timestamp(stamp, sizeof stamp);
                fprintf(stderr, "%s Lost signal, restarting device...\n", stamp);
                close(devfd);
                countdown = 5;
                devfd = open(ifname, O_RDWR | O_NONBLOCK);
                if (devfd == -1) {
                    perror("Unable to reopen the device");
                    exit(EXIT_FAILURE);
                }
                fds[1].fd = devfd;
                fds[1].events = POLLIN;
                fprintf(stderr, "%s Device reaquired. Usleep for 5k for data\n", stamp);
                usleep(5000);
                continue;
            }
            if (retval == -1) {
                int saved = errno; // printf may clobber errno before perror reads it

                printf("polling failed\n");
                errno = saved;
                perror("Polling failed");
                break;
            }
            if (fds[0].revents & POLLIN) {
                // NOTE(review): this only leaves the inner loop; the outer loop
                // reopens the device while clients remain, and the pending
                // stdin input is never consumed. Confirm the intended behaviour.
                printf("user quit\n");
                fprintf(stderr, "User quit\n");
                break;
            }
            if (fds[1].revents & POLLIN) {
                nbytes = read(devfd, membuf, MEMBUF_SIZE);
                if (nbytes < 0) {
                    if (errno == EINTR || errno == EAGAIN) {
                        usleep(2500);
                        continue;
                    }
                    printf("Unknown errno response %d when reading device\n", errno);
                    perror("Unknown");
                    exit(EXIT_FAILURE);
                }
                if (nbytes != MEMBUF_SIZE) {
                    // A short read is not an errno error, so report it directly.
                    printf("Short read\n");
                    fprintf(stderr, "Short read\n");
                    exit(EXIT_FAILURE);
                }
                stream_relay_chunk(global, membuf, MEMBUF_SIZE);
                continue;
            }
            if (fds[1].revents & POLLERR) {
                printf("Pollerr\n");
                perror("pollerr");
                exit(EXIT_FAILURE);
            }
        }

        /** clean up **/
        close(devfd);
    }

    // Not reached: the loop above never exits. Kept so the buffer has an
    // obvious owner if an exit path is ever added.
    free(membuf);
    return NULL;
}
/* Send a NUL-terminated string on sock; returns whatever send() returns. */
static ssize_t sock_send_text(int sock, const char *text) {
    return send(sock, text, strlen(text), 0);
}

/* strftime wrapper that leaves an empty string in out when formatting fails. */
static void sock_format_time(char *out, size_t outlen, const char *fmt, const struct tm *when) {
    if (strftime(out, outlen, fmt, when) == 0) {
        out[0] = '\0';
    }
}

/*
 * Thread body serving one accepted connection.
 *
 * lp is a heap-allocated struct data* from main(); it is always freed here.
 * One request is read with a single recv() and matched by substring:
 *   "GET /startrec HTTP/1.1" - open a timestamped .ts file and start recording
 *   "GET /stoprec HTTP/1.1"  - stop recording and close the file
 *   "GET /video HTTP/1.1"    - register the socket as a stream output
 *   anything else            - echo the request back as plain text
 *
 * The socket is closed before returning in every case except a successful
 * /video registration, where StreamHandler takes ownership of it.
 */
void* SocketHandler(void* lp) {
    static const char echo_suffix[] = " SERVER ECHO";
    struct data *datainst = (struct data *)lp;
    int csock = datainst->csock;
    struct shared *global = datainst->data;
    const int max_clients = (int)(sizeof global->outfds / sizeof global->outfds[0]);
    char buffer[1024];
    char tbuf[80];
    int bytecount;
    int keep_open = 0;
    time_t now;
    struct tm tmnow;

    memset(buffer, 0, sizeof buffer);
    // Read at most as much as still leaves room for the terminating NUL and
    // the echo suffix appended below; a full 1024-byte read would otherwise
    // leave the string unterminated and make strcat overflow the buffer.
    bytecount = (int)recv(csock, buffer, sizeof buffer - sizeof echo_suffix, 0);
    if (bytecount == -1) {
        fprintf(stderr, "Error receiving data %d\n", errno);
        goto done;
    }
    printf("Received bytes %d\nReceived string \"%s\"\n", bytecount, buffer);
    strcat(buffer, echo_suffix);

    // localtime_r: several handler threads and StreamHandler format times
    // concurrently, and localtime() shares a single static buffer.
    now = time(NULL);
    if (localtime_r(&now, &tmnow) == NULL) {
        memset(&tmnow, 0, sizeof tmnow);
    }

    if (strstr(buffer, "GET /startrec HTTP/1.1")) {
        int outfd;

        sock_send_text(csock, "HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Told to Start Recording : Sent at : ");
        sock_format_time(tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S", &tmnow);
        sock_send_text(csock, tbuf);

        /** open the output file, named after the request time **/
        sock_format_time(tbuf, sizeof tbuf, "%Y%m%d%H%M%S.ts", &tmnow);
        outfd = open(tbuf, O_CREAT | O_RDWR, S_IRWXU | S_IRWXG);
        if (outfd == -1) {
            perror("Unable to open output file");
        } else {
            int started = 0;

            pthread_mutex_lock(&(global->sharedlock));
            if (global->recording == 0) {
                global->outfd = outfd;
                global->recording = 1;
                started = 1;
            }
            pthread_mutex_unlock(&(global->sharedlock));

            if (!started) {
                // Someone else is already recording: release our descriptor.
                // The file is not unlinked because its name could match the
                // recording that is in progress.
                close(outfd);
                sock_send_text(csock, "\n\nAlready recording, new request ignored\n");
            } else {
                sock_send_text(csock, "\n\nRecording now in progress\n");
            }
        }
        printf("Sent bytes %d\n", bytecount);
    } else if (strstr(buffer, "GET /stoprec HTTP/1.1")) {
        int outfd = -1;
        int stopped = 0;

        sock_send_text(csock, "HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Told to Stop Recording : Sent at : ");
        sock_format_time(tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S", &tmnow);
        sock_send_text(csock, tbuf);

        // Detach the file from the shared state under the lock, then flush
        // and close it outside the lock so streaming is not held up.
        pthread_mutex_lock(&(global->sharedlock));
        if (global->recording == 1) {
            outfd = global->outfd;
            global->outfd = -1;
            global->recording = 0;
            stopped = 1;
        }
        pthread_mutex_unlock(&(global->sharedlock));

        if (!stopped) {
            sock_send_text(csock, "\n\nRecording not in progress, stop request ignored\n");
        } else {
            fsync(outfd);
            close(outfd);
            sock_send_text(csock, "\n\nRecording stopped.\n");
        }
        printf("Sent bytes %d\n", bytecount);
    } else if (strstr(buffer, "GET /video HTTP/1.1")) {
        int registered = 0;

        // The capacity check, the response header and the registration all
        // happen under the lock: the check cannot race with other clients,
        // and StreamHandler cannot write video data ahead of the header.
        pthread_mutex_lock(&(global->sharedlock));
        if (global->datarelayenabled < max_clients) {
            sock_send_text(csock, "HTTP/1.0 200 OK\nContent-Type: video/h264\nSync-Point: no\nPre-roll: no\nMedia-Start: invalid\nMedia-End: invalid\nStream-Start: invalid\nStream-End: invalid\n\n");
            global->outfds[global->datarelayenabled] = csock;
            global->datarelayenabled++;
            registered = 1;
        }
        pthread_mutex_unlock(&(global->sharedlock));

        if (registered) {
            keep_open = 1; // StreamHandler now owns the socket
        } else {
            sock_send_text(csock, "HTTP/1.0 503 Service Unavailable\nContent-Type: text/plain\n\nToo many video clients connected\n");
        }
    } else {
        sock_send_text(csock, "HTTP/1.0 200 OK\nContent-Type: text/plain\n\nUnknown URL requested, Response Sent at : ");
        sock_format_time(tbuf, sizeof tbuf, "%Y-%m-%d %H:%M:%S", &tmnow);
        sock_send_text(csock, tbuf);
        sock_send_text(csock, "\nOriginal Headers:\n\n");
        bytecount = (int)sock_send_text(csock, buffer);
        if (bytecount == -1) {
            fprintf(stderr, "Error sending data %d\n", errno);
            goto done;
        }
        printf("Sent bytes %d\n", bytecount);
    }

done:
    if (!keep_open) {
        close(csock);
    }
    free(lp);
    return NULL;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment