Skip to content

Instantly share code, notes, and snippets.

@theasder
Last active December 31, 2015 18:59
Show Gist options
  • Save theasder/8030417 to your computer and use it in GitHub Desktop.
Save theasder/8030417 to your computer and use it in GitHub Desktop.
Parallel selection sort
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <limits.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <wait.h>
/*
 * Append one record of the form "<timestamp> <who> <id> <what>" to the
 * file psort.log in the current directory.
 *
 * who  - role of the calling process (e.g. "sorter")
 * what - event description (e.g. "started")
 * id   - number of the process within its role
 *
 * Logging is best effort: when the log file cannot be opened the record is
 * silently dropped so that the sorting pipeline itself is not disturbed.
 */
void
logstr(char *who, char *what, int id)
{
    char stamp[64] = "";
    time_t now = time(NULL);
    struct tm *local = localtime(&now);
    const char *text = (local != NULL) ? asctime(local) : NULL;
    FILE *log;

    if (text != NULL) {
        // asctime() returns a static buffer ending in '\n'; keep a private
        // copy without the newline (no heap allocation, so nothing to leak)
        snprintf(stamp, sizeof stamp, "%s", text);
        stamp[strcspn(stamp, "\n")] = '\0';
    }

    // "a" appends; the previous mode string "wa" truncated the file on every
    // call, so only the most recent record ever survived
    log = fopen("psort.log", "a");
    if (log == NULL)
        return;
    fprintf(log, "%s %s %d %s\n", stamp, who, id, what);
    fclose(log);
}
/*
 * Sort an array of NUL-terminated strings in place, in ascending strcmp()
 * order, using selection sort.
 *
 * s - array of n string pointers; only the pointers are rearranged
 * n - number of elements (0 and 1 are accepted and leave the array as is)
 */
void
sort(char **s,int n)
{
    int i, j, iMin;
    char *buf;

    for (i = 0; i < n - 1; i++) {
        iMin = i;
        // scan the unsorted tail for the smallest remaining string
        for (j = i + 1; j < n; j++) {
            // strcmp() only guarantees the SIGN of its result, so test
            // "< 0" rather than comparing with the literal -1
            if (strcmp(s[j], s[iMin]) < 0)
                iMin = j;
        }
        // move the minimum into position i
        if (iMin != i) {
            buf = s[i];
            s[i] = s[iMin];
            s[iMin] = buf;
        }
    }
}
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
sorter_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
sorter_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/* Log a fatal sorter error and terminate the process with a failure status. */
static void
sorter_die(char *why, int nid)
{
    logstr("sorter", why, nid);
    exit(1);
}

/*
 * Sorter process body: collect strings from pipein, sort them, and send the
 * sorted sequence to pipeout. Never returns.
 *
 * Input protocol  (pipein):  repeated [int len][len bytes], ended by len == -1.
 * Output protocol (pipeout): [int count][int maxlen], then count records of
 *                            [int len][len bytes incl. NUL], then [int -1].
 *
 * pipein  - descriptor the strings are read from
 * pipeout - descriptor the sorted strings are written to
 * nid     - sorter number, used only for logging
 */
void
sorter(int pipein, int pipeout, int nid)
{
    size_t cap = 100;       // current capacity of the pointer array
    size_t count = 0;       // strings collected so far
    size_t idx;
    int maxlen = -1;        // largest record length received
    int len = 0;
    int k;
    char **lines;

    logstr("sorter", "started", nid);

    lines = malloc(cap * sizeof *lines);
    if (lines == NULL)
        sorter_die("out of memory", nid);

    for (;;) {
        char *line;

        // a negative length is the end marker; a vanished writer is treated
        // the same way so that we never spin on a dead pipe
        if (sorter_read_all(pipein, &len, sizeof len) != 0 || len < 0)
            break;

        if (count == cap) {
            char **bigger = realloc(lines, 2 * cap * sizeof *lines);

            if (bigger == NULL)
                sorter_die("out of memory", nid);
            lines = bigger;
            cap *= 2;
        }

        // the buffer is kept as the stored string: no extra copy is needed
        line = malloc((size_t)len + 1);
        if (line == NULL)
            sorter_die("out of memory", nid);
        if (sorter_read_all(pipein, line, (size_t)len) != 0)
            sorter_die("truncated input", nid);
        line[len] = '\0';   // terminator directly after the payload

        if (len > maxlen)
            maxlen = len;
        lines[count++] = line;
    }

    if (count > 1)
        sort(lines, (int)count);

    // one output path covers zero, one and many strings
    k = (int)count;
    if (sorter_write_all(pipeout, &k, sizeof k) != 0 ||
        sorter_write_all(pipeout, &maxlen, sizeof maxlen) != 0)
        sorter_die("write failed", nid);

    for (idx = 0; idx < count; idx++) {
        len = (int)strlen(lines[idx]) + 1;
        if (sorter_write_all(pipeout, &len, sizeof len) != 0 ||
            sorter_write_all(pipeout, lines[idx], (size_t)len) != 0)
            sorter_die("write failed", nid);
    }

    len = -1;               // end marker, for uniformity of the protocol
    if (sorter_write_all(pipeout, &len, sizeof len) != 0)
        sorter_die("write failed", nid);

    for (idx = 0; idx < count; idx++)
        free(lines[idx]);
    free(lines);

    logstr("sorter", "exited", nid);
    exit(0);
}
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
merger_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
merger_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/* Log a fatal merger error and terminate the process with a failure status. */
static void
merger_die(char *why, int id)
{
    logstr("merger", why, id);
    _exit(1);
}

/*
 * Read one [int len][len bytes] record into buf (capacity cap bytes).
 * The result is always NUL-terminated. Returns 0 on success, -1 when the
 * record is missing, malformed or does not fit.
 */
static int
merger_fetch(int fd, char *buf, int cap)
{
    int len = 0;

    if (merger_read_all(fd, &len, sizeof len) != 0)
        return -1;
    if (len <= 0 || len > cap)
        return -1;
    if (merger_read_all(fd, buf, (size_t)len) != 0)
        return -1;
    buf[len - 1] = '\0';
    return 0;
}

/* Write line as one [int len][bytes incl. NUL] record. 0 on success. */
static int
merger_emit(int fd, const char *line)
{
    int len = (int)strlen(line) + 1;

    if (merger_write_all(fd, &len, sizeof len) != 0)
        return -1;
    return merger_write_all(fd, line, (size_t)len);
}

/*
 * Merger process body: merge two sorted string streams into one sorted
 * stream. Never returns.
 *
 * Each input starts with [int count][int maxlen] followed by count records
 * of [int len][len bytes incl. NUL]. The output uses the same layout, with
 * count = n1 + n2 and maxlen = max(max1, max2), and is closed by [int -1].
 *
 * pipein1, pipein2 - descriptors of the two sorted inputs
 * pipeout          - descriptor the merged stream is written to
 * id               - merger number, used only for logging
 */
void
merger(int pipein1, int pipein2, int pipeout, int id)
{
    int n1 = 0, n2 = 0, max1 = 0, max2 = 0;
    int n, max, cap;
    int i = 0, j = 0;           // records already forwarded from each input
    int have1 = 0, have2 = 0;   // a fetched-but-unwritten line is pending
    int len;
    char *line1, *line2;

    logstr("merger", "started", id);

    if (merger_read_all(pipein1, &n1, sizeof n1) != 0 ||
        merger_read_all(pipein2, &n2, sizeof n2) != 0 ||
        merger_read_all(pipein1, &max1, sizeof max1) != 0 ||
        merger_read_all(pipein2, &max2, sizeof max2) != 0)
        merger_die("missing input header", id);
    if (n1 < 0 || n2 < 0)
        merger_die("bad input header", id);

    n = n1 + n2;
    max = max1 > max2 ? max1 : max2;

    // An empty input advertises maxlen == -1; size both buffers by the
    // larger maximum so neither allocation can be negative or too small.
    cap = max > 0 ? max : 1;
    line1 = malloc((size_t)cap);
    line2 = malloc((size_t)cap);
    if (line1 == NULL || line2 == NULL)
        merger_die("out of memory", id);

    // header of the merged stream
    if (merger_write_all(pipeout, &n, sizeof n) != 0 ||
        merger_write_all(pipeout, &max, sizeof max) != 0)
        merger_die("write failed", id);

    while (i < n1 || j < n2) {
        // refill whichever side has no pending line and still has records
        if (i < n1 && !have1) {
            if (merger_fetch(pipein1, line1, cap) != 0)
                merger_die("truncated input", id);
            have1 = 1;
        }
        if (j < n2 && !have2) {
            if (merger_fetch(pipein2, line2, cap) != 0)
                merger_die("truncated input", id);
            have2 = 1;
        }

        // take line1 when it is strictly smaller or input 2 is exhausted;
        // only the sign of strcmp() is meaningful
        if (!have2 || (have1 && strcmp(line1, line2) < 0)) {
            if (merger_emit(pipeout, line1) != 0)
                merger_die("write failed", id);
            have1 = 0;
            i++;
        } else {
            if (merger_emit(pipeout, line2) != 0)
                merger_die("write failed", id);
            have2 = 0;
            j++;
        }
    }

    len = -1;                   // end marker
    if (merger_write_all(pipeout, &len, sizeof len) != 0)
        merger_die("write failed", id);

    free(line1);
    free(line2);
    logstr("Merger","Ended",id);
    _exit(0);
}
// Node of the singly linked list of pipe descriptors that main() closes into
// a ring and then collapses pairwise while spawning mergers.
struct pipenode
{
// descriptor of the read end of a pipe whose output is still to be consumed
int fd;
// following node; main() links the last node back to the first
struct pipenode *next;
};
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
partitioner_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
partitioner_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/* Log a fatal partitioner error and terminate with a failure status. */
static void
partitioner_die(char *why)
{
    logstr("partitioner", why, 0);
    _exit(1);
}

/*
 * Partitioner process body: read lines from stdin and deal them round-robin
 * to n output descriptors. Never returns.
 *
 * The n descriptor numbers are first read (as ints) from fd. Every line is
 * sent without its trailing "\n" or "\r\n" as [int size][size bytes incl.
 * NUL]; after the last line each descriptor receives the end marker [int -1].
 *
 * n  - number of output descriptors
 * fd - descriptor from which the n output descriptor numbers are read
 */
void partitioner(int n, int fd)
{
    size_t cap = 1024 + 1;      // buffer capacity, terminator included
    char *s;
    int *out;
    int i, size;

    logstr("partitioner","started",0);

    if (n <= 0) {
        // nothing to distribute to; also avoids "% 0" below
        logstr("partitioner", "ended", 0);
        _exit(0);
    }

    s = malloc(cap);
    out = malloc((size_t)n * sizeof *out);
    if (s == NULL || out == NULL)
        partitioner_die("out of memory");

    // reading array of descriptors
    for (i = 0; i < n; i++) {
        if (partitioner_read_all(fd, &out[i], sizeof out[i]) != 0)
            partitioner_die("missing descriptor list");
    }

    i = 0;
    // fgets() returns a pointer: compare against NULL, not with "> 0"
    while (fgets(s, (int)cap, stdin) != NULL) {
        size_t len = strlen(s);

        // buffer is full and the line is not finished yet: grow and go on
        while (len == cap - 1 && s[len - 1] != '\n') {
            char *bigger = realloc(s, 2 * cap);

            if (bigger == NULL)
                partitioner_die("out of memory");
            s = bigger;
            cap *= 2;
            if (fgets(s + len, (int)(cap - len), stdin) == NULL)
                break;          // EOF in the middle of the last line
            len += strlen(s + len);
        }

        // drop the line ending; every index is checked against the length
        // so empty and one-character lines are handled safely
        if (len > 0 && s[len - 1] == '\n')
            s[--len] = '\0';
        if (len > 0 && s[len - 1] == '\r')
            s[--len] = '\0';

        // writing the string to the current descriptor
        size = (int)len + 1;
        if (partitioner_write_all(out[i], &size, sizeof size) != 0 ||
            partitioner_write_all(out[i], s, (size_t)size) != 0)
            partitioner_die("write failed");

        i = (i + 1) % n;
    }

    // end marker for every consumer
    size = -1;
    for (i = 0; i < n; i++) {
        if (partitioner_write_all(out[i], &size, sizeof size) != 0)
            partitioner_die("write failed");
    }

    free(s);
    free(out);
    logstr("partitioner", "ended", 0);
    _exit(0);
}
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
main_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
main_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/* Report a fatal manager error on stderr and in the log, then terminate. */
static void
main_die(char *why)
{
    fprintf(stderr, "psort: %s\n", why);
    logstr("manager", why, 0);
    exit(1);
}

/*
 * Manager: sorts the lines of stdin using N sorter processes.
 *
 * Usage: <program> N        (N >= 1 is the number of sorters)
 *
 * It spawns N sorters, one partitioner that deals stdin lines to them, and
 * N - 1 mergers that combine the sorted streams pairwise. The final merged
 * stream is printed to stdout, one string per line.
 *
 * Returns 0 on success; exits with status 1 on bad arguments or when a
 * pipe, fork, allocation or protocol error occurs.
 */
int
main(int argc, char** argv)
{
    struct pipenode *head, *curr, *bufnode;
    int fdbuf[2], fd1[2], fd2[2], buf[2];
    int *partitioner_input;
    int N, i, elem1, elem2;
    int out, len = 0, n = 0, maxlen = 0, cap;
    long requested;
    char *end = NULL;
    char *line;
    pid_t pid;

    logstr("manager","started", 0);

    if (argc < 2) {
        fprintf(stderr, "usage: %s <number of sorters>\n",
                argc > 0 ? argv[0] : "psort");
        return 1;
    }
    errno = 0;
    requested = strtol(argv[1], &end, 10);
    if (errno != 0 || end == argv[1] || *end != '\0' ||
        requested < 1 || requested > INT_MAX / 2)
        main_die("number of sorters must be a positive integer");
    N = (int)requested;

    head = malloc(sizeof *head);
    partitioner_input = malloc((size_t)N * sizeof *partitioner_input);
    if (head == NULL || partitioner_input == NULL)
        main_die("out of memory");

    // Creating sorters; the read end of each sorter's output pipe goes
    // into the list, the write end of its input pipe is for the partitioner
    curr = head;
    for (i = 0; i < N; i++) {
        if (pipe(fd1) < 0 || pipe(fd2) < 0)
            main_die("pipe failed");
        pid = fork();
        if (pid < 0)
            main_die("fork failed");
        if (pid == 0) {
            sorter(fd1[0], fd2[1], i);
            _exit(0);           // not reached: sorter() terminates itself
        }
        // the parent has no use for the sorter's own pipe ends
        close(fd1[0]);
        close(fd2[1]);
        curr->fd = fd2[0];
        partitioner_input[i] = fd1[1];
        if (i + 1 < N) {
            curr->next = malloc(sizeof *curr->next);
            if (curr->next == NULL)
                main_die("out of memory");
            curr = curr->next;
        }
    }
    curr->next = head;          // close the ring

    // Create partitioner and send it the descriptors it has to write to
    if (pipe(fdbuf) < 0)
        main_die("pipe failed");
    pid = fork();
    if (pid < 0)
        main_die("fork failed");
    if (pid == 0) {
        partitioner(N, fdbuf[0]);
        _exit(0);
    }
    close(fdbuf[0]);
    for (i = 0; i < N; i++) {
        if (main_write_all(fdbuf[1], &partitioner_input[i],
                           sizeof partitioner_input[i]) != 0)
            main_die("cannot talk to partitioner");
    }
    close(fdbuf[1]);
    // the partitioner holds its own copies of these descriptors
    for (i = 0; i < N; i++)
        close(partitioner_input[i]);
    free(partitioner_input);

    // Creating mergers using the cyclic list: every step replaces two
    // readable descriptors with the read end of one merger's output pipe
    curr = head;
    i = 0;
    while (curr->next != curr) {
        if (pipe(buf) < 0)
            main_die("pipe failed");
        elem1 = curr->fd;
        elem2 = curr->next->fd;
        curr->fd = buf[0];
        bufnode = curr->next;
        curr->next = bufnode->next;
        free(bufnode);
        pid = fork();
        if (pid < 0)
            main_die("fork failed");
        if (pid == 0) {
            merger(elem1, elem2, buf[1], i);
            _exit(0);
        }
        // only the merger reads elem1/elem2 and writes buf[1]
        close(elem1);
        close(elem2);
        close(buf[1]);
        curr = curr->next;
        i++;
    }

    // Getting output from the last descriptor
    out = curr->fd;
    if (main_read_all(out, &n, sizeof n) != 0 ||
        main_read_all(out, &maxlen, sizeof maxlen) != 0)
        main_die("missing result header");

    // maxlen is -1 when stdin was empty; never pass that to malloc()
    cap = maxlen > 0 ? maxlen : 1;
    line = malloc((size_t)cap);
    if (line == NULL)
        main_die("out of memory");

    for (i = 0; i < n; i++) {
        if (main_read_all(out, &len, sizeof len) != 0 ||
            len <= 0 || len > cap ||
            main_read_all(out, line, (size_t)len) != 0)
            main_die("truncated result");
        line[len - 1] = '\0';
        printf("%s\n", line);
    }

    // N sorters + 1 partitioner + (N - 1) mergers
    for (i = 0; i < 2 * N; i++)
        wait(NULL);

    logstr("manager", "ended", 0);
    close(out);
    free(line);
    free(curr);
    return 0;
}
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <limits.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include <wait.h>
/*
 * Append one record of the form "<timestamp> <who> <id> <what>" to the
 * file psort.log in the current directory.
 *
 * who  - role of the calling process (e.g. "sorter")
 * what - event description (e.g. "started")
 * id   - number of the process within its role
 *
 * Logging is best effort: when the log file cannot be opened the record is
 * silently dropped so that the sorting pipeline itself is not disturbed.
 */
void
logstr(char *who, char *what, int id)
{
    char stamp[64] = "";
    time_t now = time(NULL);
    struct tm *local = localtime(&now);
    const char *text = (local != NULL) ? asctime(local) : NULL;
    FILE *log;

    if (text != NULL) {
        // asctime() returns a static buffer ending in '\n'; keep a private
        // copy without the newline (no heap allocation, so nothing to leak)
        snprintf(stamp, sizeof stamp, "%s", text);
        stamp[strcspn(stamp, "\n")] = '\0';
    }

    // "a" appends; the previous mode string "wa" truncated the file on every
    // call, so only the most recent record ever survived
    log = fopen("psort.log", "a");
    if (log == NULL)
        return;
    fprintf(log, "%s %s %d %s\n", stamp, who, id, what);
    fclose(log);
}
/*
 * Sort an array of NUL-terminated strings in place, in ascending strcmp()
 * order, using selection sort.
 *
 * s - array of n string pointers; only the pointers are rearranged
 * n - number of elements (0 and 1 are accepted and leave the array as is)
 */
void
sort(char **s,int n)
{
    int i, j, iMin;
    char *buf;

    for (i = 0; i < n - 1; i++) {
        iMin = i;
        // scan the unsorted tail for the smallest remaining string
        for (j = i + 1; j < n; j++) {
            // strcmp() only guarantees the SIGN of its result, so test
            // "< 0" rather than comparing with the literal -1
            if (strcmp(s[j], s[iMin]) < 0)
                iMin = j;
        }
        // move the minimum into position i
        if (iMin != i) {
            buf = s[i];
            s[i] = s[iMin];
            s[iMin] = buf;
        }
    }
}
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
sorter_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
sorter_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/* Log a fatal sorter error and terminate the process with a failure status. */
static void
sorter_die(char *why, int id)
{
    logstr("sorter", why, id);
    exit(1);
}

/*
 * Sorter process body: read a known number of strings from pipein, sort
 * them, and send the sorted sequence to pipeout. Never returns.
 *
 * The number of strings to expect is the int stored in fd_len at byte
 * offset sizeof(int) * (id - 1).
 * NOTE(review): that offset implies ids start at 1 - confirm with the caller.
 *
 * Input  (pipein):  records of [int len][len bytes]; len == -1 ends early.
 * Output (pipeout): [int count][int maxlen], then count records of
 *                   [int len][len bytes incl. NUL].
 *
 * pipein  - descriptor the strings are read from
 * pipeout - descriptor the sorted strings are written to
 * id      - sorter number; selects the slot in fd_len and tags log records
 * fd_len  - descriptor holding the per-sorter string counts
 */
void
sorter(int pipein, int pipeout, int id, int fd_len)
{
    int expected = 0;       // count announced through fd_len
    int count = 0;          // strings actually received
    int maxlen = -1;        // largest record length received
    int len = 0;
    int i;
    char **lines;

    logstr("sorter", "started", id);

    // as before, a failing seek is not fatal: the read then simply takes
    // the next int at the current position
    (void)lseek(fd_len, (off_t)sizeof(int) * (id - 1), SEEK_SET);
    if (sorter_read_all(fd_len, &expected, sizeof expected) != 0 || expected < 0)
        expected = 0;

    lines = malloc((expected > 0 ? (size_t)expected : 1) * sizeof *lines);
    if (lines == NULL)
        sorter_die("out of memory", id);

    while (count < expected) {
        char *line;

        // stop on the EOF sign or when the writer has gone away
        if (sorter_read_all(pipein, &len, sizeof len) != 0 || len < 0)
            break;

        // the buffer is kept as the stored string: no extra copy is needed
        line = malloc((size_t)len + 1);
        if (line == NULL)
            sorter_die("out of memory", id);
        if (sorter_read_all(pipein, line, (size_t)len) != 0)
            sorter_die("truncated input", id);
        line[len] = '\0';

        if (len > maxlen)
            maxlen = len;
        lines[count++] = line;
    }

    // only the strings that really arrived are sorted and forwarded
    if (count > 1)
        sort(lines, count);

    // The header is written in every case (also for zero or one string),
    // and it carries the real count: readers always consume it first.
    if (sorter_write_all(pipeout, &count, sizeof count) != 0 ||
        sorter_write_all(pipeout, &maxlen, sizeof maxlen) != 0)
        sorter_die("write failed", id);

    for (i = 0; i < count; i++) {
        len = (int)strlen(lines[i]) + 1;
        if (sorter_write_all(pipeout, &len, sizeof len) != 0 ||
            sorter_write_all(pipeout, lines[i], (size_t)len) != 0)
            sorter_die("write failed", id);
    }

    for (i = 0; i < count; i++)
        free(lines[i]);
    free(lines);

    logstr("sorter", "exited", id);
    exit(0);
}
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
merger_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
merger_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/* Log a fatal merger error and terminate the process with a failure status. */
static void
merger_die(char *why, int id)
{
    logstr("merger", why, id);
    _exit(1);
}

/*
 * Read one [int len][len bytes] record into buf (capacity cap bytes).
 * The result is always NUL-terminated. Returns 0 on success, -1 when the
 * record is missing, malformed or does not fit.
 */
static int
merger_fetch(int fd, char *buf, int cap)
{
    int len = 0;

    if (merger_read_all(fd, &len, sizeof len) != 0)
        return -1;
    if (len <= 0 || len > cap)
        return -1;
    if (merger_read_all(fd, buf, (size_t)len) != 0)
        return -1;
    buf[len - 1] = '\0';
    return 0;
}

/* Write line as one [int len][bytes incl. NUL] record. 0 on success. */
static int
merger_emit(int fd, const char *line)
{
    int len = (int)strlen(line) + 1;

    if (merger_write_all(fd, &len, sizeof len) != 0)
        return -1;
    return merger_write_all(fd, line, (size_t)len);
}

/*
 * Merger process body: merge two sorted string streams into one sorted
 * stream. Never returns.
 *
 * Each input starts with [int count][int maxlen] followed by count records
 * of [int len][len bytes incl. NUL]. The output uses the same layout, with
 * count = n1 + n2 and maxlen = max(max1, max2), and is closed by [int -1].
 *
 * pipein1, pipein2 - descriptors of the two sorted inputs
 * pipeout          - descriptor the merged stream is written to
 * id               - merger number, used only for logging
 */
void
merger(int pipein1, int pipein2, int pipeout, int id)
{
    int n1 = 0, n2 = 0, max1 = 0, max2 = 0;
    int n, max, cap;
    int i = 0, j = 0;           // records already forwarded from each input
    int have1 = 0, have2 = 0;   // a fetched-but-unwritten line is pending
    int len;
    char *line1, *line2;

    logstr("merger", "started", id);

    if (merger_read_all(pipein1, &n1, sizeof n1) != 0 ||
        merger_read_all(pipein2, &n2, sizeof n2) != 0 ||
        merger_read_all(pipein1, &max1, sizeof max1) != 0 ||
        merger_read_all(pipein2, &max2, sizeof max2) != 0)
        merger_die("missing input header", id);
    if (n1 < 0 || n2 < 0)
        merger_die("bad input header", id);

    n = n1 + n2;
    max = max1 > max2 ? max1 : max2;

    // An empty input advertises maxlen == -1; size both buffers by the
    // larger maximum so neither allocation can be negative or too small.
    cap = max > 0 ? max : 1;
    line1 = malloc((size_t)cap);
    line2 = malloc((size_t)cap);
    if (line1 == NULL || line2 == NULL)
        merger_die("out of memory", id);

    // header of the merged stream
    if (merger_write_all(pipeout, &n, sizeof n) != 0 ||
        merger_write_all(pipeout, &max, sizeof max) != 0)
        merger_die("write failed", id);

    while (i < n1 || j < n2) {
        // refill whichever side has no pending line and still has records
        if (i < n1 && !have1) {
            if (merger_fetch(pipein1, line1, cap) != 0)
                merger_die("truncated input", id);
            have1 = 1;
        }
        if (j < n2 && !have2) {
            if (merger_fetch(pipein2, line2, cap) != 0)
                merger_die("truncated input", id);
            have2 = 1;
        }

        // take line1 when it is strictly smaller or input 2 is exhausted;
        // only the sign of strcmp() is meaningful
        if (!have2 || (have1 && strcmp(line1, line2) < 0)) {
            if (merger_emit(pipeout, line1) != 0)
                merger_die("write failed", id);
            have1 = 0;
            i++;
        } else {
            if (merger_emit(pipeout, line2) != 0)
                merger_die("write failed", id);
            have2 = 0;
            j++;
        }
    }

    len = -1;                   // end marker
    if (merger_write_all(pipeout, &len, sizeof len) != 0)
        merger_die("write failed", id);

    free(line1);
    free(line2);
    logstr("Merger","Ended",id);
    _exit(0);
}
/* Read exactly `size` bytes from fd. Returns 0 on success, -1 on EOF/error. */
static int
partitioner_read_all(int fd, void *dst, size_t size)
{
    char *p = dst;

    while (size > 0) {
        ssize_t got = read(fd, p, size);

        if (got < 0 && errno == EINTR)
            continue;
        if (got <= 0)
            return -1;
        p += got;
        size -= (size_t)got;
    }
    return 0;
}

/* Write exactly `size` bytes to fd. Returns 0 on success, -1 on error. */
static int
partitioner_write_all(int fd, const void *src, size_t size)
{
    const char *p = src;

    while (size > 0) {
        ssize_t put = write(fd, p, size);

        if (put < 0 && errno == EINTR)
            continue;
        if (put <= 0)
            return -1;
        p += put;
        size -= (size_t)put;
    }
    return 0;
}

/*
 * Read lines from stdin and deal them round-robin to n output descriptors,
 * then record how many lines each descriptor received.
 *
 * The n descriptor numbers are first read (as ints) from fd. Every line is
 * sent without its trailing "\n" or "\r\n" as [int size][size bytes incl.
 * NUL]. After stdin is exhausted, n ints - the number of lines written to
 * each descriptor, in descriptor order - are written to fd_len.
 *
 * On an allocation or I/O failure the problem is logged and the function
 * returns early; the counts are still written for what was delivered
 * unless the failure happened before any line was processed.
 *
 * n      - number of output descriptors
 * fd     - descriptor from which the n output descriptor numbers are read
 * fd_len - descriptor receiving the n per-descriptor line counts
 */
void partitioner(int n, int fd, int fd_len)
{
    size_t cap = 1024 + 1;      // buffer capacity, terminator included
    char *s = NULL;
    int *amount = NULL;         // amount of strings sent to each descriptor
    int *out = NULL;
    int i, size;
    int failed = 0;

    logstr("partitioner","started",0);

    if (n <= 0) {
        // nothing to distribute to; also avoids "% 0" below
        logstr("partitioner", "ended", 0);
        return;
    }

    // Heap arrays instead of "int amount[n] = { 0 }": a variable length
    // array cannot have an initializer in C. calloc() zero-fills both.
    s = malloc(cap);
    amount = calloc((size_t)n, sizeof *amount);
    out = calloc((size_t)n, sizeof *out);
    if (s == NULL || amount == NULL || out == NULL) {
        logstr("partitioner", "out of memory", 0);
        free(s);
        free(amount);
        free(out);
        return;
    }

    // reading array of descriptors
    for (i = 0; i < n && !failed; i++) {
        if (partitioner_read_all(fd, &out[i], sizeof out[i]) != 0)
            failed = 1;
    }
    if (failed) {
        logstr("partitioner", "missing descriptor list", 0);
        free(s);
        free(amount);
        free(out);
        return;
    }

    i = 0;
    // fgets() returns a pointer: compare against NULL, not with "> 0"
    while (!failed && fgets(s, (int)cap, stdin) != NULL) {
        size_t len = strlen(s);

        // buffer is full and the line is not finished yet: grow and go on
        while (len == cap - 1 && s[len - 1] != '\n') {
            char *bigger = realloc(s, 2 * cap);

            if (bigger == NULL) {
                failed = 1;
                break;
            }
            s = bigger;
            cap *= 2;
            if (fgets(s + len, (int)(cap - len), stdin) == NULL)
                break;          // EOF in the middle of the last line
            len += strlen(s + len);
        }
        if (failed) {
            logstr("partitioner", "out of memory", 0);
            break;
        }

        // drop the line ending; every index is checked against the length
        // so empty and one-character lines are handled safely
        if (len > 0 && s[len - 1] == '\n')
            s[--len] = '\0';
        if (len > 0 && s[len - 1] == '\r')
            s[--len] = '\0';

        // writing the string to the current descriptor
        size = (int)len + 1;
        if (partitioner_write_all(out[i], &size, sizeof size) != 0 ||
            partitioner_write_all(out[i], s, (size_t)size) != 0) {
            logstr("partitioner", "write failed", 0);
            failed = 1;
            break;
        }
        amount[i]++;

        i = (i + 1) % n;
    }

    // writing amount of strings in each descriptor
    for (i = 0; i < n; i++) {
        if (partitioner_write_all(fd_len, &amount[i], sizeof amount[i]) != 0) {
            logstr("partitioner", "cannot store counts", 0);
            break;
        }
    }

    free(s);
    free(amount);
    free(out);
    logstr("partitioner", "ended", 0);
    return;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment