Skip to content

Instantly share code, notes, and snippets.

@thanoskoutr
Created April 23, 2021 22:15
Show Gist options
  • Save thanoskoutr/c4347341659db92f79b5d806aa2904a8 to your computer and use it in GitHub Desktop.
Save thanoskoutr/c4347341659db92f79b5d806aa2904a8 to your computer and use it in GitHub Desktop.
A word-count tool that uses pthreads to count words in parallel. Takes a file or a directory as its argument.
#include <dirent.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/*
 * Compile-time limits and tuning knobs, expressed as enumeration constants
 * (typed int, visible to a debugger) instead of preprocessor macros.
 */
enum {
  FILE_PATH_MAX = 4096, /* maximum file-path length on Linux */
  FILE_NAME_MAX = 256,  /* maximum file-name length on Linux */
  MAX_FILES = 1000,     /* maximum number of files (temporary limit) */
  MAX_THREADS = 64,     /* maximum number of threads */
  NTHREADS = 4          /* number of worker threads per file */
};

/* Guards the lseek()+read() pair in the worker threads, which share one file offset. */
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
/*
 * Per-thread work description handed to thread_word_count() through its
 * void * argument. The creator fills in the input fields; the thread writes
 * its result back into `counter`.
 */
struct thread_data {
  int t_id;             /* thread index, used in log output */
  int fd;               /* descriptor of the file being counted */
  off_t t_offset;       /* byte offset at which this thread's chunk starts */
  size_t bytes_to_read; /* length of this thread's chunk in bytes */
  int counter;          /* words counted by this thread (output) */
};
typedef struct thread_data tdata_t;

/* Worker entry point; `targ` points to a tdata_t. Returns NULL. */
void *thread_word_count(void *targ);
/* Frees every path in `files` (holding `count` entries) and then the array. */
static void free_file_list(char **files, size_t count) {
  for (size_t i = 0; i < count; i++) {
    free(files[i]);
  }
  free(files);
}

/*
 * Appends a heap-allocated copy of `path` to the growable array `*files`,
 * which holds `*count` entries and has room for `*cap`.
 * Returns 0 on success, -1 if memory could not be allocated (no entry is
 * added in that case).
 */
static int append_file_path(char ***files, size_t *count, size_t *cap,
                            const char *path) {
  if (*count == *cap) {
    size_t new_cap = *cap ? *cap * 2 : 16;
    char **grown = realloc(*files, new_cap * sizeof **files);
    if (!grown) {
      return -1;
    }
    *files = grown;
    *cap = new_cap;
  }
  size_t len = strlen(path) + 1;
  char *copy = malloc(len);
  if (!copy) {
    return -1;
  }
  memcpy(copy, path, len);
  (*files)[(*count)++] = copy;
  return 0;
}

/*
 * Appends the path of every regular file directly inside `dir_path` (no
 * recursion) to `*files`. Entries whose path would not fit in FILE_PATH_MAX,
 * or whose stat() fails, are skipped with a warning on stderr. A file that is
 * the same inode as `skip` (when non-NULL) is skipped silently.
 * Returns 0 on success, -1 if the directory cannot be opened or memory runs out.
 */
static int collect_regular_files(const char *dir_path, const struct stat *skip,
                                 char ***files, size_t *count, size_t *cap) {
  DIR *directory = opendir(dir_path);
  if (!directory) {
    printf("Cannot open directory stream\n");
    return -1;
  }
  printf("List files in directory: %s\n", dir_path);

  char file_path[FILE_PATH_MAX];
  struct stat filestat;
  struct dirent *dp;
  int rc = 0;
  while ((dp = readdir(directory)) != NULL) {
    int n = snprintf(file_path, sizeof file_path, "%s/%s", dir_path, dp->d_name);
    if (n < 0 || (size_t)n >= sizeof file_path) {
      fprintf(stderr, "Path too long, skipping: %s/%s\n", dir_path, dp->d_name);
      continue;
    }
    // stat() follows symlinks, so a link to a regular file is counted too.
    if (stat(file_path, &filestat) != 0) {
      perror(file_path);
      continue;
    }
    if (!S_ISREG(filestat.st_mode)) {
      continue;
    }
    // Never count the output file: children rewrite it while counting.
    if (skip && filestat.st_dev == skip->st_dev &&
        filestat.st_ino == skip->st_ino) {
      continue;
    }
    if (append_file_path(files, count, cap, file_path) != 0) {
      perror("malloc");
      rc = -1;
      break;
    }
    printf("File: \t %s\n", file_path);
  }
  closedir(directory);
  return rc;
}

/*
 * Work done by one child process for one file: splits the file into NTHREADS
 * byte ranges (the last range also takes the remainder), counts each range on
 * its own thread via thread_word_count(), prints the total and appends
 * "<pid>, <path>, <count>\n" to `fd_out`.
 * Returns EXIT_SUCCESS or EXIT_FAILURE, suitable for passing to exit().
 */
static int count_words_in_file(const char *path, int fd_out) {
  int fd_in = open(path, O_RDONLY);
  if (fd_in < 0) {
    perror("open");
    printf("Failed to open %s\n", path);
    return EXIT_FAILURE;
  }
  printf("Successfully opened %s\n", path);

  // fstat() on the open descriptor measures the file we will actually read.
  struct stat filestat;
  if (fstat(fd_in, &filestat) != 0) {
    perror("fstat");
    close(fd_in);
    return EXIT_FAILURE;
  }
  size_t file_size = (size_t)filestat.st_size;
  size_t chars_per_thread = file_size / NTHREADS;
  size_t remainder_chars = file_size % NTHREADS;
  printf("Child, pid = %d, File size: %zu bytes, Chars/Thread: %zu, "
         "Remainder-Chars: %zu, "
         "File: %s\n",
         (int)getpid(), file_size, chars_per_thread, remainder_chars, path);

  tdata_t t_data[NTHREADS];
  for (int t = 0; t < NTHREADS; t++) {
    t_data[t].t_id = t;
    t_data[t].fd = fd_in;
    t_data[t].t_offset = (off_t)((size_t)t * chars_per_thread);
    t_data[t].bytes_to_read =
        chars_per_thread + (t == NTHREADS - 1 ? remainder_chars : 0);
    t_data[t].counter = 0;
  }
  for (int t = 0; t < NTHREADS; t++) {
    printf("Thread %d gets %lld chars offset \n", t,
           (long long)t_data[t].t_offset);
  }

  pthread_t threads[NTHREADS];
  int created = 0;
  int status = EXIT_SUCCESS;
  while (created < NTHREADS) {
    if (pthread_create(&threads[created], NULL, thread_word_count,
                       &t_data[created]) != 0) {
      printf("Error creating thread %d.\n", created);
      status = EXIT_FAILURE;
      break;
    }
    created++;
  }
  // Join every started thread before t_data goes out of scope.
  for (int t = 0; t < created; t++) {
    if (pthread_join(threads[t], NULL) != 0) {
      printf("Failure on pthread_join for thread %d.\n", t);
      // The thread may still be using t_data; terminate the whole process.
      exit(EXIT_FAILURE);
    }
  }
  close(fd_in);
  if (status != EXIT_SUCCESS) {
    return status;
  }

  int total_count = 0;
  for (int t = 0; t < NTHREADS; t++) {
    total_count += t_data[t].counter;
  }
  printf("Final word count: %d\n", total_count);

  // Sized for the longest accepted path plus the pid and count fields.
  char output_string[FILE_PATH_MAX + 64];
  int len = snprintf(output_string, sizeof output_string, "%d, %s, %d\n",
                     (int)getpid(), path, total_count);
  if (len < 0 || (size_t)len >= sizeof output_string) {
    fprintf(stderr, "Output line too long for %s\n", path);
    return EXIT_FAILURE;
  }
  if (write(fd_out, output_string, (size_t)len) != len) {
    perror("write");
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}

/*
 * Usage: prog [PATH]
 * PATH (default ".") is a directory, whose regular files are all counted, or a
 * single regular file. One child process is forked per file; each appends a
 * "<pid>, <path>, <count>" line to ./output.txt (truncated at start).
 * Returns 0 if every file was counted successfully, EXIT_FAILURE otherwise.
 */
int main(int argc, char **argv) {
  if (argc > 2) {
    printf("Only 1 argument must be given\n");
    exit(EXIT_FAILURE);
  }
  const char *target = (argc == 2) ? argv[1] : ".";
  if (strlen(target) >= FILE_PATH_MAX) {
    printf("Path too long\n");
    exit(EXIT_FAILURE);
  }
  struct stat target_stat;
  if (stat(target, &target_stat) != 0) {
    perror("stat");
    exit(EXIT_FAILURE);
  }

  // O_APPEND makes each child's write land atomically at the end of the file
  // instead of racing on the shared file offset.
  int fd_out = open("output.txt", O_RDWR | O_CREAT | O_TRUNC | O_APPEND,
                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
  if (fd_out < 0) {
    perror("open");
    printf("Failed to open output.txt\n");
    exit(EXIT_FAILURE);
  }
  printf("Successfully opened output.txt\n");
  struct stat out_stat;
  const struct stat *skip = (fstat(fd_out, &out_stat) == 0) ? &out_stat : NULL;

  char **files_array = NULL;
  size_t files_in_dir = 0;
  size_t files_cap = 0;
  if (S_ISDIR(target_stat.st_mode)) {
    if (collect_regular_files(target, skip, &files_array, &files_in_dir,
                              &files_cap) != 0) {
      free_file_list(files_array, files_in_dir);
      close(fd_out);
      exit(EXIT_FAILURE);
    }
  } else if (S_ISREG(target_stat.st_mode)) {
    if (append_file_path(&files_array, &files_in_dir, &files_cap, target) != 0) {
      perror("malloc");
      close(fd_out);
      exit(EXIT_FAILURE);
    }
    printf("File: \t %s\n", target);
  } else {
    printf("Not a directory or regular file\n");
    close(fd_out);
    exit(EXIT_FAILURE);
  }
  printf("files_in_dir = %zu\n", files_in_dir);

  int status = EXIT_SUCCESS;
  size_t started = 0;
  for (size_t i = 0; i < files_in_dir; i++) {
    // Flush first so children don't inherit and re-emit buffered output.
    fflush(stdout);
    pid_t pid = fork();
    if (pid < 0) {
      perror("fork");
      status = EXIT_FAILURE;
      break; // still wait for the children already started
    }
    if (pid == 0) {
      exit(count_words_in_file(files_array[i], fd_out));
    }
    started++;
  }

  printf("Parent, pid = %d, is executed\n", (int)getpid());
  for (size_t i = 0; i < started; i++) {
    int wstatus;
    pid_t wpid = waitpid(-1, &wstatus, 0);
    if (wpid < 0) {
      perror("waitpid");
      status = EXIT_FAILURE;
      break;
    }
    printf("Child, pid = %d, is done\n", (int)wpid);
    if (!WIFEXITED(wstatus) || WEXITSTATUS(wstatus) != EXIT_SUCCESS) {
      status = EXIT_FAILURE;
    }
  }

  close(fd_out);
  free_file_list(files_array, files_in_dir);
  return status;
}
/*
 * Returns nonzero if `c` separates words: space, '\t', '\n', '\v', '\f' or
 * '\r' (the C-locale whitespace set).
 */
static int is_word_separator(char c) {
  return c == ' ' || c == '\t' || c == '\n' || c == '\v' || c == '\f' ||
         c == '\r';
}

/*
 * Thread entry point, executed by each spawned thread: counts the words in
 * the byte range [t_offset, t_offset + bytes_to_read) of `fd` and adds them
 * to `counter` in the tdata_t that `targ` points to.
 *
 * A word is counted where it STARTS: a non-separator byte preceded by a
 * separator, or by the start of the file. Counting starts, not transitions,
 * means the counts of adjacent non-overlapping ranges add up exactly to the
 * word count of the whole file. To classify the first byte of a range, the
 * byte just before it is read too.
 *
 * The threads share one descriptor, and therefore one file offset, so the
 * seek+read sequence is done while holding mutex1. Exits the whole process on
 * allocation or I/O failure. Always returns NULL.
 */
void *thread_word_count(void *targ) {
  tdata_t *mydata = targ;
  // One byte of look-behind unless the range starts at the beginning of the file.
  size_t lead = (mydata->t_offset > 0) ? 1 : 0;
  size_t want = mydata->bytes_to_read + lead;
  // malloc(0) may return NULL, so always request at least one byte.
  char *read_buffer = malloc(want > 0 ? want : 1);
  if (!read_buffer) {
    perror("malloc");
    exit(EXIT_FAILURE);
  }
  printf("pid = %d, t_id = %d, offset = %lld, bytes_to_read = %zu\n",
         (int)getpid(), mydata->t_id, (long long)mydata->t_offset,
         mydata->bytes_to_read);

  size_t got = 0;
  pthread_mutex_lock(&mutex1);
  if (lseek(mydata->fd, mydata->t_offset - (off_t)lead, SEEK_SET) < 0) {
    perror("lseek");
    exit(EXIT_FAILURE);
  }
  // read() may return fewer bytes than requested; keep reading until the
  // range is complete or end-of-file is reached.
  while (got < want) {
    ssize_t n = read(mydata->fd, read_buffer + got, want - got);
    if (n < 0) {
      perror("read");
      exit(EXIT_FAILURE);
    }
    if (n == 0) {
      break;
    }
    got += (size_t)n;
  }
  pthread_mutex_unlock(&mutex1);

  size_t i = 0;
  int prev_is_sep = 1; // the start of the file acts as a separator
  if (lead && got > 0) {
    prev_is_sep = is_word_separator(read_buffer[0]);
    i = 1;
  }
  for (; i < got; i++) {
    int is_sep = is_word_separator(read_buffer[i]);
    if (!is_sep && prev_is_sep) {
      mydata->counter++;
    }
    prev_is_sep = is_sep;
  }

  free(read_buffer);
  return NULL;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment