Skip to content

Instantly share code, notes, and snippets.

@siddontang
Created June 24, 2017 08:23
Show Gist options
  • Save siddontang/606288f8bc03926f1731f4a998359643 to your computer and use it in GitHub Desktop.
Save siddontang/606288f8bc03926f1731f4a998359643 to your computer and use it in GitHub Desktop.
extern crate rocksdb;
extern crate byteorder;
extern crate rand;
extern crate tempdir;
use std::fs;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use rand::{Rng, ThreadRng};
use rocksdb::{DB, Options, SliceTransform, BlockBasedOptions, Writable};
use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
use tempdir::TempDir;
/// Pairs a column family name with the RocksDB `Options` used for it.
pub struct CFOptions<'a> {
    /// Name of the column family.
    cf: &'a str,
    /// Options handed to RocksDB when the column family is opened or created.
    options: Options,
}

impl<'a> CFOptions<'a> {
    /// Bundles the column family name `cf` with its `options`.
    pub fn new(cf: &'a str, options: Options) -> CFOptions<'a> {
        CFOptions { cf, options }
    }
}
/// Slice transform configured with a fixed prefix length.
///
/// The `SliceTransform` implementation in this file maps a key to its first
/// `prefix_len` bytes.
pub struct FixedPrefixSliceTransform {
    /// Number of leading key bytes that make up the prefix.
    pub prefix_len: usize,
}

impl FixedPrefixSliceTransform {
    /// Creates a transform that uses the first `prefix_len` bytes of a key.
    pub fn new(prefix_len: usize) -> FixedPrefixSliceTransform {
        FixedPrefixSliceTransform { prefix_len }
    }
}
impl SliceTransform for FixedPrefixSliceTransform {
    /// Returns the leading `prefix_len` bytes of `key`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is shorter than `prefix_len`; `in_domain` reports
    /// whether a key is long enough.
    fn transform<'a>(&mut self, key: &'a [u8]) -> &'a [u8] {
        let end = self.prefix_len;
        &key[..end]
    }

    /// A key is in the domain when it holds at least `prefix_len` bytes.
    fn in_domain(&mut self, key: &[u8]) -> bool {
        self.prefix_len <= key.len()
    }

    /// Accepts every slice unconditionally.
    fn in_range(&mut self, _: &[u8]) -> bool {
        true
    }
}
/// Removes whatever is at `path` and opens a database there with the given
/// column families.
///
/// If `cfs_opts` contains an entry named "default", its options are passed to
/// `DB::open_cf`; every other entry is created afterwards with `create_cf`.
///
/// # Panics
///
/// Panics if opening the database or creating a column family fails.
pub fn new_engine_opt(path: &str, opts: Options, cfs_opts: Vec<CFOptions>) -> DB {
    // Best-effort cleanup: the outcome is deliberately discarded, exactly as
    // before (e.g. nothing may exist at `path` yet).
    let _ = fs::remove_dir_all(path);

    // Only the first "default" entry (if any) is used when opening.
    let mut open_names = Vec::new();
    let mut open_options = Vec::new();
    for entry in &cfs_opts {
        if entry.cf == "default" {
            open_names.push("default");
            open_options.push(&entry.options);
            break;
        }
    }

    let mut db = DB::open_cf(opts, path, open_names.as_slice(), open_options.as_slice()).unwrap();

    // All remaining column families are created on the freshly opened database.
    for entry in cfs_opts.iter().filter(|entry| entry.cf != "default") {
        db.create_cf(entry.cf, &entry.options).unwrap();
    }
    db
}
/// Builds the database-wide RocksDB options used by every benchmark engine.
///
/// `allow_concurrent` is forwarded to `allow_concurrent_memtable_write`; all
/// other settings are fixed.
fn get_rocksdb_db_option(allow_concurrent: bool) -> Options {
    let mut db_opts = Options::new();

    // Background compaction / flush limits.
    db_opts.set_max_background_compactions(6);
    db_opts.set_max_background_flushes(2);
    db_opts.set_base_background_compactions(1);

    // Manifest size cap (20 MiB), database creation, and file handle limit.
    db_opts.set_max_manifest_file_size(20 * 1024 * 1024);
    db_opts.create_if_missing(true);
    db_opts.set_max_open_files(40960);

    // Statistics, dumped every 600 seconds.
    db_opts.enable_statistics();
    db_opts.set_stats_dump_period_sec(600);

    // Compaction and file I/O behaviour.
    db_opts.set_compaction_readahead_size(0);
    db_opts.set_max_subcompactions(1);
    db_opts.set_writable_file_max_buffer_size(1024 * 1024);
    db_opts.set_use_direct_io_for_flush_and_compaction(false);

    // The one knob the benchmark varies.
    db_opts.allow_concurrent_memtable_write(allow_concurrent);

    db_opts
}
/// Plain-value description of a column family's tuning knobs.
///
/// `get_rocksdb_cf_option` turns these values into RocksDB `Options`.
struct CfOptValues {
    /// Block size in bytes for the block-based table.
    pub block_size: i64,
    /// Capacity in bytes of the LRU block cache.
    pub block_cache_size: i64,
    /// Whether index and filter blocks are kept in the block cache.
    pub cache_index_and_filter_blocks: bool,
    /// Whether a bloom filter is configured at all.
    pub use_bloom_filter: bool,
    /// Whole-key filtering flag; only applied when `use_bloom_filter` is set.
    pub whole_key_filtering: bool,
    /// Bloom filter bits per key; only applied when `use_bloom_filter` is set.
    pub bloom_bits_per_key: i64,
    /// Block-based filter flag passed along with `bloom_bits_per_key`.
    pub block_based_filter: bool,
    /// Colon-separated per-level compression names.
    /// NOTE(review): `get_rocksdb_cf_option` as shown does not read this field.
    pub compression_per_level: String,
    /// Write buffer size in bytes.
    pub write_buffer_size: i64,
    /// Maximum number of write buffers.
    pub max_write_buffer_number: i64,
    /// Minimum number of write buffers to merge.
    pub min_write_buffer_number_to_merge: i64,
    /// Maximum total bytes for the base level.
    pub max_bytes_for_level_base: i64,
    /// Base target file size in bytes.
    pub target_file_size_base: i64,
    /// Level-zero file count that triggers compaction.
    pub level_zero_file_num_compaction_trigger: i64,
    /// Level-zero file count at which writes are slowed down.
    pub level_zero_slowdown_writes_trigger: i64,
    /// Level-zero file count at which writes are stopped.
    pub level_zero_stop_writes_trigger: i64,
}

/// Bytes in one kibibyte.
const KB: u64 = 1024;
/// Bytes in one mebibyte.
const MB: u64 = 1024 * KB;
/// Bytes in one gibibyte.
const GB: u64 = 1024 * MB;

impl Default for CfOptValues {
    /// Baseline values shared by all column families in this benchmark.
    fn default() -> CfOptValues {
        // Small helpers so each size reads as "<count> KiB/MiB".
        let kib = |count: i64| count * KB as i64;
        let mib = |count: i64| count * MB as i64;

        CfOptValues {
            block_size: kib(64),
            block_cache_size: mib(256),
            cache_index_and_filter_blocks: true,
            use_bloom_filter: false,
            whole_key_filtering: true,
            bloom_bits_per_key: 10,
            block_based_filter: false,
            compression_per_level: "no:no:lz4:lz4:lz4:zstd:zstd".to_string(),
            write_buffer_size: mib(128),
            max_write_buffer_number: 5,
            min_write_buffer_number_to_merge: 1,
            max_bytes_for_level_base: mib(512),
            target_file_size_base: mib(32),
            level_zero_file_num_compaction_trigger: 4,
            level_zero_slowdown_writes_trigger: 20,
            level_zero_stop_writes_trigger: 36,
        }
    }
}
/// Converts a `CfOptValues` into RocksDB column family `Options`.
///
/// The bloom filter and whole-key filtering are configured only when
/// `use_bloom_filter` is set. Numeric fields are cast to the integer types the
/// RocksDB setters take.
fn get_rocksdb_cf_option(default_values: CfOptValues) -> Options {
    let values = default_values;

    // Block-based table settings.
    let mut table_opts = BlockBasedOptions::new();
    table_opts.set_block_size(values.block_size as usize);
    table_opts.set_lru_cache(values.block_cache_size as usize);
    table_opts.set_cache_index_and_filter_blocks(values.cache_index_and_filter_blocks);
    if values.use_bloom_filter {
        table_opts.set_bloom_filter(
            values.bloom_bits_per_key as i32,
            values.block_based_filter,
        );
        table_opts.set_whole_key_filtering(values.whole_key_filtering);
    }

    // Column family settings.
    let mut cf_opts = Options::new();
    cf_opts.set_block_based_table_factory(&table_opts);
    cf_opts.set_write_buffer_size(values.write_buffer_size as u64);
    cf_opts.set_max_write_buffer_number(values.max_write_buffer_number as i32);
    cf_opts.set_min_write_buffer_number_to_merge(
        values.min_write_buffer_number_to_merge as i32,
    );
    cf_opts.set_max_bytes_for_level_base(values.max_bytes_for_level_base as u64);
    cf_opts.set_target_file_size_base(values.target_file_size_base as u64);
    cf_opts.set_level_zero_file_num_compaction_trigger(
        values.level_zero_file_num_compaction_trigger as i32,
    );
    cf_opts.set_level_zero_slowdown_writes_trigger(
        values.level_zero_slowdown_writes_trigger as i32,
    );
    cf_opts.set_level_zero_stop_writes_trigger(
        values.level_zero_stop_writes_trigger as i32,
    );
    cf_opts
}
fn get_rocksdb_default_cf_option() -> Options {
let mut default_values = CfOptValues::default();
default_values.block_cache_size = 1 * 1024 * 1024 * 1024;
default_values.use_bloom_filter = true;
default_values.whole_key_filtering = true;
let mut opts = get_rocksdb_cf_option(default_values);
opts
}
fn get_rocksdb_raftlog_cf_option() -> Options {
let mut default_values = CfOptValues::default();
default_values.block_cache_size = 1024 * 1024 * 1024 as i64;
let mut opts = get_rocksdb_cf_option(default_values);
opts.set_memtable_insert_hint_prefix_extractor("RaftPrefixSliceTransform",
Box::new(FixedPrefixSliceTransform::new(8)))
.unwrap();
opts
}
fn raft_log_key(region_id: u64, log_id: u64) -> Vec<u8> {
let mut key = Vec::with_capacity(17);
key.write_u64::<BigEndian>(region_id).unwrap();
key.push(b':');
key.write_u64::<BigEndian>(log_id).unwrap();
key
}
/// Opens a fresh benchmark database at `db_path` with the "default" and
/// "raft" column families and returns it behind an `Arc`.
///
/// `allow_concurrent` is passed on to the database-wide options.
fn new_bench_engine(db_path: &str, allow_concurrent: bool) -> Arc<DB> {
    let db_opts = get_rocksdb_db_option(allow_concurrent);
    let cfs_opts = vec![
        CFOptions::new("default", get_rocksdb_default_cf_option()),
        CFOptions::new("raft", get_rocksdb_raftlog_cf_option()),
    ];
    Arc::new(new_engine_opt(db_path, db_opts, cfs_opts))
}
/// Endless source of random key/value pairs with fixed lengths.
pub struct KvGenerator {
    /// Length in bytes of every generated key.
    key_len: usize,
    /// Length in bytes of every generated value.
    value_len: usize,
    /// Random number generator the bytes are drawn from.
    rng: ThreadRng,
}

impl KvGenerator {
    /// Creates a generator producing `key_len`-byte keys and `value_len`-byte
    /// values from the thread-local random number generator.
    pub fn new(key_len: usize, value_len: usize) -> KvGenerator {
        KvGenerator {
            key_len,
            value_len,
            rng: rand::thread_rng(),
        }
    }
}
impl Iterator for KvGenerator {
    type Item = (Vec<u8>, Vec<u8>);

    /// Produces the next random `(key, value)` pair; never returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        let mut key = vec![0u8; self.key_len];
        let mut value = vec![0u8; self.value_len];
        // The key is filled before the value, so the generator's output is
        // consumed in the same order as before.
        self.rng.fill_bytes(&mut key);
        self.rng.fill_bytes(&mut value);
        Some((key, value))
    }
}
/// Writes `n` random 100-byte values into the "raft" column family of `db`,
/// cycling the keys over a fixed set of region ids.
///
/// # Panics
///
/// Panics if the "raft" column family is missing or if any write fails, so a
/// broken run is never reported as a valid timing.
fn bench_raft(db: Arc<DB>) {
    let region_ids: Vec<u64> = vec![1, 2, 3, 4, 5];
    // The handle does not change between writes, so look it up once.
    let cf = db.cf_handle("raft").unwrap();
    let mut rng = rand::thread_rng();
    let mut value = vec![0u8; 100];
    for log_id in 0..n {
        // Round-robin over the regions; the modulus follows the list length.
        let region_id = region_ids[log_id as usize % region_ids.len()];
        let key = raft_log_key(region_id, log_id);
        rng.fill_bytes(&mut value);
        db.put_cf(cf, &key, &value).unwrap();
    }
}
/// Writes `n` random pairs (30-byte key, 100-byte value) into the default
/// column family of `db`.
///
/// # Panics
///
/// Panics if any write fails, so a broken run is never reported as a valid
/// timing.
fn bench_default(db: Arc<DB>) {
    let pairs = KvGenerator::new(30, 100).take(n as usize);
    for (key, value) in pairs {
        db.put(&key, &value).unwrap();
    }
}
/// Runs one raft writer and one default writer concurrently and waits for
/// both to finish.
///
/// `name` and `allow_concurrent` are accepted for labelling purposes but are
/// not currently used; callers time the whole run themselves.
///
/// # Panics
///
/// Panics if either worker thread panicked.
fn bench(name: &'static str, allow_concurrent: bool, db_raft: Arc<DB>, db_default: Arc<DB>) {
    let raft_worker = thread::spawn(move || bench_raft(db_raft));
    let default_worker = thread::spawn(move || bench_default(db_default));

    // Join in spawn order: raft first, then default.
    for worker in vec![raft_worker, default_worker] {
        worker.join().unwrap();
    }
}
/// Runs one raft writer and two default writers concurrently and waits for
/// all three to finish.
///
/// `name` and `allow_concurrent` are accepted for labelling purposes but are
/// not currently used; callers time the whole run themselves.
///
/// # Panics
///
/// Panics if any worker thread panicked.
fn bench_2(name: &'static str, allow_concurrent: bool, db_raft: Arc<DB>, db_default: Arc<DB>) {
    let raft_worker = thread::spawn(move || bench_raft(db_raft));

    // Both default writers share the same database handle.
    let shared_default = Arc::clone(&db_default);
    let first_default_worker = thread::spawn(move || bench_default(shared_default));
    let second_default_worker = thread::spawn(move || bench_default(db_default));

    // Join in spawn order.
    for worker in vec![raft_worker, first_default_worker, second_default_worker] {
        worker.join().unwrap();
    }
}
fn bench_mix(allow_concurrent: bool) {
let path = TempDir::new("mix").expect("");
let path_str = path.path().to_str().unwrap();
{
let t = Instant::now();
let db = new_bench_engine(path_str, allow_concurrent);
bench("mix", allow_concurrent, db.clone(), db.clone());
println!("1 DB, 1 Raft, 1 Apply, concurrent {}, cost {:?}", allow_concurrent, t.elapsed());
}
{
let t = Instant::now();
let db = new_bench_engine(path_str, allow_concurrent);
bench_2("mix-2", allow_concurrent, db.clone(), db.clone());
println!("1 DB, 1 Raft, 2 Apply, concurrent {}, cost {:?}", allow_concurrent, t.elapsed());
}
}
/// Benchmarks raft and default writes going to two separate databases:
/// first with one default writer, then with two.
///
/// Each run opens fresh databases in the same two temporary directories and
/// prints the elapsed time, which includes opening the databases.
fn bench_separate(allow_concurrent: bool) {
    let d_path = TempDir::new("default").expect("");
    let d_path_str = d_path.path().to_str().unwrap();
    let r_path = TempDir::new("raft").expect("");
    let r_path_str = r_path.path().to_str().unwrap();

    // Run 1: one raft writer, one default writer. Both handles are moved into
    // the workers, so the databases are released once the workers finish.
    let first_start = Instant::now();
    let first_default = new_bench_engine(d_path_str, allow_concurrent);
    let first_raft = new_bench_engine(r_path_str, allow_concurrent);
    bench("sep", allow_concurrent, first_raft, first_default);
    println!("2 DB, 1 Raft, 1 Apply, concurrent {}, cost {:?}", allow_concurrent, first_start.elapsed());

    // Run 2: one raft writer, two default writers.
    let second_start = Instant::now();
    let second_default = new_bench_engine(d_path_str, allow_concurrent);
    let second_raft = new_bench_engine(r_path_str, allow_concurrent);
    bench_2("sep-2", allow_concurrent, second_raft, second_default);
    println!("2 DB, 1 Raft, 2 Apply, concurrent {}, cost {:?}", allow_concurrent, second_start.elapsed());
}
/// Number of writes each benchmark worker performs.
// The lowercase name is kept because the worker functions refer to it as `n`.
#[allow(non_upper_case_globals)]
const n: u64 = 1_000_000;

/// Runs the shared-database benchmark and then the separate-database
/// benchmark, each first without and then with concurrent memtable writes.
fn main() {
    let modes = [false, true];
    for &allow_concurrent in modes.iter() {
        bench_mix(allow_concurrent);
    }
    for &allow_concurrent in modes.iter() {
        bench_separate(allow_concurrent);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment