Instantly share code, notes, and snippets.

Embed
What would you like to do?
A program which simulates the read and write behavior of LinkedIn's GraphDB
/* Instructions on compilation and execution
* =========================================
*
* Compile this program with pthreads:
*
* g++ -Wall -lpthread -o graphdb-simulator graphdb-simulator.cpp
*
* Before you run this program, you need to create the following
* directories:
*
* ./mmap_set_0 <-- fill it with data files named [0..N).dat
* ./mmap_set_1 <-- fill it with data files named [0..N).dat
* ./mmap_live_data
*
* The value of N is determined by the NUM_FILES compile time constant. The
* size of the file is determined by the FILE_SIZE compile constant.
*
* - Apurva Mehta <amehta@linkedin.com>
*/
#include <iostream>
#include <string>
#include <sys/mman.h>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cstring>
#include <sstream>
#include <fstream>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
using namespace std;
static const unsigned int FILE_SIZE = 1048576 * 10; // 10MB FILE_SIZE
static const string SET_0 = "./mmap_set_0";
static const string LIVE_DATA = "./mmap_live_data";
static const string SET_1 = "./mmap_set_1";
static const unsigned int NUM_FILES = 2500;
static const unsigned int NUM_READ_THREADS = 25;
static const unsigned int NUM_WRITE_THREADS = 10;
static pthread_rwlock_t addr_lock = PTHREAD_RWLOCK_INITIALIZER;
vector<unsigned long* > open_and_mmap_files_from_dir(string dname)
{
vector<unsigned long* > addresses(NUM_FILES);
for (unsigned int i = 0; i < NUM_FILES; ++i){
ostringstream os;
os << dname << "/" << i << ".dat";
const char *filename = os.str().c_str();
int fd = open(filename, O_RDONLY);
if (fd == -1){
cerr << "Unable to open " << filename << ". Error code: " << errno << endl;
exit(1);
}
unsigned long *data = (unsigned long *)mmap(0, FILE_SIZE, PROT_READ, MAP_PRIVATE, fd, 0);
if ((void *)data == MAP_FAILED) {
cerr << "Could not mmap " << filename << ". Error code: " << errno << endl;
exit(1);
}
close(fd);
addresses[i] = data;
}
return addresses;
}
unsigned long long timeval_to_us(struct timeval tv)
{
unsigned long long ret = tv.tv_sec * 1000000;
return ret + tv.tv_usec;
}
unsigned long get_user_time_ms(struct rusage start_time, struct rusage end_time)
{
return (timeval_to_us(end_time.ru_utime) - timeval_to_us(start_time.ru_utime)) / 1000;
}
unsigned long get_system_time_ms(struct rusage start_time, struct rusage end_time)
{
return (timeval_to_us(end_time.ru_stime) - timeval_to_us(start_time.ru_stime)) / 1000;
}
string get_time_string()
{
time_t raw_time;
char buffer[80];
struct tm *timeinfo;
time(&raw_time);
timeinfo = localtime(&raw_time);
strftime(buffer, 80, "%D %T %Z", timeinfo);
return string(buffer);
}
void open_mmap_read_close(const char *filename)
{
int fd = open(filename, O_RDONLY);
if (fd == -1){
cerr << "Unable to open " << filename << ". Error code: " << errno << endl;
exit(1);
}
unsigned long *data = (unsigned long *)mmap(0, FILE_SIZE, PROT_READ, MAP_PRIVATE, fd, 0);
if (data == MAP_FAILED) {
cerr << "Could not mmap " << filename << ". Error code: " << errno << endl;
exit(1);
}
unsigned int num_ints = FILE_SIZE / sizeof(unsigned long);
int read_bytes = 0;
unsigned long tmp;
for (unsigned int i = 0; i < num_ints; ++i ){
tmp = data[i];
read_bytes += sizeof(unsigned long);
}
if (munmap(data, FILE_SIZE) == -1) cerr << "Cound not unmap file" << endl;
cout << get_time_string() << " | Opened, mmapped, and closed file: " << string(filename) << ". Read: " << read_bytes << " bytes." << endl;
close(fd);
}
void *read_randomly_from_addresses(void *addrs)
{
struct rusage r_start_time;
struct rusage r_end_time;
struct timeval start_time;
struct timeval end_time;
unsigned long long start;
unsigned long long end;
unsigned long tmp;
vector<unsigned long*> *mapped_addrs = (vector<unsigned long*> *) addrs;
while(true){
int lock_rc = 0;
while ((lock_rc = pthread_rwlock_rdlock(&addr_lock)) == EBUSY);
if (lock_rc != 0){
cerr << get_time_string() << " | Failed to acquire read lock for thread " << (unsigned int) pthread_self() << ". Error : " << lock_rc << ". Exiting." << endl;
return addrs;
}
unsigned int file_no = rand() % mapped_addrs->size();
unsigned int file_sz = FILE_SIZE/sizeof(unsigned long);
unsigned int offset = 0 ;
unsigned int rd_size = rand() % (file_sz - offset);
getrusage(RUSAGE_THREAD, &r_start_time);
gettimeofday(&start_time, NULL);
for (unsigned int i = 0; i < rd_size; i++)
tmp = (*mapped_addrs)[file_no][offset+i];
lock_rc = pthread_rwlock_unlock(&addr_lock);
if (lock_rc != 0)
cerr << get_time_string() << " | WARN : Could not release read lock in thread " << (unsigned int) pthread_self() << ". Error " << lock_rc << endl;
getrusage(RUSAGE_THREAD, &r_end_time);
gettimeofday(&end_time, NULL);
start = timeval_to_us(start_time);
end = timeval_to_us(end_time);
if ((end - start) > 100000){
// print diagnostics if memory access took more that 500ms.
cout << get_time_string() << " | usr: " << get_user_time_ms(r_start_time, r_end_time) << " ms | sys: " << get_system_time_ms(r_start_time, r_end_time) << " ms | elapsed time : " << (end - start) / 1000 << " ms." << endl;
}
usleep(20000);
}
return 0;
}
void *create_data_and_update_pool(void *pool)
{
vector<unsigned long *> *addresses = (vector<unsigned long *> *) pool;
while (true) {
// Create a new .dat file. Name will be Epoch time in microseconds.
ostringstream fname;
timeval tv;
gettimeofday(&tv, NULL);
unsigned long long time = timeval_to_us(tv);
fname << LIVE_DATA << "/" << (unsigned int) pthread_self() << "-" << time << ".dat";
ofstream fout;
fout.open(fname.str().c_str(), ios::trunc | ios::binary);
// Populate the file in random sized increments, with random waits (upto 10 ms) in between updates. This simulates our write traffic
unsigned int num_written = 0;
const unsigned int num_ints = FILE_SIZE/sizeof(unsigned long);
while (num_written < (num_ints-1)){
unsigned int wr_size = rand() % (num_ints - num_written);
for (unsigned int i = 0; i < wr_size; i++){
unsigned long data = (unsigned long)rand();
fout.write(reinterpret_cast<const char *>(&data), sizeof(data));
++num_written;
}
usleep(rand() % 100000);
}
fout.close();
cout << get_time_string() << " | Finished writing " << fname.str() << endl;
// Reopen the file in readonly mode.
int fd = open(fname.str().c_str(), O_RDONLY);
if (fd == -1){
cerr << get_time_string() << " | Could not open " << fname.str() << " for reading. Error code: " << errno << endl;
exit(1);
}
unsigned long *base = (unsigned long *) mmap(NULL, FILE_SIZE, PROT_READ, MAP_PRIVATE, fd, 0);
if ((void *)base == MAP_FAILED) {
cerr << get_time_string() << " | Could not mmap " << fname.str() << " for reading. Error code : " << errno << endl;
exit(1);
}
close(fd);
// Lock the table and update it.
int lock_rc = 0;
while ((lock_rc = pthread_rwlock_wrlock(&addr_lock)) == EBUSY);
if (lock_rc != 0) {
cerr << get_time_string() << " | Could not acquire write lock while trying to insert file: " << fname.str() << ". Return value : " << lock_rc << endl;
return pool;
}
unsigned int file_no = rand() % addresses->size();
unsigned long *old_addr = (*addresses)[file_no];
// Add the new file
(*addresses)[file_no] = base ;
// Unmap the old file.
if (munmap(old_addr, FILE_SIZE) != 0)
cerr << get_time_string() << " | Warn: could not mmap file at offset " << file_no << ". Error code : " << errno << endl;
lock_rc = pthread_rwlock_unlock(&addr_lock);
if (lock_rc != 0){
cerr << get_time_string() << " | Could not unlock write lock for file : " << fname.str() << ". Aborting" <<endl;
exit(1);
}
cout << get_time_string() << " | Successfully added " << fname.str() << " to working set at index " << file_no << endl;
}
return pool;
}
int main()
{
srand(time(0));
for (unsigned int i = 0; i < NUM_FILES; ++i){
ostringstream os;
os << SET_0 << "/" << i << ".dat";
const char *filename = os.str().c_str();
open_mmap_read_close(filename);
}
vector<unsigned long*> mapped_addrs = open_and_mmap_files_from_dir(SET_1);
vector<pthread_t> read_threads(NUM_READ_THREADS);
vector<pthread_t> write_threads(NUM_WRITE_THREADS);
void *res;
pthread_rwlock_init(&addr_lock, NULL);
cout << get_time_string() << " | Starting read threads.. " << endl;
for (unsigned int i = 0; i < NUM_READ_THREADS; ++i)
pthread_create(&read_threads[i], 0, &read_randomly_from_addresses, (void *) &mapped_addrs);
cout << get_time_string() << " | Starting write threads.. " << endl;
for(unsigned int i = 0; i < NUM_WRITE_THREADS; ++i)
pthread_create(&write_threads[i], 0, &create_data_and_update_pool, (void *) &mapped_addrs);
pthread_join(write_threads[0], &res);
pthread_join(read_threads[0], &res);
for(vector<unsigned long*>::iterator i = mapped_addrs.begin(); i != mapped_addrs.end(); ++i)
munmap(*i, FILE_SIZE);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment