Skip to content

Instantly share code, notes, and snippets.

@Cranc
Last active December 13, 2017 14:04
Show Gist options
  • Save Cranc/4e2f71c1009ec01161ed1d5afbe9a94e to your computer and use it in GitHub Desktop.
Save Cranc/4e2f71c1009ec01161ed1d5afbe9a94e to your computer and use it in GitHub Desktop.
/*
* File: mythread.h
* Author: dennis
*
* Created on November 29, 2017, 2:18 PM
*/
#ifndef MYTHREAD_H
#define MYTHREAD_H

/* Headers for the types used below, so this file can be included first
 * in any translation unit without relying on the includer's order. */
#include <stddef.h>   /* size_t */
#include <pthread.h>  /* pthread_mutex_t, pthread_cond_t */

#ifdef __cplusplus
extern "C" {
#endif

/* ANSI terminal escape sequences used to colour console messages.
 * ANSI_COLOR_RESET switches back to the terminal's default colour. */
#define ANSI_COLOR_RED "\x1b[31m"
#define ANSI_COLOR_GREEN "\x1b[32m"
#define ANSI_COLOR_YELLOW "\x1b[33m"
#define ANSI_COLOR_BLUE "\x1b[34m"
#define ANSI_COLOR_MAGENTA "\x1b[35m"
#define ANSI_COLOR_CYAN "\x1b[36m"
#define ANSI_COLOR_RESET "\x1b[0m"

/* Buffer sizes, in bytes including the terminating '\0'. */
#define MAXINPUTSIZE 450   /* control file name typed by the user */
#define LINEBUFFER 1024    /* one line of the control file / one queue item */
#define FILENAMESIZE 40    /* path of the output file built by a consumer */
#define PREFIXBUFFER 10    /* NOTE(review): not used by mythread.c - confirm users */

/* Number of item slots in the queue's ring buffer. */
#define QUEUESIZE 5
#define LOOP 20            /* NOTE(review): not used by mythread.c - confirm users */
/* Number of consumer threads started by main(). */
#define CONSUMER_THREAD_COUNT 3

/* NOTE(review): EXIT and REPEAT are not used by mythread.c - confirm users. */
#define EXIT 1
#define REPEAT 0
/* Value stored in queue.producer once the producer has nothing more to add. */
#define DONE 3

/* Return codes of getLine(). */
#define TOO_LONG 2   /* the line did not fit into the buffer */
#define NO_INPUT 1   /* nothing could be read, or the line was empty */
#define OK 0         /* a line was read into the buffer */

/* Thread entry points; args is an s_producer* / s_consumer* respectively. */
void *producer (void *args);
void *consumer (void *args);

/* One queue item: a line of the control file plus its 1-based line number. */
typedef struct {
    char buf[LINEBUFFER];
    int id;
} data;

/* Fixed-size ring buffer shared between the producer and the consumers. */
typedef struct {
    data buf[QUEUESIZE];               /* item slots */
    long head, tail;                   /* next slot to read / next slot to write */
    int full, empty, producer;         /* state flags; producer becomes DONE at the end */
    pthread_mutex_t *mut;              /* held by the threads while they touch the queue */
    pthread_cond_t *notFull, *notEmpty;/* signalled after an item was removed / added */
} queue;

/* Argument block handed to one consumer thread. */
typedef struct {
    queue *fifo;   /* queue to read from */
    int id;        /* number of the consumer, used in messages and file names */
} s_consumer;

/* Argument block handed to the producer thread. */
typedef struct {
    queue *fifo;               /* queue to fill */
    char input[MAXINPUTSIZE];  /* name of the control file */
} s_producer;

/* Allocation helpers; each *Init returns NULL when the allocation fails. */
queue *queueInit (void);
s_consumer *consumerInit (queue *q, int i);
s_producer *producerInit (queue *q, char *buff);

/* Release what the matching *Init function allocated. */
void queueDelete (queue *q);
void consumerDelete (s_consumer *c);
void producerDelete (s_producer *p);

/* Ring buffer primitives; they do no locking and no full/empty checking
 * themselves, the calling thread is expected to do both. */
void queueAdd (queue *q, data in);
void queueDel (queue *q, data* out);

/* Returns the next value of a counter that starts at 1. */
int getNewFileId(void);

/* Prints prmpt (if not NULL) and reads one line from stdin into buff.
 * Returns OK, NO_INPUT or TOO_LONG. */
int getLine(char* prmpt, char *buff, size_t size);

#ifdef __cplusplus
}
#endif
#endif /* MYTHREAD_H */
/*
* File: mythread.c
* Author: delindne
*
* Created on 23. November 2017, 13:03
*/
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "mythread.h"
#include "msocket.h"
#include "socket.c"
/*
*
*/
/* Mutex that the visible part of this file neither initializes nor locks.
 * NOTE(review): presumably meant to guard highest_free_file_id - confirm
 * against socket.c, which is included textually above. */
pthread_mutex_t _file_id;
/* Counter behind getNewFileId(): holds the last id that was handed out,
 * 0 while no id has been issued yet. */
int highest_free_file_id = 0;
int main(int argc, char** argv) {
pthread_t t_producer;
pthread_t t_consumer[CONSUMER_THREAD_COUNT];
queue *fifo;
s_consumer *consumer_structs[CONSUMER_THREAD_COUNT];
s_producer *producer_struct;
fifo = queueInit();
if(fifo == NULL){
printf(ANSI_COLOR_RED "Could not initilaize Queue.\n" ANSI_COLOR_RESET);
exit(1);
}
char input[MAXINPUTSIZE];
//get file
getLine(ANSI_COLOR_MAGENTA "Enter name of control file: " ANSI_COLOR_RESET,input, sizeof(input));
printf("\n");
producer_struct = producerInit(fifo, input);
//start consumer threads
int rc;
for(int i = 0; i < CONSUMER_THREAD_COUNT; i++){
consumer_structs[i] = consumerInit(fifo, i+1);
rc = pthread_create(&t_consumer[i], NULL, consumer, consumer_structs[i]);
}
//start producer wait till finished and ask if we should stop
rc = pthread_create(&t_producer, NULL, producer, producer_struct);
pthread_join(t_producer, NULL);
producerDelete(producer_struct);
//printf("producer done\n");
//cancel the consumer on exit
for(int i = 0; i < CONSUMER_THREAD_COUNT; i++){
pthread_join(t_consumer[i],NULL);
consumerDelete(consumer_structs[i]);
}
queueDelete(fifo);
return (EXIT_SUCCESS);
}
/**
* reads sites from queue and calls the server
* @param q fifo queue where items are located
* @return void
*/
void *consumer(void* q){
//init stuff
s_consumer *my_struct = (s_consumer *)q;
queue *fifo = (queue *)my_struct->fifo;
int id = my_struct->id;
struct timespec timeToWait;
struct timeval now;
printf(ANSI_COLOR_BLUE "Consumer number %d ready for duty\n" ANSI_COLOR_RESET, id);
//keep looping
while(1){
char b_data[LINEBUFFER];
memset(b_data, 0, sizeof b_data);
pthread_mutex_lock(fifo->mut);
//while queue empty wait and if producer is done get ready to be canceled
while(fifo->empty == 1){
if(fifo->producer == DONE){
//printf("fifo empty and producer done [head:%d,tail:%d,full:%d,empty:%d,producer:%d]\n",fifo->head,fifo->tail,fifo->full,fifo->empty,fifo->producer);
pthread_mutex_unlock(fifo->mut);
return NULL;
}
else{
gettimeofday(&now,NULL);
timeToWait.tv_sec = now.tv_sec+1;
pthread_cond_timedwait(fifo->notEmpty, fifo->mut, &timeToWait);
}
}
//get item from queue and unlock mutex to not block longer than needed
data buffer;
queueDel(fifo, &buffer);
strncpy(b_data,buffer.buf,LINEBUFFER);
pthread_mutex_unlock(fifo->mut);
pthread_cond_signal(fifo->notFull);
//split into web-address and site
char args[2][LINEBUFFER];
memset(args[0], 0, sizeof args[0]);
memset(args[1], 0, sizeof args[1]);
char *result = strstr(b_data, " /");
int position = result - b_data;
int substringLength = strlen(b_data) - position;
strncpy(args[0], b_data, position);
strncpy(args[1], b_data + position + 1, substringLength);
//create file path
char filepath[FILENAMESIZE];
memset(filepath, 0, sizeof filepath);
snprintf(filepath, sizeof(filepath), "test/file_%d_%d.html",buffer.id, id);
//ask server and inform if fatal error occurred
//printf("consumer[%d] ask %s\n",id,args[0]);
errorValue ret = askServer(args[0], args[1],filepath);
switch(ret){
case fatalError:
printf(ANSI_COLOR_RED "Error while asking Server!\n" ANSI_COLOR_RESET);
break;
}
}
return NULL;
}
/* stores DONE in the queue while holding its mutex and wakes every consumer
 * that is waiting for an item, so they can notice the end of input */
static void producerFinish (queue *fifo)
{
    pthread_mutex_lock(fifo->mut);
    fifo->producer = DONE;
    pthread_mutex_unlock(fifo->mut);
    pthread_cond_broadcast(fifo->notEmpty);
}

/**
 * producer thread: reads the control file named in the s_producer line by
 * line and puts every line, together with its 1-based line number, into
 * the queue. Blocks while the queue is full. Marks the queue as DONE
 * before returning, whether or not the file could be read.
 * @param q pointer to the s_producer describing this thread
 * @return always NULL
 */
void *producer (void *q)
{
    //init
    s_producer *p = (s_producer *)q;
    queue *fifo = p->fifo;

    //the consumers are already running and read this flag under the mutex
    pthread_mutex_lock(fifo->mut);
    fifo->producer = 0;
    pthread_mutex_unlock(fifo->mut);

    //bounded copy that is terminated even if p->input is not
    char input[MAXINPUTSIZE];
    snprintf(input, sizeof input, "%.*s", MAXINPUTSIZE - 1, p->input);

    if( access( input, F_OK ) == -1 ) {
        // file doesn't exist
        printf(ANSI_COLOR_RED"input: %s\n no such file found\n" ANSI_COLOR_RESET, input);
        producerFinish(fifo);
        return NULL;
    }

    // file exists
    FILE *fp = fopen(input, "r");
    if(fp == NULL){
        printf(ANSI_COLOR_RED "input: %s\n could not be opened\n" ANSI_COLOR_RESET, input);
        producerFinish(fifo);
        return NULL;
    }

    //read file line by line and add each line to the queue
    char buf[LINEBUFFER];
    int line = 1;
    while( fgets(buf, LINEBUFFER, fp) ){
        data buffer;
        strncpy(buffer.buf, buf, LINEBUFFER);   //fgets terminated buf, the rest is zero filled
        buffer.id = line;

        pthread_mutex_lock(fifo->mut);
        while (fifo->full){
            printf(ANSI_COLOR_YELLOW "producer: queue FULL.\n" ANSI_COLOR_RESET);
            pthread_cond_wait(fifo->notFull, fifo->mut);
        }
        queueAdd(fifo, buffer);
        pthread_mutex_unlock(fifo->mut);
        pthread_cond_signal(fifo->notEmpty);
        line++;
    }
    fclose(fp);

    //producer done then tell everyone and terminate
    producerFinish(fifo);
    printf(ANSI_COLOR_GREEN "Producer done!\n" ANSI_COLOR_RESET);
    return NULL;
}
/**
 * advances the global file id counter by one and hands out the new value;
 * the first call yields 1. The counter is updated without any locking here.
 * @return int the id that was just issued
 */
int getNewFileId(){
    highest_free_file_id += 1;
    return highest_free_file_id;
}
/**
 * allocates an empty queue together with its mutex and its two condition
 * variables and initializes all of them
 * @return the new queue (release it with queueDelete), or NULL if an
 *         allocation or initialization failed; nothing is leaked in that case
 */
queue *queueInit (void)
{
    queue *q = malloc (sizeof *q);
    if (q == NULL) return (NULL);

    q->producer = 0;
    q->empty = 1;
    q->full = 0;
    q->head = 0;
    q->tail = 0;

    //allocate all three first so a single check covers them
    q->mut = malloc (sizeof *q->mut);
    q->notFull = malloc (sizeof *q->notFull);
    q->notEmpty = malloc (sizeof *q->notEmpty);
    if (q->mut == NULL || q->notFull == NULL || q->notEmpty == NULL)
        goto fail_alloc;

    if (pthread_mutex_init (q->mut, NULL) != 0)
        goto fail_alloc;
    if (pthread_cond_init (q->notFull, NULL) != 0)
        goto fail_mutex;
    if (pthread_cond_init (q->notEmpty, NULL) != 0)
        goto fail_notfull;
    return (q);

    //undo in reverse order; only initialized objects are destroyed
fail_notfull:
    pthread_cond_destroy (q->notFull);
fail_mutex:
    pthread_mutex_destroy (q->mut);
fail_alloc:
    free (q->notEmpty);
    free (q->notFull);
    free (q->mut);
    free (q);
    return (NULL);
}
/**
 * allocates the argument block for one consumer thread
 * @param q queue the consumer will read from
 * @param i number of the consumer
 * @return the new s_consumer (release it with consumerDelete),
 *         or NULL if the allocation failed
 */
s_consumer *consumerInit (queue *q, int i)
{
    s_consumer *created = malloc (sizeof *created);
    if (created != NULL) {
        created->id = i;
        created->fifo = q;
    }
    return created;
}
/**
 * allocates the argument block for the producer thread and stores a copy
 * of the control file name in it
 * @param q queue the producer will fill
 * @param buff name of the control file; names that do not fit are cut to
 *             MAXINPUTSIZE - 1 characters, NULL is stored as an empty name
 * @return the new s_producer (release it with producerDelete),
 *         or NULL if the allocation failed
 */
s_producer *producerInit (queue *q, char *buff)
{
    s_producer *p = malloc (sizeof *p);
    if (p == NULL) return NULL;
    p->fifo = q;
    //zero fill first and copy one byte less than the buffer holds:
    //the stored name is then always terminated
    memset (p->input, 0, sizeof p->input);
    if (buff != NULL)
        strncpy (p->input, buff, sizeof p->input - 1);
    return (p);
}
/**
 * releases the memory of a consumer argument block; the queue it points
 * to is left untouched
 * @param c block to release
 */
void consumerDelete (s_consumer *c)
{
    free (c);
}
/**
 * releases the memory of a producer argument block; the queue it points
 * to is left untouched
 * @param p block to release
 */
void producerDelete (s_producer *p)
{
    free (p);
}
/**
 * destroys the mutex and the condition variables of a queue and frees
 * the queue and the memory it allocated
 * @param q queue to delete; NULL is accepted and ignored, like free(NULL)
 */
void queueDelete (queue *q)
{
    if (q == NULL) return;

    //a member may be NULL if the queue was only partly set up
    if (q->mut != NULL)
        pthread_mutex_destroy (q->mut);
    if (q->notFull != NULL)
        pthread_cond_destroy (q->notFull);
    if (q->notEmpty != NULL)
        pthread_cond_destroy (q->notEmpty);

    free (q->mut);
    free (q->notFull);
    free (q->notEmpty);
    free (q);
}
/**
 * stores an item in the slot at the tail of the ring buffer and moves the
 * tail one slot forward, wrapping to 0 at QUEUESIZE. Afterwards the queue
 * is marked not empty, and full if the tail caught up with the head.
 * No locking and no check for a full queue is done here.
 * @param q queue to add to
 * @param in item to add (copied)
 */
void queueAdd (queue *q, data in)
{
    data *slot = &q->buf[q->tail];

    strncpy(slot->buf, in.buf, LINEBUFFER);
    slot->id = in.id;

    if (++q->tail == QUEUESIZE) {
        q->tail = 0;
    }
    if (q->head == q->tail) {
        q->full = 1;
    }
    q->empty = 0;
}
/**
 * copies the item at the head of the ring buffer to the caller and moves
 * the head one slot forward, wrapping to 0 at QUEUESIZE. Afterwards the
 * queue is marked not full, and empty if the head caught up with the tail.
 * No locking and no check for an empty queue is done here.
 * @param q queue to pop from
 * @param out receives the popped item
 */
void queueDel (queue *q, data* out)
{
    const data *slot = &q->buf[q->head];

    strncpy(out->buf, slot->buf, LINEBUFFER);
    out->id = slot->id;

    if (++q->head == QUEUESIZE) {
        q->head = 0;
    }
    if (q->tail == q->head) {
        q->empty = 1;
    }
    q->full = 0;
}
//ask for a user input with a maximum size
//prmpt is printed first unless it is NULL; buff receives the line without
//its trailing newline, size is the capacity of buff in bytes
//returns OK and the user input in buff
//returns NO_INPUT if nothing could be read (no buffer, end of input, empty line)
//returns TOO_LONG if the line did not fit; the rest of it is discarded
int getLine(char* prmpt, char *buff, size_t size){
    if (buff == NULL || size == 0)
        return NO_INPUT;

    if (prmpt != NULL){
        printf("%s", prmpt);
        fflush(stdout);
    }

    if (fgets(buff, size, stdin) == NULL)
        return NO_INPUT;

    //fgets can succeed with an empty string (size 1, or input starting with
    //a NUL byte); there is no last character to look at in that case
    size_t length = strlen(buff);
    if (length == 0 || buff[0] == '\n')
        return NO_INPUT;

    if (buff[length - 1] != '\n'){
        //no newline stored: drop the rest of the line so the next read starts fresh;
        //if there was no rest the input simply ended without a newline
        int ch;
        int extra = 0;
        while (((ch = getchar()) != '\n') && (ch != EOF))
            extra = 1;
        return (extra == 1) ? TOO_LONG : OK;
    }

    buff[length - 1] = '\0';
    return OK;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment