Created
April 23, 2021 22:15
-
-
Save thanoskoutr/c4347341659db92f79b5d806aa2904a8 to your computer and use it in GitHub Desktop.
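A word count tool that uses fork and pthreads to count words in parallel: one child process per regular file, with each file split across several threads. Takes an optional directory as its argument (defaults to the current directory) and appends one result line per file to output.txt.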
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/* Size limits and tuning knobs used throughout the program. */
#define FILE_PATH_MAX 4096 /* maximum file path length on Linux */
#define FILE_NAME_MAX 256  /* maximum file name length on Linux */
#define MAX_FILES 1000     /* capacity of the fixed file table (temporary solution) */
#define MAX_THREADS 64     /* upper bound on the number of threads */
#define NTHREADS 4         /* worker threads started for each file */

/* Guards the lseek()+read() pair that worker threads issue on the descriptor they share. */
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
/*
 * Work order for one worker thread. A pointer to it is passed as the
 * pthread_create() argument to thread_word_count(), which also uses it
 * to hand its partial result back through `counter`.
 */
typedef struct {
    int t_id;             /* index of the thread within its file's thread group */
    int fd;               /* descriptor of the input file to read from */
    off_t t_offset;       /* byte offset in the file where the chunk starts */
    size_t bytes_to_read; /* length of the chunk in bytes */
    int counter;          /* number of words counted by the thread */
} tdata_t;
void *thread_word_count(void *targ); | |
int main(int argc, char **argv) { | |
struct stat dirstat; // Struct for stat(), for directory | |
struct stat filestat; // Struct for stat(), for files | |
char directory_path[FILE_PATH_MAX]; // Path of directory | |
char file_path[FILE_PATH_MAX]; // Path of files to be opened | |
DIR *directory; // DIR pointer for directory | |
struct dirent *dp; // Struct for entries of directory | |
/* ---------- CHECK ARGUMENT IF A DIRECTORY ---------- */ | |
// Chech if only 1 argument is given | |
if (argc > 2) { | |
// If more than 1, exit | |
printf("Only 1 argument must be given\n"); | |
exit(EXIT_FAILURE); | |
} | |
// If 1 argument is given | |
else if (argc == 2) { | |
// Check if it is a valid path | |
if (stat(argv[1], &dirstat) != 0) { | |
perror("stat"); | |
exit(EXIT_FAILURE); | |
} | |
// If it is a directory | |
if (S_ISDIR(dirstat.st_mode)) { | |
// Save it in a variable | |
strcpy(directory_path, argv[1]); | |
} else { | |
printf("Not a directory\n"); | |
exit(EXIT_FAILURE); | |
} | |
} | |
// Else, no argument is given | |
else { | |
// Find the current directory | |
strcpy(directory_path, "."); | |
} | |
/* ---------- FIND FILES IN DIRECTORY ---------- */ | |
int files_in_dir = 0; | |
size_t file_path_size; | |
// char files_array[MAX_FILES][FILE_NAME_MAX]; // (TEMP SOLUTION) | |
char *files_array[MAX_FILES]; // 2D Array for the filenames (TEMP SOLUTION) | |
// BEST SOLUTION: Linked List | |
// Open directory | |
directory = opendir(directory_path); | |
// If unable to open directory stream | |
if (!directory) { | |
printf("Cannot open directory stream\n"); | |
exit(EXIT_FAILURE); | |
} | |
// For all files in the directory | |
printf("List files in directory: %s\n", directory_path); | |
while ((dp = readdir(directory)) != NULL) { | |
// Save file path | |
strcpy(file_path, directory_path); | |
strcat(file_path, "/"); | |
strcat(file_path, dp->d_name); | |
// printf("Entry: \t %s, %d \n", file_path, dp->d_type); | |
// Check if it is a regular file (ASCII ??) | |
stat(file_path, &filestat); | |
if (!S_ISREG(filestat.st_mode)) { | |
continue; | |
} | |
// Add file to files_array | |
file_path_size = strlen(file_path); | |
file_path_size++; | |
files_array[files_in_dir] = (char *)malloc(sizeof(char) * file_path_size); | |
strcpy(files_array[files_in_dir], file_path); | |
files_in_dir++; // Update file counter | |
printf("File: \t %s\n", file_path); | |
} | |
// Close directory stream | |
closedir(directory); | |
printf("files_in_dir = %d\n", files_in_dir); | |
/* ---------- PROCESS EACH FILE ---------- */ | |
pid_t *children = malloc(sizeof(pid_t) * files_in_dir); | |
pid_t wpid; | |
int fd_in; | |
int fd_out; | |
size_t file_size; | |
size_t chars_per_thread, remainder_chars; | |
int total_count = 0; | |
char output_string[FILE_NAME_MAX]; | |
pthread_t threads[NTHREADS]; | |
off_t t_offsets[NTHREADS]; | |
size_t t_bytes_to_read[NTHREADS]; | |
tdata_t t_data[NTHREADS]; | |
// Open output.txt for Read/Write, if exists delete contents | |
fd_out = open("output.txt", O_RDWR | O_CREAT | O_TRUNC, | |
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); | |
// Check if file couldn't be opened | |
if (fd_out < 0) { | |
perror("open"); | |
printf("Failed to open output.txt\n"); | |
exit(EXIT_FAILURE); | |
} else { | |
printf("Successfully opened output.txt\n"); | |
} | |
// For every file for processing | |
for (int i = 0; i < files_in_dir; i++) { | |
// Create a new process | |
children[i] = fork(); | |
// If fork is unsuccessful | |
if (children[i] < 0) { | |
perror("fork"); | |
exit(EXIT_FAILURE); | |
} | |
/* Code executed by child */ | |
else if (children[i] == 0) { | |
// Open file for word count | |
fd_in = open(files_array[i], O_RDONLY); | |
// Check if file couldn't be opened | |
if (fd_in < 0) { | |
perror("open"); | |
printf("Failed to open %s\n", files_array[i]); | |
exit(EXIT_FAILURE); | |
} else { | |
printf("Successfully opened %s\n", files_array[i]); | |
} | |
/* ---------- THREAD PROCESSING ---------- */ | |
// Find File Size | |
stat(files_array[i], &filestat); | |
file_size = filestat.st_size; | |
// Define chars/thread (and remaining chars) for each thread | |
chars_per_thread = file_size / NTHREADS; | |
remainder_chars = file_size % NTHREADS; | |
printf( | |
"Child, pid = %d, File size: %ld bytes, Chars/Thread: %ld, " | |
"Remainder-Chars: %ld, " | |
"File: %s\n", | |
getpid(), file_size, chars_per_thread, remainder_chars, | |
files_array[i]); | |
// Assign chars offsets for processing text chunk for each thread | |
for (int i = 0; i < NTHREADS; i++) { | |
if (i == NTHREADS - 1) { | |
// printf("Thread %d gets %lld chars \n", i, | |
// chars_per_thread + remainder_chars); | |
t_bytes_to_read[i] = chars_per_thread + remainder_chars; | |
} else { | |
// printf("Thread %d gets %lld chars \n", i, chars_per_thread); | |
t_bytes_to_read[i] = chars_per_thread; | |
} | |
t_offsets[i] = i * chars_per_thread; | |
} | |
// Print threads offsets | |
for (int i = 0; i < NTHREADS; i++) { | |
printf("Thread %d gets %ld chars offset \n", i, t_offsets[i]); | |
} | |
// Create Threads | |
for (int i = 0; i < NTHREADS; i++) { | |
t_data[i].t_id = i; | |
t_data[i].fd = fd_in; | |
t_data[i].t_offset = t_offsets[i]; | |
t_data[i].bytes_to_read = t_bytes_to_read[i]; | |
t_data[i].counter = 0; | |
if (pthread_create(&threads[i], NULL, thread_word_count, &t_data[i])) { | |
printf("Error creating thread %d.\n", i); | |
exit(EXIT_FAILURE); | |
} | |
} | |
/* Wait till threads are complete before main continues. Unless we */ | |
/* wait we run the risk of executing an exit which will terminate */ | |
/* the process and all threads before the threads have completed. */ | |
for (int i = 0; i < NTHREADS; i++) { | |
if (pthread_join(threads[i], NULL)) { | |
printf("Failure on pthread_join for thread %d.\n", i); | |
exit(EXIT_FAILURE); | |
} | |
} | |
/* Now that all threads are complete we can print the final result. */ | |
/* Without the join we could be printing a value before all the */ | |
/* threads have been completed. */ | |
// printf("Final counter value: %d\n", thread_counter); | |
for (int i = 0; i < NTHREADS; i++) { | |
total_count += t_data[i].counter; | |
} | |
printf("Final word count: %d\n", total_count); | |
// Write string to output.txt | |
sprintf(output_string, "%d, %s, %d\n", getpid(), files_array[i], | |
total_count); | |
write(fd_out, output_string, strlen(output_string)); | |
// Exit | |
exit(EXIT_SUCCESS); | |
} | |
} | |
/* Code executed by parent */ | |
printf("Parent, pid = %d, is executed\n", getpid()); | |
// Wait for all children processes to finish | |
for (int i = 0; i < files_in_dir; i++) { | |
wpid = waitpid(-1, NULL, 0); | |
printf("Child, pid = %d, is done\n", wpid); | |
} | |
/* CLEAN UP CODE */ | |
// Close the file descriptor | |
close(fd_out); | |
// Free children array | |
free(children); | |
// Free dynamic array | |
for (int i = 0; i < files_in_dir; i++) { | |
free(files_array[i]); | |
} | |
return 0; | |
} | |
// This is the function that is executed by each spawned thread. | |
void *thread_word_count(void *targ) { | |
tdata_t *mydata = targ; | |
ssize_t bytes_read; | |
off_t read_offset; | |
char *read_buffer = malloc(sizeof(char) * mydata->bytes_to_read); | |
// Print Thread data | |
printf("pid = %d, t_id = %d, offset = %ld, bytes_to_read = %ld\n", getpid(), | |
mydata->t_id, mydata->t_offset, mydata->bytes_to_read); | |
// Lock | |
pthread_mutex_lock(&mutex1); | |
read_offset = lseek(mydata->fd, mydata->t_offset, SEEK_SET); | |
if (read_offset < 0) { | |
perror("lseek"); | |
exit(EXIT_FAILURE); | |
} | |
bytes_read = read(mydata->fd, read_buffer, mydata->bytes_to_read); | |
// Unlock | |
pthread_mutex_unlock(&mutex1); | |
// printf("t_id = %d, bytes_read = %ld, read_buffer:\n---%s\n", mydata->t_id, | |
// bytes_read, read_buffer); | |
// Count word in text | |
for (int i = 0; i < bytes_read; i++) { | |
// Check for all whitespace chars ! | |
if ((read_buffer[i] == ' ' || read_buffer[i] == '\t' || | |
read_buffer[i] == '\n') && | |
(read_buffer[i + 1] != ' ' && read_buffer[i + 1] != '\n' && | |
read_buffer[i + 1] != '\t')) { | |
mydata->counter++; | |
} | |
} | |
return NULL; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment