Created
June 2, 2013 16:21
-
-
Save anonymous/5694016 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define _LARGEFILE_SOURCE | |
#define _LARGEFILE64_SOURCE | |
#define _FILE_OFFSET_BITS 64 | |
#include <fcntl.h> | |
#include <string.h> | |
#include <stdlib.h> | |
#include <errno.h> | |
#include <stdio.h> | |
#include <netinet/in.h> | |
#include <resolv.h> | |
#include <sys/socket.h> | |
#include <arpa/inet.h> | |
#include <unistd.h> | |
#include <pthread.h> | |
#include <time.h> | |
#include <sys/types.h> | |
#include <sys/poll.h> | |
#include <signal.h> | |
#include <assert.h> | |
//size to read/write from hdpvr at.. 4k seems ok. | |
#define MEMBUF_SIZE 4096 | |
//how long to wait before giving up and declaring device needs a restart. | |
#define POLL_DELAY 2500 | |
//port to listen on. | |
#define PORT 1101 | |
//max buffers to use in buffered writer, each buffer is MEMBUF_SIZE bytes. | |
#define MAX_BUFFERS 4096 | |
//thread prototype for the connected children. | |
void* SocketHandler(void*); //handles accepted sockets | |
void* StreamHandler(void*); //handles reading from hdpvr and writing to outputs | |
void* DiskIOHandler(void*); //buffered writer, writes data to disk. | |
//data we'll share between all the threads. | |
struct shared { | |
pthread_mutex_t sharedlock; // used to protect the fd array, and datarelayenabled count | |
int datarelayenabled; //no of connected clients. | |
int outfds[16]; | |
char buffer[80]; | |
pthread_mutex_t diskiolock; //used to protect the data buffers and freebuffer arrays, and counts. | |
int recording; | |
int outfd; | |
int buffercount; | |
int freecount; | |
char **data; | |
char **freebuffers; | |
}; | |
//data we'll give to each thread uniquely. | |
struct data { | |
int csock; | |
struct shared *data; //pointer to shared data. | |
}; | |
int main(int argv, char** argc) { | |
signal(SIGPIPE, SIG_IGN); | |
int host_port=PORT; | |
struct sockaddr_in my_addr; | |
int hsock; | |
int * p_int ; | |
socklen_t addr_size = 0; | |
struct sockaddr_in sadr; | |
pthread_t thread_id=0; | |
//init the global data. | |
struct shared *global = (struct shared*)malloc(sizeof(struct shared)); | |
global->datarelayenabled=0; | |
global->recording=0; | |
global->freebuffers = (char **)calloc(MAX_BUFFERS, sizeof(char *)); | |
global->data = (char **)calloc(MAX_BUFFERS, sizeof(char *)); | |
global->buffercount=0; | |
global->freecount=0; | |
pthread_mutex_init(&(global->sharedlock), NULL); | |
pthread_mutex_init(&(global->diskiolock), NULL); | |
//start the disk writing thread, it will sleep until data is ready to write | |
pthread_create(&thread_id,0,&DiskIOHandler, (void*)global ); | |
pthread_detach(thread_id); | |
//start the device read/write thread, it will sleep until there are clients to write to. | |
pthread_create(&thread_id,0,&StreamHandler, (void*)global ); | |
pthread_detach(thread_id); | |
struct data *datainst; | |
hsock = socket(AF_INET, SOCK_STREAM, 0); | |
if(hsock == -1) { | |
printf("Error initializing socket %d\n", errno); | |
goto FINISH; | |
} | |
p_int = (int*)malloc(sizeof(int)); | |
*p_int = 1; | |
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| | |
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ) { | |
printf("Error setting options %d\n", errno); | |
free(p_int); | |
goto FINISH; | |
} | |
free(p_int); | |
my_addr.sin_family = AF_INET ; | |
my_addr.sin_port = htons(host_port); | |
memset(&(my_addr.sin_zero), 0, 8); | |
my_addr.sin_addr.s_addr = INADDR_ANY ; | |
if( bind( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ) { | |
fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); | |
goto FINISH; | |
} | |
if(listen( hsock, 10) == -1 ) { | |
fprintf(stderr, "Error listening %d\n",errno); | |
goto FINISH; | |
} | |
//Now lets do the server stuff | |
addr_size = sizeof(struct sockaddr_in); | |
while(1) { | |
char tbuf[80]; | |
time_t timer; | |
struct tm* tm_info; | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
printf("%s Waiting for a connection on port %d \n", tbuf, PORT); | |
//allocate the block we'll send to the thread | |
datainst = (struct data *)malloc(sizeof(struct data)); | |
//hook up our global shared data struct.. | |
datainst->data = global; | |
//get the socket, store into struct. | |
if((datainst->csock = accept( hsock, (struct sockaddr*)&sadr, &addr_size))!= -1) { | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
pthread_mutex_lock(&(global->sharedlock)); | |
printf("%s Received connection from %s\n",tbuf,inet_ntoa(sadr.sin_addr)); | |
pthread_mutex_unlock(&(global->sharedlock)); | |
pthread_create(&thread_id,0,&SocketHandler, (void*)datainst ); | |
pthread_detach(thread_id); | |
} else { | |
fprintf(stderr, "Error accepting %d\n", errno); | |
} | |
} | |
FINISH: | |
free(global); | |
return 0; | |
} | |
void* DiskIOHandler(void* lp) { | |
struct shared *global = (struct shared *)lp; | |
int bufferstowrite=0; | |
int enabled=0; | |
//obtain the initial counts via the appropriate locks. | |
pthread_mutex_lock(&(global->diskiolock)); | |
bufferstowrite = global->buffercount; | |
pthread_mutex_unlock(&(global->diskiolock)); | |
pthread_mutex_lock(&(global->sharedlock)); | |
enabled = global->datarelayenabled; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
while(1){ | |
//if we're not enabled yet, sleep a while, then reget the flag inside the appropriate lock. | |
while(!enabled) { | |
usleep(100); | |
pthread_mutex_lock(&(global->sharedlock)); | |
enabled = global->recording; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
if(enabled){ | |
printf("Writer woken up..\n"); | |
} | |
} | |
//we're now enabled.. until we're not ;p | |
while(enabled){ | |
//update the enabled flag from inside the lock | |
pthread_mutex_lock(&(global->sharedlock)); | |
enabled = global->recording; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
//grab the current buffers to write value. | |
//we release the lock, as only we ever remove buffers, | |
//so we can allow new buffers to be added while we write the ones we have. | |
pthread_mutex_lock(&(global->diskiolock)); | |
bufferstowrite = global->buffercount; | |
pthread_mutex_unlock(&(global->diskiolock)); | |
//write thread might get ahead of read thread.. | |
if(bufferstowrite==0){ | |
//just means we're still enabled, but no buffers have been added yet.. | |
//wait a bit.. maybe more buffers will come =) | |
sleep(1); | |
}else{ | |
//we have buffers to write.. write out as many as we know there are | |
//(there may be more to write by now.. if so we deal with them next loop) | |
int currentBuffer=0; | |
for(currentBuffer=0; currentBuffer<bufferstowrite; currentBuffer++){ | |
char *bufferToWrite = global->data[currentBuffer]; | |
ssize_t written = write(global->outfd, bufferToWrite, MEMBUF_SIZE); | |
if(written==-1) { | |
perror("Error writing to file."); | |
} | |
} | |
//we've written buffers.. | |
// so now we move the buffers still to write to the front of the write array | |
// and move the buffers we've written across to the end of the free array | |
//lock to protect the arrays while we nobble the data. | |
pthread_mutex_lock(&(global->diskiolock)); | |
//reduce by no of buffers written | |
global->buffercount -= bufferstowrite; | |
//move the current buffers onto the end of the free array where they can be reused. | |
memcpy(&(global->freebuffers[global->freecount]), global->data, (bufferstowrite * sizeof(char *))); | |
global->freecount += bufferstowrite; | |
//if buffers were incremented while we were writing.. move the pointers along a bit | |
//so the reads can process from the start again.. | |
if(global->buffercount >0 ){ | |
memcpy(global->data, &(global->data[bufferstowrite]), global->buffercount * sizeof(char *) ); | |
} | |
//all done =) free array is added to, write array removed from, counts adjusted, release lock. | |
pthread_mutex_unlock(&(global->diskiolock)); | |
} | |
} | |
} | |
} | |
//Device read thread.. pulls data from device, writes it to sockets / buffered writer. | |
void* StreamHandler(void* lp) { | |
struct shared *global = (struct shared *)lp; | |
int devfd; | |
char *ifname; | |
struct pollfd fds[2]; | |
void *membuf; | |
ssize_t nbytes; | |
time_t timer; | |
char buffer[80]; | |
struct tm* tm_info; | |
int outfdcount; | |
int retval; | |
ifname = "/dev/video0"; //TODO: make this an arg ;p | |
if(NULL == (membuf = malloc(MEMBUF_SIZE))) { | |
printf("Not enough memory to allocate buffer\n"); | |
fprintf(stderr, "Not enough memory\n"); | |
exit(EXIT_FAILURE); | |
} | |
while(1) { | |
int enabled=0; | |
pthread_mutex_lock(&(global->sharedlock)); | |
enabled = global->datarelayenabled; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
while(!enabled) { | |
sleep(1); | |
pthread_mutex_lock(&(global->sharedlock)); | |
enabled = global->datarelayenabled; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
} | |
//someone enabled the datarelay, we'd better setup & start reading data. | |
/** open the device **/ | |
if(-1 == (devfd = open(ifname, O_RDWR | O_NONBLOCK))) { | |
perror("Unable to open device"); | |
exit(EXIT_FAILURE); | |
} | |
usleep(5000); | |
/** setup descriptors for event polling **/ | |
fds[0].fd = STDIN_FILENO; | |
fds[0].events = POLLIN; | |
fds[1].fd = devfd; | |
fds[1].events = POLLIN; | |
/** start the recording loop **/ | |
int countdown=0; | |
while(enabled) { | |
pthread_mutex_lock(&(global->sharedlock)); | |
enabled = global->datarelayenabled + global->recording; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
while(countdown>0) { | |
retval = poll(fds, 2, POLL_DELAY); | |
if(0 == retval) { | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(buffer,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
fprintf(stderr, "%s Waiting for ready (%d)...\n", buffer, countdown); | |
usleep(300); | |
countdown--; | |
} else { | |
countdown=0; | |
} | |
} | |
retval = poll(fds, 2, POLL_DELAY); | |
if(0 == retval) { | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(buffer,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
fprintf(stderr, "%s Lost signal, restarting device...\n",buffer); | |
close(devfd); | |
countdown=5; | |
if(-1 == (devfd = open(ifname, O_RDWR | O_NONBLOCK))) { | |
perror("Unable to reopen the device"); | |
exit(EXIT_FAILURE); | |
} else { | |
fds[1].fd = devfd; | |
fds[1].events = POLLIN; | |
fprintf(stderr,"%s Device reaquired. Usleep for 5k for data\n",buffer); | |
usleep(5000); | |
continue; | |
} | |
} else if(-1 == retval) { | |
printf("polling failed\n"); | |
perror("Polling failed"); | |
break; | |
} else if(fds[0].revents & POLLIN) { | |
printf("user quit\n"); | |
fprintf(stderr, "User quit\n"); | |
break; | |
} else if(fds[1].revents & POLLIN) { | |
nbytes = read(devfd, membuf, MEMBUF_SIZE); | |
if(0 > nbytes) { | |
switch(errno) { | |
case EINTR: | |
case EAGAIN:{ | |
usleep(2500); | |
continue; | |
} | |
default: | |
printf("Unknown errno response %d when reading device\n",errno); | |
perror("Unknown"); | |
exit(EXIT_FAILURE); | |
} | |
} else if(MEMBUF_SIZE == nbytes) { | |
pthread_mutex_lock(&(global->sharedlock)); | |
//if recording.. write out to outfd. | |
if(global->recording){ | |
//we're about to alter the write/free buffer arrays, so we need this lock. | |
pthread_mutex_lock(&(global->diskiolock)); | |
char *bufToUse = NULL; | |
//do we have space? | |
if(!(global->buffercount<MAX_BUFFERS)){ | |
//this will be a file corruption scenario, as the buffer MAY already be being written from. | |
//plus the data in this last buffer is overwritten and lost forever. | |
//in an ideal world, the write thread will catch up, and start freeing buffers, | |
//and we'll just lose a chunk of the ts. | |
perror("Out of write buffers!! - reusing last buffer.. "); | |
bufToUse = global->data[(global->buffercount)-1]; | |
//reduce by 1, as later we re-increment it. | |
global->buffercount--; | |
} | |
//any buffers we can reuse? | |
if(bufToUse==NULL && global->freecount>0){ | |
global->data[global->buffercount] = global->freebuffers[global->freecount -1]; | |
bufToUse = global->data[global->buffercount]; | |
//this buffer will no longer be free.. | |
global->freecount--; | |
} | |
//no buffer yet, but if we have space, we can make one.. | |
if(bufToUse==NULL && (global->buffercount + global->freecount)<MAX_BUFFERS){ | |
if(NULL == (bufToUse = malloc(MEMBUF_SIZE))) { | |
printf("Not enough memory to allocate buffer (%d)\n",(global->freecount+global->buffercount)); | |
fprintf(stderr, "Not enough memory\n"); | |
exit(EXIT_FAILURE); | |
} | |
global->data[global->buffercount] = bufToUse; | |
} | |
//at this stage, we pretty much have to have a buffer.. right? | |
assert(bufToUse!=NULL); | |
//copy the data from the read buffer to the selected write buffer. | |
memcpy(bufToUse, membuf, MEMBUF_SIZE); | |
//bump the write counter, to say theres a new buffer to write. | |
global->buffercount++; | |
//all done playing with the buffers.. release the lock. | |
pthread_mutex_unlock(&(global->diskiolock)); | |
} | |
//thats the file taken care of.. now lets handle pushing data out to the socket clients. | |
//iterate over the output fd's.. set them to -1 if they fail to write. | |
for(outfdcount=0; outfdcount<(global->datarelayenabled); outfdcount++) { | |
if(global->outfds[outfdcount]!=-1) { | |
ssize_t written = write(global->outfds[outfdcount], membuf, MEMBUF_SIZE); | |
if(written==-1) { | |
global->outfds[outfdcount]=-1; | |
} else { | |
fsync(global->outfds[outfdcount]); | |
} | |
} | |
} | |
//we're still holding the global lock.. so we can manipulate the datarelayenabledcount, | |
//and move the contents of the outfds array around without fear of a new client corrupting us. | |
//iterate over the outputfd's.. collapsing the array to move the valids to the front. | |
int writepos=0; | |
int currentmax=global->datarelayenabled; | |
for(outfdcount=0; outfdcount<currentmax; outfdcount++) { | |
if(global->outfds[outfdcount] != -1) { | |
if(writepos!=outfdcount) { | |
//move the data back to the writepos slot, and set self to -1.. | |
global->outfds[writepos] = global->outfds[outfdcount]; | |
global->outfds[outfdcount] = -1; | |
} | |
writepos++; | |
} else { | |
global->datarelayenabled--; | |
} | |
} | |
pthread_mutex_unlock(&(global->sharedlock)); | |
continue; | |
} else { | |
printf("Short read\n"); | |
perror("Short read"); | |
exit(EXIT_FAILURE); | |
} | |
} else if(fds[1].revents & POLLERR) { | |
printf("Pollerr\n"); | |
perror("pollerr"); | |
exit(EXIT_FAILURE); | |
break; | |
} | |
} | |
/** clean up **/ | |
close(devfd); | |
} | |
free(membuf); | |
return 0; | |
} | |
//the ever so slightly optimistic http request handler; | |
void* SocketHandler(void* lp) { | |
struct data *datainst = (struct data *)lp; | |
int csock = datainst->csock; //(int*)lp; | |
struct shared *global = datainst->data; | |
char buffer[1024]; | |
int buffer_len = 1024; | |
int bytecount; | |
memset(buffer, 0, buffer_len); | |
if((bytecount = recv(csock, buffer, buffer_len, 0))== -1) { | |
fprintf(stderr, "Error receiving data %d\n", errno); | |
goto FINISH; | |
} | |
//from here, we just use strstr to check for GET strings.. | |
if(strstr(buffer,"GET /startrec HTTP/1.1")) { | |
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Told to Start Recording : Sent at : "; | |
send(csock,text,strlen(text),0); | |
int outfd; | |
char tbuf[80]; | |
time_t timer; | |
struct tm* tm_info; | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
send(csock,tbuf,strlen(tbuf),0); | |
printf("%s Start recording request...\n",tbuf); | |
strftime(tbuf,80,"%Y%m%d%H%M%S.ts", tm_info); | |
/** open the output file **/ | |
if(-1 == (outfd = open(tbuf, O_CREAT | O_RDWR, S_IRWXU | S_IRWXG))) { | |
perror("Unable to open output file"); | |
} else { | |
int started=0; | |
pthread_mutex_lock(&(global->sharedlock)); | |
if(global->recording==0) { | |
global->outfd=outfd; | |
global->recording=1; | |
started=1; | |
} | |
pthread_mutex_unlock(&(global->sharedlock)); | |
if(!started) { | |
char *alreadyrecording="\n\nAlready recording, new request ignored\n"; | |
send(csock,alreadyrecording,strlen(alreadyrecording),0); | |
} else { | |
char *nowrecording="\n\nRecording now in progress\n"; | |
send(csock,nowrecording,strlen(nowrecording),0); | |
} | |
} | |
fsync(csock); | |
close(csock); | |
//printf("Sent bytes %d\n", bytecount); | |
} else if(strstr(buffer,"GET /stoprec HTTP/1.1")) { | |
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Told to Stop Recording : Sent at : "; | |
send(csock,text,strlen(text),0); | |
int outfd; | |
char tbuf[80]; | |
time_t timer; | |
struct tm* tm_info; | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
send(csock,tbuf,strlen(tbuf),0); | |
printf("%s Start recording request...\n",tbuf); | |
int stopped=0; | |
pthread_mutex_lock(&(global->sharedlock)); | |
if(global->recording==1) { | |
outfd=global->outfd; | |
global->outfd=-1; | |
global->recording=0; | |
stopped=1; | |
} | |
pthread_mutex_unlock(&(global->sharedlock)); | |
if(!stopped) { | |
char *notrecording="\n\nRecording not in progress, stop request ignored\n"; | |
send(csock,notrecording,strlen(notrecording),0); | |
} else { | |
fsync(outfd); | |
close(outfd); | |
char *stoppedok="\n\nRecording stopped.\n"; | |
send(csock,stoppedok,strlen(stoppedok),0); | |
} | |
fsync(csock); | |
close(csock); | |
//printf("Sent bytes %d\n", bytecount); | |
} else if(strstr(buffer,"GET /video HTTP/1.1")) { | |
if(global->datarelayenabled < 128) { | |
char *text="HTTP/1.0 200 OK\nContent-Type: video/h264\nSync-Point: no\nPre-roll: no\nMedia-Start: invalid\nMedia-End: invalid\nStream-Start: invalid\nStream-End: invalid\n\n"; | |
send(csock,text,strlen(text),0); | |
char ofname[80]; | |
time_t curtime; | |
struct tm *fmttime; | |
/** set the output file name to time of creation **/ | |
time(&curtime); | |
fmttime = localtime(&curtime); | |
strftime(ofname, 80, "%Y-%m-%d %H:%M:%S", fmttime); | |
printf("%s Start streaming request...\n",ofname); | |
pthread_mutex_lock(&(global->sharedlock)); | |
global->outfds[global->datarelayenabled]=csock; | |
global->datarelayenabled++; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
} | |
} else if(strstr(buffer,"GET /status HTTP/1.1")) { | |
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Status at : "; | |
send(csock,text,strlen(text),0); | |
char tbuf[80]; | |
time_t timer; | |
struct tm* tm_info; | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
send(csock,tbuf,strlen(tbuf),0); | |
printf("%s Status request...\n",tbuf); | |
int freebuffers = 0; | |
int databuffers = 0; | |
int noofconnections = 0; | |
int recording = 0; | |
pthread_mutex_lock(&(global->sharedlock)); | |
noofconnections = global->datarelayenabled; | |
recording = global->recording; | |
pthread_mutex_unlock(&(global->sharedlock)); | |
pthread_mutex_lock(&(global->diskiolock)); | |
freebuffers = global->freecount; | |
databuffers = global->buffercount; | |
pthread_mutex_unlock(&(global->diskiolock)); | |
snprintf(tbuf,80,"\nNo of Connections : %d\n",noofconnections); | |
send(csock,tbuf,strlen(tbuf),0); | |
snprintf(tbuf,80, "Recording? : %d\n",recording); | |
send(csock,tbuf,strlen(tbuf),0); | |
snprintf(tbuf,80, "Free Buffer Count : %d\n",freebuffers); | |
send(csock,tbuf,strlen(tbuf),0); | |
snprintf(tbuf,80, "Data Buffer Count : %d\n",databuffers); | |
send(csock,tbuf,strlen(tbuf),0); | |
snprintf(tbuf,80, "Total Buffer Usage: %d\n",databuffers+freebuffers); | |
send(csock,tbuf,strlen(tbuf),0); | |
fsync(csock); | |
close(csock); | |
} else { | |
//TODO: ignore favicon.ico requests.. they get a little pointless.. | |
//TODO: extract the GET url requested, to log it to console. | |
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\nUnknown URL requested, Response Sent at : "; | |
send(csock,text,strlen(text),0); | |
char tbuf[80]; | |
time_t timer; | |
struct tm* tm_info; | |
time(&timer); | |
tm_info = localtime(&timer); | |
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info); | |
send(csock,tbuf,strlen(tbuf),0); | |
printf("%s Unknown URL request\n",tbuf); | |
char *text2="\nOriginal Headers:\n\n"; | |
send(csock,text2,strlen(text2),0); | |
if((bytecount = send(csock, buffer, strlen(buffer), 0))== -1) { | |
fprintf(stderr, "Error sending data %d\n", errno); | |
goto FINISH; | |
} | |
fsync(csock); | |
close(csock); | |
} | |
FINISH: | |
free(lp); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment