Create a gist now

Instantly share code, notes, and snippets.

anonymous /hdpvr.c
Created Jun 2, 2013

#define _LARGEFILE_SOURCE
#define _LARGEFILE64_SOURCE
#define _FILE_OFFSET_BITS 64
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <signal.h>
#include <assert.h>
//size to read/write from hdpvr at.. 4k seems ok.
#define MEMBUF_SIZE 4096
//how long to wait before giving up and declaring device needs a restart.
#define POLL_DELAY 2500
//port to listen on.
#define PORT 1101
//max buffers to use in buffered writer, each buffer is MEMBUF_SIZE bytes.
#define MAX_BUFFERS 4096
//thread prototype for the connected children.
void* SocketHandler(void*); //handles accepted sockets
void* StreamHandler(void*); //handles reading from hdpvr and writing to outputs
void* DiskIOHandler(void*); //buffered writer, writes data to disk.
//data we'll share between all the threads.
struct shared {
pthread_mutex_t sharedlock; // used to protect the fd array, and datarelayenabled count
int datarelayenabled; //no of connected clients.
int outfds[16];
char buffer[80];
pthread_mutex_t diskiolock; //used to protect the data buffers and freebuffer arrays, and counts.
int recording;
int outfd;
int buffercount;
int freecount;
char **data;
char **freebuffers;
};
//data we'll give to each thread uniquely.
struct data {
int csock;
struct shared *data; //pointer to shared data.
};
int main(int argv, char** argc) {
signal(SIGPIPE, SIG_IGN);
int host_port=PORT;
struct sockaddr_in my_addr;
int hsock;
int * p_int ;
socklen_t addr_size = 0;
struct sockaddr_in sadr;
pthread_t thread_id=0;
//init the global data.
struct shared *global = (struct shared*)malloc(sizeof(struct shared));
global->datarelayenabled=0;
global->recording=0;
global->freebuffers = (char **)calloc(MAX_BUFFERS, sizeof(char *));
global->data = (char **)calloc(MAX_BUFFERS, sizeof(char *));
global->buffercount=0;
global->freecount=0;
pthread_mutex_init(&(global->sharedlock), NULL);
pthread_mutex_init(&(global->diskiolock), NULL);
//start the disk writing thread, it will sleep until data is ready to write
pthread_create(&thread_id,0,&DiskIOHandler, (void*)global );
pthread_detach(thread_id);
//start the device read/write thread, it will sleep until there are clients to write to.
pthread_create(&thread_id,0,&StreamHandler, (void*)global );
pthread_detach(thread_id);
struct data *datainst;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1) {
printf("Error initializing socket %d\n", errno);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ) {
printf("Error setting options %d\n", errno);
free(p_int);
goto FINISH;
}
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = INADDR_ANY ;
if( bind( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ) {
fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
goto FINISH;
}
if(listen( hsock, 10) == -1 ) {
fprintf(stderr, "Error listening %d\n",errno);
goto FINISH;
}
//Now lets do the server stuff
addr_size = sizeof(struct sockaddr_in);
while(1) {
char tbuf[80];
time_t timer;
struct tm* tm_info;
time(&timer);
tm_info = localtime(&timer);
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info);
printf("%s Waiting for a connection on port %d \n", tbuf, PORT);
//allocate the block we'll send to the thread
datainst = (struct data *)malloc(sizeof(struct data));
//hook up our global shared data struct..
datainst->data = global;
//get the socket, store into struct.
if((datainst->csock = accept( hsock, (struct sockaddr*)&sadr, &addr_size))!= -1) {
time(&timer);
tm_info = localtime(&timer);
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info);
pthread_mutex_lock(&(global->sharedlock));
printf("%s Received connection from %s\n",tbuf,inet_ntoa(sadr.sin_addr));
pthread_mutex_unlock(&(global->sharedlock));
pthread_create(&thread_id,0,&SocketHandler, (void*)datainst );
pthread_detach(thread_id);
} else {
fprintf(stderr, "Error accepting %d\n", errno);
}
}
FINISH:
free(global);
return 0;
}
void* DiskIOHandler(void* lp) {
struct shared *global = (struct shared *)lp;
int bufferstowrite=0;
int enabled=0;
//obtain the initial counts via the appropriate locks.
pthread_mutex_lock(&(global->diskiolock));
bufferstowrite = global->buffercount;
pthread_mutex_unlock(&(global->diskiolock));
pthread_mutex_lock(&(global->sharedlock));
enabled = global->datarelayenabled;
pthread_mutex_unlock(&(global->sharedlock));
while(1){
//if we're not enabled yet, sleep a while, then reget the flag inside the appropriate lock.
while(!enabled) {
usleep(100);
pthread_mutex_lock(&(global->sharedlock));
enabled = global->recording;
pthread_mutex_unlock(&(global->sharedlock));
if(enabled){
printf("Writer woken up..\n");
}
}
//we're now enabled.. until we're not ;p
while(enabled){
//update the enabled flag from inside the lock
pthread_mutex_lock(&(global->sharedlock));
enabled = global->recording;
pthread_mutex_unlock(&(global->sharedlock));
//grab the current buffers to write value.
//we release the lock, as only we ever remove buffers,
//so we can allow new buffers to be added while we write the ones we have.
pthread_mutex_lock(&(global->diskiolock));
bufferstowrite = global->buffercount;
pthread_mutex_unlock(&(global->diskiolock));
//write thread might get ahead of read thread..
if(bufferstowrite==0){
//just means we're still enabled, but no buffers have been added yet..
//wait a bit.. maybe more buffers will come =)
sleep(1);
}else{
//we have buffers to write.. write out as many as we know there are
//(there may be more to write by now.. if so we deal with them next loop)
int currentBuffer=0;
for(currentBuffer=0; currentBuffer<bufferstowrite; currentBuffer++){
char *bufferToWrite = global->data[currentBuffer];
ssize_t written = write(global->outfd, bufferToWrite, MEMBUF_SIZE);
if(written==-1) {
perror("Error writing to file.");
}
}
//we've written buffers..
// so now we move the buffers still to write to the front of the write array
// and move the buffers we've written across to the end of the free array
//lock to protect the arrays while we nobble the data.
pthread_mutex_lock(&(global->diskiolock));
//reduce by no of buffers written
global->buffercount -= bufferstowrite;
//move the current buffers onto the end of the free array where they can be reused.
memcpy(&(global->freebuffers[global->freecount]), global->data, (bufferstowrite * sizeof(char *)));
global->freecount += bufferstowrite;
//if buffers were incremented while we were writing.. move the pointers along a bit
//so the reads can process from the start again..
if(global->buffercount >0 ){
memcpy(global->data, &(global->data[bufferstowrite]), global->buffercount * sizeof(char *) );
}
//all done =) free array is added to, write array removed from, counts adjusted, release lock.
pthread_mutex_unlock(&(global->diskiolock));
}
}
}
}
//Device read thread.. pulls data from device, writes it to sockets / buffered writer.
void* StreamHandler(void* lp) {
struct shared *global = (struct shared *)lp;
int devfd;
char *ifname;
struct pollfd fds[2];
void *membuf;
ssize_t nbytes;
time_t timer;
char buffer[80];
struct tm* tm_info;
int outfdcount;
int retval;
ifname = "/dev/video0"; //TODO: make this an arg ;p
if(NULL == (membuf = malloc(MEMBUF_SIZE))) {
printf("Not enough memory to allocate buffer\n");
fprintf(stderr, "Not enough memory\n");
exit(EXIT_FAILURE);
}
while(1) {
int enabled=0;
pthread_mutex_lock(&(global->sharedlock));
enabled = global->datarelayenabled;
pthread_mutex_unlock(&(global->sharedlock));
while(!enabled) {
sleep(1);
pthread_mutex_lock(&(global->sharedlock));
enabled = global->datarelayenabled;
pthread_mutex_unlock(&(global->sharedlock));
}
//someone enabled the datarelay, we'd better setup & start reading data.
/** open the device **/
if(-1 == (devfd = open(ifname, O_RDWR | O_NONBLOCK))) {
perror("Unable to open device");
exit(EXIT_FAILURE);
}
usleep(5000);
/** setup descriptors for event polling **/
fds[0].fd = STDIN_FILENO;
fds[0].events = POLLIN;
fds[1].fd = devfd;
fds[1].events = POLLIN;
/** start the recording loop **/
int countdown=0;
while(enabled) {
pthread_mutex_lock(&(global->sharedlock));
enabled = global->datarelayenabled + global->recording;
pthread_mutex_unlock(&(global->sharedlock));
while(countdown>0) {
retval = poll(fds, 2, POLL_DELAY);
if(0 == retval) {
time(&timer);
tm_info = localtime(&timer);
strftime(buffer,80,"%Y-%m-%d %H:%M:%S", tm_info);
fprintf(stderr, "%s Waiting for ready (%d)...\n", buffer, countdown);
usleep(300);
countdown--;
} else {
countdown=0;
}
}
retval = poll(fds, 2, POLL_DELAY);
if(0 == retval) {
time(&timer);
tm_info = localtime(&timer);
strftime(buffer,80,"%Y-%m-%d %H:%M:%S", tm_info);
fprintf(stderr, "%s Lost signal, restarting device...\n",buffer);
close(devfd);
countdown=5;
if(-1 == (devfd = open(ifname, O_RDWR | O_NONBLOCK))) {
perror("Unable to reopen the device");
exit(EXIT_FAILURE);
} else {
fds[1].fd = devfd;
fds[1].events = POLLIN;
fprintf(stderr,"%s Device reaquired. Usleep for 5k for data\n",buffer);
usleep(5000);
continue;
}
} else if(-1 == retval) {
printf("polling failed\n");
perror("Polling failed");
break;
} else if(fds[0].revents & POLLIN) {
printf("user quit\n");
fprintf(stderr, "User quit\n");
break;
} else if(fds[1].revents & POLLIN) {
nbytes = read(devfd, membuf, MEMBUF_SIZE);
if(0 > nbytes) {
switch(errno) {
case EINTR:
case EAGAIN:{
usleep(2500);
continue;
}
default:
printf("Unknown errno response %d when reading device\n",errno);
perror("Unknown");
exit(EXIT_FAILURE);
}
} else if(MEMBUF_SIZE == nbytes) {
pthread_mutex_lock(&(global->sharedlock));
//if recording.. write out to outfd.
if(global->recording){
//we're about to alter the write/free buffer arrays, so we need this lock.
pthread_mutex_lock(&(global->diskiolock));
char *bufToUse = NULL;
//do we have space?
if(!(global->buffercount<MAX_BUFFERS)){
//this will be a file corruption scenario, as the buffer MAY already be being written from.
//plus the data in this last buffer is overwritten and lost forever.
//in an ideal world, the write thread will catch up, and start freeing buffers,
//and we'll just lose a chunk of the ts.
perror("Out of write buffers!! - reusing last buffer.. ");
bufToUse = global->data[(global->buffercount)-1];
//reduce by 1, as later we re-increment it.
global->buffercount--;
}
//any buffers we can reuse?
if(bufToUse==NULL && global->freecount>0){
global->data[global->buffercount] = global->freebuffers[global->freecount -1];
bufToUse = global->data[global->buffercount];
//this buffer will no longer be free..
global->freecount--;
}
//no buffer yet, but if we have space, we can make one..
if(bufToUse==NULL && (global->buffercount + global->freecount)<MAX_BUFFERS){
if(NULL == (bufToUse = malloc(MEMBUF_SIZE))) {
printf("Not enough memory to allocate buffer (%d)\n",(global->freecount+global->buffercount));
fprintf(stderr, "Not enough memory\n");
exit(EXIT_FAILURE);
}
global->data[global->buffercount] = bufToUse;
}
//at this stage, we pretty much have to have a buffer.. right?
assert(bufToUse!=NULL);
//copy the data from the read buffer to the selected write buffer.
memcpy(bufToUse, membuf, MEMBUF_SIZE);
//bump the write counter, to say theres a new buffer to write.
global->buffercount++;
//all done playing with the buffers.. release the lock.
pthread_mutex_unlock(&(global->diskiolock));
}
//thats the file taken care of.. now lets handle pushing data out to the socket clients.
//iterate over the output fd's.. set them to -1 if they fail to write.
for(outfdcount=0; outfdcount<(global->datarelayenabled); outfdcount++) {
if(global->outfds[outfdcount]!=-1) {
ssize_t written = write(global->outfds[outfdcount], membuf, MEMBUF_SIZE);
if(written==-1) {
global->outfds[outfdcount]=-1;
} else {
fsync(global->outfds[outfdcount]);
}
}
}
//we're still holding the global lock.. so we can manipulate the datarelayenabledcount,
//and move the contents of the outfds array around without fear of a new client corrupting us.
//iterate over the outputfd's.. collapsing the array to move the valids to the front.
int writepos=0;
int currentmax=global->datarelayenabled;
for(outfdcount=0; outfdcount<currentmax; outfdcount++) {
if(global->outfds[outfdcount] != -1) {
if(writepos!=outfdcount) {
//move the data back to the writepos slot, and set self to -1..
global->outfds[writepos] = global->outfds[outfdcount];
global->outfds[outfdcount] = -1;
}
writepos++;
} else {
global->datarelayenabled--;
}
}
pthread_mutex_unlock(&(global->sharedlock));
continue;
} else {
printf("Short read\n");
perror("Short read");
exit(EXIT_FAILURE);
}
} else if(fds[1].revents & POLLERR) {
printf("Pollerr\n");
perror("pollerr");
exit(EXIT_FAILURE);
break;
}
}
/** clean up **/
close(devfd);
}
free(membuf);
return 0;
}
//the ever so slightly optimistic http request handler;
void* SocketHandler(void* lp) {
struct data *datainst = (struct data *)lp;
int csock = datainst->csock; //(int*)lp;
struct shared *global = datainst->data;
char buffer[1024];
int buffer_len = 1024;
int bytecount;
memset(buffer, 0, buffer_len);
if((bytecount = recv(csock, buffer, buffer_len, 0))== -1) {
fprintf(stderr, "Error receiving data %d\n", errno);
goto FINISH;
}
//from here, we just use strstr to check for GET strings..
if(strstr(buffer,"GET /startrec HTTP/1.1")) {
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Told to Start Recording : Sent at : ";
send(csock,text,strlen(text),0);
int outfd;
char tbuf[80];
time_t timer;
struct tm* tm_info;
time(&timer);
tm_info = localtime(&timer);
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info);
send(csock,tbuf,strlen(tbuf),0);
printf("%s Start recording request...\n",tbuf);
strftime(tbuf,80,"%Y%m%d%H%M%S.ts", tm_info);
/** open the output file **/
if(-1 == (outfd = open(tbuf, O_CREAT | O_RDWR, S_IRWXU | S_IRWXG))) {
perror("Unable to open output file");
} else {
int started=0;
pthread_mutex_lock(&(global->sharedlock));
if(global->recording==0) {
global->outfd=outfd;
global->recording=1;
started=1;
}
pthread_mutex_unlock(&(global->sharedlock));
if(!started) {
char *alreadyrecording="\n\nAlready recording, new request ignored\n";
send(csock,alreadyrecording,strlen(alreadyrecording),0);
} else {
char *nowrecording="\n\nRecording now in progress\n";
send(csock,nowrecording,strlen(nowrecording),0);
}
}
fsync(csock);
close(csock);
//printf("Sent bytes %d\n", bytecount);
} else if(strstr(buffer,"GET /stoprec HTTP/1.1")) {
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Told to Stop Recording : Sent at : ";
send(csock,text,strlen(text),0);
int outfd;
char tbuf[80];
time_t timer;
struct tm* tm_info;
time(&timer);
tm_info = localtime(&timer);
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info);
send(csock,tbuf,strlen(tbuf),0);
printf("%s Start recording request...\n",tbuf);
int stopped=0;
pthread_mutex_lock(&(global->sharedlock));
if(global->recording==1) {
outfd=global->outfd;
global->outfd=-1;
global->recording=0;
stopped=1;
}
pthread_mutex_unlock(&(global->sharedlock));
if(!stopped) {
char *notrecording="\n\nRecording not in progress, stop request ignored\n";
send(csock,notrecording,strlen(notrecording),0);
} else {
fsync(outfd);
close(outfd);
char *stoppedok="\n\nRecording stopped.\n";
send(csock,stoppedok,strlen(stoppedok),0);
}
fsync(csock);
close(csock);
//printf("Sent bytes %d\n", bytecount);
} else if(strstr(buffer,"GET /video HTTP/1.1")) {
if(global->datarelayenabled < 128) {
char *text="HTTP/1.0 200 OK\nContent-Type: video/h264\nSync-Point: no\nPre-roll: no\nMedia-Start: invalid\nMedia-End: invalid\nStream-Start: invalid\nStream-End: invalid\n\n";
send(csock,text,strlen(text),0);
char ofname[80];
time_t curtime;
struct tm *fmttime;
/** set the output file name to time of creation **/
time(&curtime);
fmttime = localtime(&curtime);
strftime(ofname, 80, "%Y-%m-%d %H:%M:%S", fmttime);
printf("%s Start streaming request...\n",ofname);
pthread_mutex_lock(&(global->sharedlock));
global->outfds[global->datarelayenabled]=csock;
global->datarelayenabled++;
pthread_mutex_unlock(&(global->sharedlock));
}
} else if(strstr(buffer,"GET /status HTTP/1.1")) {
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\n Status at : ";
send(csock,text,strlen(text),0);
char tbuf[80];
time_t timer;
struct tm* tm_info;
time(&timer);
tm_info = localtime(&timer);
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info);
send(csock,tbuf,strlen(tbuf),0);
printf("%s Status request...\n",tbuf);
int freebuffers = 0;
int databuffers = 0;
int noofconnections = 0;
int recording = 0;
pthread_mutex_lock(&(global->sharedlock));
noofconnections = global->datarelayenabled;
recording = global->recording;
pthread_mutex_unlock(&(global->sharedlock));
pthread_mutex_lock(&(global->diskiolock));
freebuffers = global->freecount;
databuffers = global->buffercount;
pthread_mutex_unlock(&(global->diskiolock));
snprintf(tbuf,80,"\nNo of Connections : %d\n",noofconnections);
send(csock,tbuf,strlen(tbuf),0);
snprintf(tbuf,80, "Recording? : %d\n",recording);
send(csock,tbuf,strlen(tbuf),0);
snprintf(tbuf,80, "Free Buffer Count : %d\n",freebuffers);
send(csock,tbuf,strlen(tbuf),0);
snprintf(tbuf,80, "Data Buffer Count : %d\n",databuffers);
send(csock,tbuf,strlen(tbuf),0);
snprintf(tbuf,80, "Total Buffer Usage: %d\n",databuffers+freebuffers);
send(csock,tbuf,strlen(tbuf),0);
fsync(csock);
close(csock);
} else {
//TODO: ignore favicon.ico requests.. they get a little pointless..
//TODO: extract the GET url requested, to log it to console.
char *text="HTTP/1.0 200 OK\nContent-Type: text/plain\n\nUnknown URL requested, Response Sent at : ";
send(csock,text,strlen(text),0);
char tbuf[80];
time_t timer;
struct tm* tm_info;
time(&timer);
tm_info = localtime(&timer);
strftime(tbuf,80,"%Y-%m-%d %H:%M:%S", tm_info);
send(csock,tbuf,strlen(tbuf),0);
printf("%s Unknown URL request\n",tbuf);
char *text2="\nOriginal Headers:\n\n";
send(csock,text2,strlen(text2),0);
if((bytecount = send(csock, buffer, strlen(buffer), 0))== -1) {
fprintf(stderr, "Error sending data %d\n", errno);
goto FINISH;
}
fsync(csock);
close(csock);
}
FINISH:
free(lp);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment