Last active
December 13, 2017 14:04
-
-
Save Cranc/4e2f71c1009ec01161ed1d5afbe9a94e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * File:   mythread.h
 * Author: dennis
 *
 * Created on November 29, 2017, 2:18 PM
 *
 * Constants, queue types and prototypes shared by the producer/consumer
 * program implemented in mythread.c.
 */
#ifndef MYTHREAD_H
#define MYTHREAD_H

/* The declarations below use these types, so the header pulls them in itself
 * instead of relying on the including file's include order. */
#include <stddef.h>   /* size_t */
#include <pthread.h>  /* pthread_mutex_t, pthread_cond_t */

#ifdef __cplusplus
extern "C" {
#endif

/* ANSI escape sequences used to colour terminal output. */
#define ANSI_COLOR_RED     "\x1b[31m"
#define ANSI_COLOR_GREEN   "\x1b[32m"
#define ANSI_COLOR_YELLOW  "\x1b[33m"
#define ANSI_COLOR_BLUE    "\x1b[34m"
#define ANSI_COLOR_MAGENTA "\x1b[35m"
#define ANSI_COLOR_CYAN    "\x1b[36m"
#define ANSI_COLOR_RESET   "\x1b[0m"

/* Buffer sizes, in bytes including the terminating NUL. */
#define MAXINPUTSIZE 450   /* control-file name typed by the user */
#define LINEBUFFER   1024  /* one line of the control file */
#define FILENAMESIZE 40    /* generated output file path */
#define PREFIXBUFFER 10

#define QUEUESIZE 5              /* number of slots in the queue */
#define LOOP 20
#define CONSUMER_THREAD_COUNT 3  /* consumer threads started by main() */

#define EXIT   1
#define REPEAT 0
#define DONE   3  /* stored in queue.producer once the producer has finished */

/* Return codes of getLine(). */
#define TOO_LONG 2
#define NO_INPUT 1
#define OK       0

/* Thread entry points; args is an s_producer* / s_consumer* respectively. */
void *producer (void *args);
void *consumer (void *args);

/* One queue element: a line of text plus a numeric id. */
typedef struct {
    char buf[LINEBUFFER];
    int id;
} data;

/* Fixed-size ring buffer shared between the producer and the consumers. */
typedef struct {
    data buf[QUEUESIZE];
    long head, tail;            /* next slot to read / next slot to write */
    int full, empty, producer;  /* state flags; producer becomes DONE at the end */
    pthread_mutex_t *mut;       /* locked around queue accesses by producer and consumers */
    pthread_cond_t *notFull, *notEmpty;
} queue;

/* Argument block handed to a consumer thread. */
typedef struct {
    queue *fifo;
    int id;
} s_consumer;

/* Argument block handed to the producer thread. */
typedef struct {
    queue *fifo;
    char input[MAXINPUTSIZE];   /* name of the control file to read */
} s_producer;

/* Construction; each returns NULL when allocation fails. */
queue *queueInit (void);
s_consumer *consumerInit (queue *q, int i);
s_producer *producerInit (queue *q, char *buff);

/* Destruction of the objects created above. */
void queueDelete (queue *q);
void consumerDelete (s_consumer *c);
void producerDelete (s_producer *p);

/* Queue operations; they do no locking themselves, callers hold q->mut. */
void queueAdd (queue *q, data in);
void queueDel (queue *q, data* out);

int getNewFileId(void);
int getLine(char* prmpt, char *buff, size_t size);

#ifdef __cplusplus
}
#endif

#endif /* MYTHREAD_H */
/* | |
* File: mythread.c | |
* Author: delindne | |
* | |
* Created on 23. November 2017, 13:03 | |
*/ | |
#include <pthread.h> | |
#include <stdio.h> | |
#include <unistd.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <sys/time.h> | |
#include "mythread.h" | |
#include "msocket.h" | |
#include "socket.c" | |
/* | |
* | |
*/ | |
/* Mutex declared next to the file-id counter. No visible code calls
 * pthread_mutex_init() on it, so it is initialised statically to be valid
 * whenever it is first locked. */
pthread_mutex_t _file_id = PTHREAD_MUTEX_INITIALIZER;

/* Last id handed out by getNewFileId(); 0 means none has been issued yet. */
int highest_free_file_id = 0;
int main(int argc, char** argv) { | |
pthread_t t_producer; | |
pthread_t t_consumer[CONSUMER_THREAD_COUNT]; | |
queue *fifo; | |
s_consumer *consumer_structs[CONSUMER_THREAD_COUNT]; | |
s_producer *producer_struct; | |
fifo = queueInit(); | |
if(fifo == NULL){ | |
printf(ANSI_COLOR_RED "Could not initilaize Queue.\n" ANSI_COLOR_RESET); | |
exit(1); | |
} | |
char input[MAXINPUTSIZE]; | |
//get file | |
getLine(ANSI_COLOR_MAGENTA "Enter name of control file: " ANSI_COLOR_RESET,input, sizeof(input)); | |
printf("\n"); | |
producer_struct = producerInit(fifo, input); | |
//start consumer threads | |
int rc; | |
for(int i = 0; i < CONSUMER_THREAD_COUNT; i++){ | |
consumer_structs[i] = consumerInit(fifo, i+1); | |
rc = pthread_create(&t_consumer[i], NULL, consumer, consumer_structs[i]); | |
} | |
//start producer wait till finished and ask if we should stop | |
rc = pthread_create(&t_producer, NULL, producer, producer_struct); | |
pthread_join(t_producer, NULL); | |
producerDelete(producer_struct); | |
//printf("producer done\n"); | |
//cancel the consumer on exit | |
for(int i = 0; i < CONSUMER_THREAD_COUNT; i++){ | |
pthread_join(t_consumer[i],NULL); | |
consumerDelete(consumer_structs[i]); | |
} | |
queueDelete(fifo); | |
return (EXIT_SUCCESS); | |
} | |
/** | |
* reads sites from queue and calls the server | |
* @param q fifo queue where items are located | |
* @return void | |
*/ | |
void *consumer(void* q){ | |
//init stuff | |
s_consumer *my_struct = (s_consumer *)q; | |
queue *fifo = (queue *)my_struct->fifo; | |
int id = my_struct->id; | |
struct timespec timeToWait; | |
struct timeval now; | |
printf(ANSI_COLOR_BLUE "Consumer number %d ready for duty\n" ANSI_COLOR_RESET, id); | |
//keep looping | |
while(1){ | |
char b_data[LINEBUFFER]; | |
memset(b_data, 0, sizeof b_data); | |
pthread_mutex_lock(fifo->mut); | |
//while queue empty wait and if producer is done get ready to be canceled | |
while(fifo->empty == 1){ | |
if(fifo->producer == DONE){ | |
//printf("fifo empty and producer done [head:%d,tail:%d,full:%d,empty:%d,producer:%d]\n",fifo->head,fifo->tail,fifo->full,fifo->empty,fifo->producer); | |
pthread_mutex_unlock(fifo->mut); | |
return NULL; | |
} | |
else{ | |
gettimeofday(&now,NULL); | |
timeToWait.tv_sec = now.tv_sec+1; | |
pthread_cond_timedwait(fifo->notEmpty, fifo->mut, &timeToWait); | |
} | |
} | |
//get item from queue and unlock mutex to not block longer than needed | |
data buffer; | |
queueDel(fifo, &buffer); | |
strncpy(b_data,buffer.buf,LINEBUFFER); | |
pthread_mutex_unlock(fifo->mut); | |
pthread_cond_signal(fifo->notFull); | |
//split into web-address and site | |
char args[2][LINEBUFFER]; | |
memset(args[0], 0, sizeof args[0]); | |
memset(args[1], 0, sizeof args[1]); | |
char *result = strstr(b_data, " /"); | |
int position = result - b_data; | |
int substringLength = strlen(b_data) - position; | |
strncpy(args[0], b_data, position); | |
strncpy(args[1], b_data + position + 1, substringLength); | |
//create file path | |
char filepath[FILENAMESIZE]; | |
memset(filepath, 0, sizeof filepath); | |
snprintf(filepath, sizeof(filepath), "test/file_%d_%d.html",buffer.id, id); | |
//ask server and inform if fatal error occurred | |
//printf("consumer[%d] ask %s\n",id,args[0]); | |
errorValue ret = askServer(args[0], args[1],filepath); | |
switch(ret){ | |
case fatalError: | |
printf(ANSI_COLOR_RED "Error while asking Server!\n" ANSI_COLOR_RESET); | |
break; | |
} | |
} | |
return NULL; | |
} | |
/** | |
* askes for file, reads items from list and enters them to a queue | |
* @param q pointer to queue | |
* @return void | |
*/ | |
void *producer (void *q) | |
{ | |
//init | |
s_producer *p = (s_producer *)q; | |
queue *fifo = p->fifo;//(queue *)q; | |
fifo->producer = 0; | |
char input[MAXINPUTSIZE]; | |
strncpy(input, p->input, MAXINPUTSIZE); | |
//if file exist do your worst | |
if( access( input, F_OK ) != -1 ) { | |
// file exists | |
FILE *fp = fopen(input, "r"); | |
if(fp == NULL){ | |
fifo->producer = DONE; | |
return NULL; | |
} | |
char buf[LINEBUFFER]; | |
int line = 1; | |
while( fgets(buf, LINEBUFFER, fp) ){ | |
//read file line by line and add each line to the queue | |
pthread_mutex_lock (fifo->mut); | |
while (fifo->full){ | |
printf(ANSI_COLOR_YELLOW "producer: queue FULL.\n" ANSI_COLOR_RESET); | |
pthread_cond_wait(fifo->notFull, fifo->mut); | |
} | |
data buffer; | |
strncpy(buffer.buf,buf,LINEBUFFER); | |
buffer.id = line; | |
queueAdd(fifo, buffer); | |
pthread_mutex_unlock(fifo->mut); | |
pthread_cond_signal(fifo->notEmpty); | |
line++; | |
} | |
fclose(fp); | |
} else { | |
// file doesn't exist | |
printf(ANSI_COLOR_RED"input: %s\n no such file found\n" ANSI_COLOR_RESET, input); | |
fifo->producer = DONE; | |
return NULL; | |
} | |
//producer done then tell everyone and terminate (who is a good boy) | |
fifo->producer = DONE; | |
printf(ANSI_COLOR_GREEN "Producer done!\n" ANSI_COLOR_GREEN); | |
return NULL; | |
} | |
/** | |
* returns a unique new file id > 0 | |
* @return int (unique id) | |
*/ | |
int getNewFileId(){ | |
return ++highest_free_file_id; | |
} | |
/** | |
* initialize queue and return it | |
* @return queue | |
*/ | |
queue *queueInit (void) | |
{ | |
queue *q; | |
q = (queue *)malloc (sizeof (queue)); | |
if (q == NULL) return (NULL); | |
q->producer = 0; | |
q->empty = 1; | |
q->full = 0; | |
q->head = 0; | |
q->tail = 0; | |
q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); | |
pthread_mutex_init (q->mut, NULL); | |
q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); | |
pthread_cond_init (q->notFull, NULL); | |
q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); | |
pthread_cond_init (q->notEmpty, NULL); | |
return (q); | |
} | |
/** | |
* initialize s_consumer and return it | |
* @return queue | |
*/ | |
s_consumer *consumerInit (queue *q, int i) | |
{ | |
s_consumer *c; | |
c = (s_consumer *)malloc (sizeof(s_consumer)); | |
if (c == NULL) return NULL; | |
c->fifo = q; | |
c->id = i; | |
return (c); | |
} | |
/** | |
* initialize s_producer and return it | |
* @return queue | |
*/ | |
s_producer *producerInit (queue *q, char *buff) | |
{ | |
s_producer *c; | |
c = (s_producer *)malloc (sizeof(s_producer)); | |
if (c == NULL) return NULL; | |
c->fifo = q; | |
strncpy(c->input, buff, MAXINPUTSIZE); | |
return (c); | |
} | |
/**
 * releases a consumer argument block
 * @param c block to release; it is handed to free() and nothing else
 */
void consumerDelete (s_consumer *c)
{
    free(c);
}
/**
 * releases a producer argument block
 * @param p block to release; it is handed to free() and nothing else
 */
void producerDelete (s_producer *p)
{
    free(p);
}
/** | |
* frees a queue and the memory it allocated | |
* @param q queue to delete | |
*/ | |
void queueDelete (queue *q) | |
{ | |
pthread_mutex_destroy (q->mut); | |
free (q->mut); | |
pthread_cond_destroy (q->notFull); | |
free (q->notFull); | |
pthread_cond_destroy (q->notEmpty); | |
free (q->notEmpty); | |
free (q); | |
} | |
/** | |
* adds an item to the queue | |
* @param q queue to add to | |
* @param in item to add | |
*/ | |
void queueAdd (queue *q, data in) | |
{ | |
//printf("Add %s", in); | |
//strncpy(q->buf[q->tail].buf,in,LINEBUFFER); | |
data p; | |
strncpy(p.buf,in.buf,LINEBUFFER); | |
p.id = in.id; | |
q->buf[q->tail] = p; | |
q->tail++; | |
if (q->tail == QUEUESIZE) | |
q->tail = 0; | |
if (q->tail == q->head) | |
q->full = 1; | |
q->empty = 0; | |
return; | |
} | |
/** | |
* pops an item from the queue | |
* @param q queue to pop from | |
* @param out item that is returned | |
*/ | |
void queueDel (queue *q, data* out) | |
{ | |
//strncpy(out, q->buf[q->head].buf, LINEBUFFER); | |
strncpy(out->buf,q->buf[q->head].buf,LINEBUFFER); | |
out->id = q->buf[q->head].id; | |
q->head++; | |
if (q->head == QUEUESIZE) | |
q->head = 0; | |
if (q->head == q->tail) | |
q->empty = 1; | |
q->full = 0; | |
return; | |
} | |
//ask for a user input with a maximum size | |
//returns OK and the user input in buff | |
//returns NO_INPUT or TOO_LONG on error | |
int getLine(char* prmpt, char *buff, size_t size){ | |
int ch, extra; | |
if (prmpt != NULL){ | |
printf("%s", prmpt); | |
fflush(stdout); | |
} | |
if(fgets(buff, size, stdin) == NULL) | |
return NO_INPUT; | |
if(buff[0] == '\n') | |
return NO_INPUT; | |
if(buff[strlen(buff)-1] != '\n'){ | |
extra = 0; | |
while (((ch = getchar()) != '\n') && (ch != EOF)) | |
extra = 1; | |
return (extra == 1) ? TOO_LONG : OK; | |
} | |
buff[strlen(buff)-1] = '\0'; | |
return OK; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment