Last active
August 29, 2015 14:17
-
-
Save kevinkreiser/2d56b9c3c31bcd40fccb to your computer and use it in GitHub Desktop.
A Data Blob Cache with Configurable Eviction Policies
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * sudo apt-get install libboost-all-dev
 * #if you want to mmap more files than what `sysctl vm.max_map_count` allows
 * sudo sysctl -w vm.max_map_count=some_larger_number
 * g++ blob_cache.cpp -o blob_cache -std=c++11 -g -O2 -lboost_system -lboost_filesystem
 * blob_cache /some/directory/with/files 1073741824
 * blob_cache /some/directory/with/files 1073741824 mm
 */
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fstream>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <random>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <boost/filesystem.hpp>
namespace { | |
//a cached item: a handle to the blob's bytes (mmap'd or heap allocated)
//along with the metadata the eviction strategies order by
struct blob_t {
  std::shared_ptr<void> handle;      //owns the mapped/allocated bytes
  size_t size;                       //length of the blob in bytes
  std::chrono::nanoseconds accessed; //last access time since epoch, drives LRU/MRU ordering
  std::string key;                   //file name the blob was loaded from
  //const-qualified so equality works on const references and set elements
  bool operator==(const blob_t& other) const {
    return handle.get() == other.handle.get() && size == other.size && accessed == other.accessed && key == other.key;
  }
};
//sorting method to figure out who gets evicted from cache first | |
using eviction_strategy_t = std::function<bool (const blob_t&, const blob_t&)>; | |
//an LRU cache eviction strategy | |
const eviction_strategy_t lru_strategy = | |
[](const blob_t& a, const blob_t& b) { | |
if(a.accessed == b.accessed) | |
return a.handle.get() < b.handle.get(); | |
return a.accessed < b.accessed; | |
}; | |
//an MRU cache eviction strategy | |
const eviction_strategy_t mru_strategy = | |
[](const blob_t& a, const blob_t& b) { | |
if(a.accessed == b.accessed) | |
return a.handle.get() > b.handle.get(); | |
return a.accessed > b.accessed; | |
}; | |
//a random cache eviction strategy | |
const eviction_strategy_t rand_strategy = | |
[](const blob_t& a, const blob_t& b) { | |
return a.handle.get() < b.handle.get(); | |
}; | |
//a least recently used eviction schedule cache for accessing | |
//data blobs, not really useful for standard in memory objects | |
template <class key_t, class value_t> | |
class blob_cache_t { | |
public: | |
//how to turn a key type into a file name location | |
using key_predicate_t = std::function<std::string (const key_t&)>; | |
//how to turn a cached item (void* with size) into a value type | |
using value_predicate_t = std::function<value_t (const blob_t&)>; | |
blob_cache_t(bool use_mmap, key_predicate_t key_predicate, value_predicate_t value_predicate, size_t max_bytes, eviction_strategy_t eviction_strategy = lru_strategy): | |
key_predicate(key_predicate), value_predicate(value_predicate), cache(eviction_strategy), use_mmap(use_mmap), | |
bytes(0), max_bytes(max_bytes > 0 ? max_bytes : std::numeric_limits<size_t>::max()), hits(0), misses(0) { | |
} | |
//get a value out of cache using the key predicate, if its not already there it will be after this | |
value_t fetch(const key_t& key) { | |
//if its a cache miss go get it | |
auto file_name = key_predicate(key); | |
auto entry = lookup.find(file_name); | |
if(entry == lookup.end()) | |
return value_predicate(load(file_name)); | |
//it was a cache hit so update the timestamp | |
auto blob = *entry->second; | |
cache.erase(entry->second); | |
blob.accessed = std::chrono::system_clock::now().time_since_epoch(); | |
entry->second = cache.insert(blob).first; | |
++hits; //TODO: long running worry about roll-over | |
return value_predicate(blob); | |
} | |
//map a blob into cache | |
size_t prime(const key_t& key) { | |
auto file_name = key_predicate(key); | |
auto entry = lookup.find(file_name); | |
if(entry == lookup.end()) | |
return load(file_name).size; | |
return entry->second->size; | |
} | |
//TODO: add(file_name, void*, size_t) write blob to file and cache it | |
//hows the cache performing | |
float hit_ratio() const { | |
if(hits > 0) | |
return static_cast<float>(hits) / (static_cast<float>(hits) + static_cast<float>(misses)); | |
return 0.f; | |
} | |
//clear the cache | |
void clear() { | |
cache.clear(); | |
lookup.clear(); | |
bytes = hits = misses = 0; | |
} | |
//get the size in bytes that is currently mapped | |
size_t size() const { | |
return bytes; | |
} | |
protected: | |
//put a new item in cache | |
const blob_t& load(const std::string& file_name) { | |
//while we are over-committed evict stuff from cache | |
while(bytes > max_bytes && cache.size() > 0) { | |
auto blob = cache.begin(); | |
auto entry = lookup.find(blob->key); | |
bytes -= blob->size; | |
lookup.erase(entry); | |
cache.erase(blob); | |
} | |
//get the length | |
std::fstream file(file_name, std::ios_base::binary | std::ios_base::in | std::ios_base::ate); | |
if(!file) | |
throw std::runtime_error(file_name + "(fstream): " + strerror(errno)); | |
auto length = static_cast<size_t>(file.tellg()); | |
bytes += length; | |
std::shared_ptr<void> data; | |
//mmap the file | |
if(use_mmap) { | |
auto fd = open(file_name.c_str(), O_RDWR, 0); | |
if(fd == -1) | |
throw std::runtime_error(file_name + "(open): " + strerror(errno)); | |
void* handle = mmap(nullptr, length, PROT_READ, MAP_SHARED, fd, 0); | |
if(handle == MAP_FAILED) | |
throw std::runtime_error(file_name + "(mmap): " + strerror(errno)); | |
auto cl = close(fd); | |
if(cl == -1) | |
throw std::runtime_error(file_name + "(close): " + strerror(errno)); | |
data.reset(handle, [length](void* handle) { munmap(handle, length); }); | |
} | |
else { | |
//just read in the file | |
file.seekg(0); | |
char* handle = new char[length]; | |
file.readsome(handle, length); | |
data.reset(static_cast<void*>(handle), [](void* handle){ delete [] static_cast<char*>(handle);}); | |
} | |
//keep a copy of it | |
auto cached = cache.insert( | |
{data, length, std::chrono::system_clock::now().time_since_epoch(), file_name}); | |
lookup.insert(std::make_pair(file_name, cached.first)); | |
++misses; //TODO: long running worry about roll-over | |
return *cached.first; | |
} | |
std::set<blob_t, eviction_strategy_t> cache; | |
std::unordered_map<std::string, std::set<blob_t>::iterator> lookup; | |
key_predicate_t key_predicate; | |
value_predicate_t value_predicate; | |
bool use_mmap; | |
size_t bytes; | |
const size_t max_bytes; | |
size_t hits; | |
size_t misses; | |
}; | |
//get the files at the root_dir recursively | |
std::vector<std::string> get_files(const char* root_dir) { | |
std::vector<std::string> files; | |
for (boost::filesystem::recursive_directory_iterator i(root_dir), end; i != end; ++i) | |
if (!is_directory(i->path())) | |
files.push_back(i->path().string()); | |
return files; | |
} | |
size_t prime_cache(const std::vector<std::string>& files, blob_cache_t<std::string, std::string>& cache) { | |
//for each file | |
size_t bytes = 0; | |
for(const auto& file_name : files) | |
bytes += cache.prime(file_name); | |
return bytes; | |
} | |
size_t fetch_cache(const std::vector<std::string>& files, blob_cache_t<std::string, std::string>& cache) { | |
//for each file | |
size_t bytes = 0; | |
for(const auto& file_name : files) { | |
std::string data = cache.fetch(file_name); | |
bytes += data.size(); | |
} | |
return bytes; | |
} | |
//read each file directly from disk with no caching, returning the total
//bytes read; used as a baseline to compare the cache against
size_t read_files(const std::vector<std::string>& files) {
  size_t bytes = 0;
  for(const auto& file_name : files) {
    //open at the end so tellg reports the file length
    std::fstream file(file_name, std::ios_base::binary | std::ios_base::in | std::ios_base::ate);
    if(!file)
      throw std::runtime_error(file_name + "(fstream): " + strerror(errno));
    auto length = static_cast<size_t>(file.tellg());
    bytes += length;
    file.seekg(0);
    std::string buffer(length, '\0');
    //read (unlike readsome) actually pulls length bytes off the file; guard
    //length so we never take front() of an empty string
    if(length > 0 && !file.read(&buffer.front(), length))
      throw std::runtime_error(file_name + "(read): failed to read " + std::to_string(length) + " bytes");
  }
  return bytes;
}
} | |
int main(int argc, char** argv) { | |
//get the list of files in the dir | |
auto files = get_files(argv[1]); | |
size_t byte_limit = std::numeric_limits<size_t>::max(); | |
if(argc > 2) | |
byte_limit = std::stoul(argv[2]); | |
bool use_mmap = false; | |
if(argc > 3) | |
use_mmap = std::string(argv[3]) == "mm"; | |
//make the cache | |
auto key_predicate = [](const std::string& key){return key;}; | |
auto value_predicate = [](const blob_t& blob) | |
{ return std::string(static_cast<const char*>(blob.handle.get()), blob.size); }; | |
blob_cache_t<std::string, std::string> cache(use_mmap, key_predicate, value_predicate, byte_limit, rand_strategy); | |
std::cout << "===Testing with " << argv[1] << "===" << std::endl; | |
//randomize | |
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); | |
std::shuffle(files.begin(), files.end(), std::default_random_engine(seed)); | |
//load them into cache | |
std::cout << "===Priming cache===" << std::endl; | |
auto start = std::chrono::system_clock::now(); | |
auto bytes = prime_cache(files, cache); | |
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count(); | |
std::cout << files.size() << " took " << elapsed << " ms" << std::endl; | |
std::cout << static_cast<float>(elapsed)/static_cast<float>(files.size()) << " ms/tile" << std::endl; | |
std::cout << bytes << " bytes read" << std::endl; | |
std::cout << cache.size() << " bytes mapped" << std::endl; | |
std::cout << cache.hit_ratio() << " hit ratio" << std::endl; | |
//randomize | |
std::shuffle(files.begin(), files.end(), std::default_random_engine(seed)); | |
//scan through cache seeing the first 1mb of each item | |
std::cout << "===Reading cache===" << std::endl; | |
start = std::chrono::system_clock::now(); | |
bytes = fetch_cache(files, cache); | |
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count(); | |
std::cout << files.size() << " took " << elapsed << " ms" << std::endl; | |
std::cout << static_cast<float>(elapsed)/static_cast<float>(files.size()) << " ms/tile" << std::endl; | |
std::cout << bytes << " bytes read" << std::endl; | |
std::cout << cache.size() << " bytes mapped" << std::endl; | |
std::cout << cache.hit_ratio() << " hit ratio" << std::endl; | |
cache.clear(); | |
//read them directly from disk to see if its any faster | |
std::cout << "===Reading cache===" << std::endl; | |
start = std::chrono::system_clock::now(); | |
bytes = read_files(files); | |
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count(); | |
std::cout << files.size() << " took " << elapsed << " ms" << std::endl; | |
std::cout << static_cast<float>(elapsed)/static_cast<float>(files.size()) << " ms/tile" << std::endl; | |
std::cout << bytes << " bytes read" << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment