Skip to content

Instantly share code, notes, and snippets.

@mangalaman93
Created August 25, 2017 23:34
Show Gist options
  • Save mangalaman93/b66ca3ff09c515023ff072870ad13ac4 to your computer and use it in GitHub Desktop.
Save mangalaman93/b66ca3ff09c515023ff072870ad13ac4 to your computer and use it in GitHub Desktop.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
// Number of exchanges (client connections) the server waits for; also the
// number of per-exchange queues and the capacity of the merge heap.
// A typed constant instead of a macro so it obeys scoping and type checking.
constexpr int N = 4;
/*
 * ntohll(x): converts a 64-bit unsigned value from network byte order to host
 * byte order. On hosts where ntohl(1) == 1 the value is returned unchanged;
 * otherwise the two 32-bit halves are byte-swapped with ntohl() and exchanged.
 *
 * The whole expansion is parenthesized so the macro can be used safely inside
 * larger expressions, and the definition is skipped when the platform headers
 * already provide ntohll. Note that x is evaluated more than once, so it must
 * not have side effects.
 */
#ifndef ntohll
#define ntohll(x) \
    ((1 == ntohl(1)) \
        ? (uint64_t)(x) \
        : ((((uint64_t)ntohl((uint32_t)((uint64_t)(x) & 0xFFFFFFFFu))) << 32) | \
           (uint64_t)ntohl((uint32_t)((uint64_t)(x) >> 32))))
#endif
/* Specialized queue implementation with the following properties:
* - One thread will only call pop()
* - pop() always returns a number (it blocks until one is available)
* - The other thread will only call push()
*/
/* Singly linked list node carrying one int payload. */
struct Node {
    int elem;    // value stored in this node
    Node* next;  // next node in the chain
};
/*
 * Blocking FIFO queue of ints.
 *
 * push() and pop() both take the internal mutex, so one thread may push while
 * another pops. pop() never returns "empty": it waits on the condition
 * variable until an element is available.
 *
 * Storage is a std::queue, so any elements still queued when the Queue is
 * destroyed are released automatically (the previous hand-rolled linked list
 * leaked them because the destructor was empty).
 */
class Queue {
    std::queue<int> items;         // pending elements, oldest at the front
    std::mutex mut;                // guards items
    std::condition_variable cond;  // signalled whenever an element is pushed
public:
    int pop();
    void push(int elem);
};

/*
 * Removes and returns the oldest element, blocking while the queue is empty.
 */
int Queue::pop() {
    std::unique_lock<std::mutex> lk(this->mut);
    // The predicate form re-checks after every wake-up, covering spurious ones.
    this->cond.wait(lk, [this] { return !this->items.empty(); });
    const int elem = this->items.front();
    this->items.pop();
    return elem;
}

/*
 * Appends elem to the back of the queue and wakes one waiting pop().
 */
void Queue::push(int elem) {
    std::lock_guard<std::mutex> lk(this->mut);
    this->items.push(elem);
    this->cond.notify_one();
}
/* Min Heap implementation */
/*
 * One record received from an exchange: the exchange it came from and the
 * 64-bit number it carries. MinHeap orders these by num.
 */
struct HeapNode {
    uint32_t exnum;  // index of the originating exchange
    uint64_t num;    // value used as the heap key

    // Parameters use the member types so a 64-bit num is not truncated to int
    // on construction.
    HeapNode(uint32_t e, uint64_t n) : exnum(e), num(n) {}
};
class MinHeap {
HeapNode** data;
int length = 0;
public:
MinHeap(int maxelem);
~MinHeap();
int size();
HeapNode* pop();
void push(HeapNode* node);
};
MinHeap::MinHeap(int maxelem) {
data = new HeapNode*[maxelem];
}
MinHeap::~MinHeap() {
delete[] this->data;
}
int MinHeap::size() {
return this->length;
}
HeapNode* MinHeap::pop() {
assert(this->length > 0);
this->length--;
HeapNode* root = this->data[0];
this->data[0] = this->data[this->length];
this->data[this->length] = nullptr;
int i = 0;
while (true) {
int least = i;
if (2*i+1 < this->length && this->data[least]->num > this->data[2*i+1]->num) {
least = 2*i + 1;
}
if (2*i+2 < this->length && this->data[least]->num > this->data[2*i+2]->num) {
least = 2*i + 2;
}
if (least == i) {
break;
}
HeapNode* temp = this->data[i];
this->data[i] = this->data[least];
this->data[least] = temp;
i = least;
}
return root;
}
void MinHeap::push(HeapNode* node) {
int i = this->length;
int pi = (i-1)/2; // Parent(i) = i/2
this->length++;
while (i > 0 && this->data[pi]->num > node->num) {
this->data[i] = this->data[pi];
i = pi;
pi = (i-1)/2;
}
this->data[i] = node;
}
/* Main function */
// One queue per exchange, indexed by exchange number: read_num() pushes the
// numbers it receives into q[exnum], and main() pops from them while merging.
Queue q[N];
/*
 * Reads exactly one 12-byte record from fd and decodes it.
 *
 * Record layout: a 4-byte exchange number followed by an 8-byte number, both
 * taken as-is in host byte order (no ntohl/ntohll conversion is applied).
 *
 * read() may return fewer bytes than requested, so it is called until the
 * whole record has arrived. A read interrupted by a signal is retried. Any
 * other read error, or the peer closing the connection before a full record
 * was received, prints a diagnostic and terminates the process; without these
 * checks an error would move the write offset backwards and EOF would spin
 * forever.
 *
 * Returns the decoded record.
 */
HeapNode readOne(int fd) {
    unsigned char buffer[12];
    size_t read_bytes = 0;
    while (read_bytes < sizeof(buffer)) {
        ssize_t n = read(fd, buffer + read_bytes, sizeof(buffer) - read_bytes);
        if (n < 0) {
            if (errno == EINTR) {
                continue;  // interrupted before any data was read; try again
            }
            std::perror("read failed");
            std::exit(EXIT_FAILURE);
        }
        if (n == 0) {
            std::fprintf(stderr, "read failed: connection closed before a full record was received\n");
            std::exit(EXIT_FAILURE);
        }
        read_bytes += static_cast<size_t>(n);
    }
    // memcpy instead of pointer casts: buffer+4 is not suitably aligned for a
    // uint64_t, and reading it through a cast pointer is undefined behavior.
    uint32_t exnum;
    uint64_t num;
    std::memcpy(&exnum, buffer, sizeof(exnum));
    std::memcpy(&num, buffer + sizeof(exnum), sizeof(num));
    return HeapNode(exnum, num);
}
void read_num(int fd) {
while (true) {
HeapNode node = readOne(fd);
q[node.exnum].push(node.num);
if (node.num == 0) {
close(fd);
break;
}
}
}
int main(int argc, char* argv[]) {
if (argc < 2) {
cout<<"Usage: "<<argv[0]<<" <Port>"<<endl;
exit(1);
}
// TODO: error handling
int port;
port = atoi(argv[1]);
int server_fd;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
// Creating socket file descriptor
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(port);
if(bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
if (listen(server_fd, N) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
// first accept connections from all the N exchanges
thread* t = new thread[N];
for (int i = 0; i < N; ++i) {
int new_socket;
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
perror("accept failed");
exit(EXIT_FAILURE);
}
cout<<"accepted connection "<<i<<endl;
t[i] = thread(read_num, new_socket);
}
cout<<"accepted all connections"<<endl;
// sorting algorithm
// first, add first element to the minheap
MinHeap mh(N);
for (int i = 0; i < N; ++i) {
int num = q[i].pop();
// there are no elements coming from this exchange
if (num == 0) {
continue;
}
HeapNode* node = new HeapNode(i, num);
mh.push(node);
}
cout<<"inserted first element of all queues into MinHeap"<<endl;
while (mh.size() > 0) {
HeapNode* node = mh.pop();
cout<<node->num<<endl;
uint32_t exnum = node->exnum;
delete node;
uint64_t num = q[exnum].pop();
if (num == 0) {
continue;
}
mh.push(new HeapNode(exnum, num));
}
for (int i = 0; i < N; ++i) {
t[i].join();
}
close(server_fd);
delete[] t;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment