Created
July 7, 2014 06:07
-
-
Save FlyingJester/f05a231a58fb6c5744a7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <fcntl.h>
#include <pthread.h>
#include <sys/ioctl.h>
#include <unistd.h>

#include <cerrno>
#include <climits>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <stdexcept>
#include <type_traits>
#ifdef USE_PIPES | |
/**
 * FIFO queue that transports elements as raw bytes through an anonymous
 * POSIX pipe.
 *
 * Each element is written with a single write() of sizeof(T) bytes and read
 * back with a single read() of the same size. POSIX guarantees that pipe
 * writes of at most PIPE_BUF bytes are atomic, which is what keeps elements
 * from interleaving when several threads push at once.
 *
 * T must be trivially copyable because only its object representation
 * travels through the pipe.
 *
 * push() blocks while the kernel pipe buffer is full; try_pop() never blocks.
 */
template<typename T>
class concurrent_queue {
    static_assert(std::is_trivially_copyable<T>::value,
                  "pipe-backed concurrent_queue copies raw bytes; T must be trivially copyable");
#ifdef PIPE_BUF
    static_assert(sizeof(T) <= PIPE_BUF,
                  "elements larger than PIPE_BUF cannot be written atomically");
#endif

    int FD[2]; // FD[0] is the read end, FD[1] is the write end.

public:
    /**
     * Creates the pipe and switches its read end to non-blocking mode so
     * that try_pop() can report "empty" instead of waiting for data.
     *
     * @throws std::runtime_error if the pipe cannot be created or configured.
     */
    concurrent_queue(){
        if(pipe(FD) != 0)
            throw std::runtime_error("concurrent_queue: pipe() failed");

        const int lFlags = fcntl(FD[0], F_GETFL, 0);
        if(lFlags == -1 || fcntl(FD[0], F_SETFL, lFlags | O_NONBLOCK) == -1){
            close(FD[0]);
            close(FD[1]);
            throw std::runtime_error("concurrent_queue: cannot make read end non-blocking");
        }
    }

    /** Closes both ends of the pipe; unread elements are discarded. */
    ~concurrent_queue(){
        close(FD[0]);
        close(FD[1]);
    }

    // A copy would share the descriptors and close them twice.
    concurrent_queue(const concurrent_queue &) = delete;
    concurrent_queue &operator=(const concurrent_queue &) = delete;

    /**
     * Appends a copy of aP to the queue.
     *
     * Blocks while the pipe buffer is full. A write interrupted by a signal
     * is retried; any other write error leaves the element unqueued, since
     * the void return type offers no way to report it.
     */
    void push(const T &aP){
        ssize_t lWritten;
        do{
            lWritten = write(FD[1], &aP, sizeof(T));
        }while(lWritten < 0 && errno == EINTR);
    }

    /**
     * Removes the oldest element, if any, and stores it in aTo.
     *
     * @return true when a complete element was read into aTo; false when the
     *         queue is empty or the read failed.
     */
    bool try_pop(T &aTo){
        ssize_t lGot;
        do{
            lGot = read(FD[0], &aTo, sizeof(T));
        }while(lGot < 0 && errno == EINTR);
        return lGot == static_cast<ssize_t>(sizeof(T));
    }

    /** Alias of try_pop(). */
    bool pop_if_present(T &aTo){
        return try_pop(aTo);
    }

    /**
     * Number of complete elements currently buffered in the pipe.
     *
     * The value is a snapshot and may be stale by the time it is returned if
     * other threads are pushing or popping. Returns 0 if the byte count
     * cannot be queried.
     */
    size_t unsafe_size(){
        // lseek() is invalid on pipes (ESPIPE); FIONREAD reports the number
        // of unread bytes instead.
        int lPendingBytes = 0;
        if(ioctl(FD[0], FIONREAD, &lPendingBytes) != 0 || lPendingBytes < 0)
            return 0;
        return static_cast<size_t>(lPendingBytes) / sizeof(T);
    }
};
#else | |
#include <queue> | |
#include <mutex> | |
/**
 * FIFO queue guarded by a mutex so that several threads may push and pop
 * concurrently.
 *
 * Every operation holds the mutex through a std::lock_guard, so the lock is
 * released even when copying a T throws.
 */
template<typename T>
class concurrent_queue {
    std::queue<T> aQueue;
    mutable std::mutex aMutex; // mutable so unsafe_size() can lock while const

public:
    concurrent_queue() = default;

    // The mutex cannot be copied, and two queues must not share one.
    concurrent_queue(const concurrent_queue &) = delete;
    concurrent_queue &operator=(const concurrent_queue &) = delete;

    /** Appends a copy of aP to the back of the queue. */
    void push(const T &aP){
        std::lock_guard<std::mutex> lGuard(aMutex);
        aQueue.push(aP);
    }

    /**
     * Removes the front element, if any, and copies it into aTo.
     *
     * @return true when an element was popped; false when the queue was
     *         empty, in which case aTo is left untouched.
     */
    bool try_pop(T &aTo){
        std::lock_guard<std::mutex> lGuard(aMutex);
        if(aQueue.empty())
            return false;
        aTo = aQueue.front();
        aQueue.pop();
        return true;
    }

    /** Alias of try_pop(). */
    bool pop_if_present(T &aTo){
        return try_pop(aTo);
    }

    /**
     * Number of queued elements at the moment the lock was held.
     *
     * The value may be stale by the time the caller uses it if other threads
     * are pushing or popping.
     */
    size_t unsafe_size() const {
        std::lock_guard<std::mutex> lGuard(aMutex);
        return aQueue.size();
    }
};
#endif | |
void *thread_func(void *aIn){ | |
printf("Starting iteration.\n"); | |
concurrent_queue<intptr_t> *aQueue = (concurrent_queue<intptr_t> *)aIn; | |
intptr_t lPtr = 0x10; | |
unsigned long long count = 0; | |
do{ | |
while(true){ | |
if(aQueue->try_pop(lPtr)) | |
break; | |
} | |
count++; | |
if((count%100000)==0) | |
printf("On iteration %llu.\n", count); | |
}while(lPtr!=0x0); | |
printf("Ending iteration.\n"); | |
pthread_exit(0); | |
return NULL; | |
} | |
/**
 * Starts one consumer thread, pushes the values 1..10000000 followed by a
 * terminating 0 into a shared queue, then waits for the consumer to finish.
 *
 * @return EXIT_SUCCESS on success; EXIT_FAILURE if the consumer thread could
 *         not be created or joined.
 */
int main(int argc, char *argv[]){
    (void)argc;
    (void)argv;

    pthread_t receiver_thread;
    concurrent_queue<intptr_t> queue;

    const int lCreateErr = pthread_create(&receiver_thread, NULL, thread_func, &queue);
    if(lCreateErr != 0){
        // Without a consumer there is nothing to join, so stop here.
        fprintf(stderr, "pthread_create failed with error %d.\n", lCreateErr);
        return EXIT_FAILURE;
    }

    for(int i = 0; i<10000000; i++){
        queue.push(intptr_t(i+1));
    }
    // A zero value tells the consumer to stop.
    queue.push(intptr_t(0));

    const int lJoinErr = pthread_join(receiver_thread, NULL);
    if(lJoinErr != 0){
        fprintf(stderr, "pthread_join failed with error %d.\n", lJoinErr);
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment