Created
June 24, 2017 08:23
-
-
Save siddontang/606288f8bc03926f1731f4a998359643 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate rocksdb; | |
extern crate byteorder; | |
extern crate rand; | |
extern crate tempdir; | |
use std::fs; | |
use std::sync::Arc; | |
use std::thread; | |
use std::time::Instant; | |
use rand::{Rng, ThreadRng}; | |
use rocksdb::{DB, Options, SliceTransform, BlockBasedOptions, Writable}; | |
use byteorder::{ByteOrder, BigEndian, WriteBytesExt}; | |
use tempdir::TempDir; | |
pub struct CFOptions<'a> { | |
cf: &'a str, | |
options: Options, | |
} | |
impl<'a> CFOptions<'a> { | |
pub fn new(cf: &'a str, options: Options) -> CFOptions<'a> { | |
CFOptions { | |
cf: cf, | |
options: options, | |
} | |
} | |
} | |
/// Slice transform that keeps a fixed number of leading bytes of a key.
pub struct FixedPrefixSliceTransform {
    /// Number of leading bytes that make up the prefix.
    pub prefix_len: usize,
}

impl FixedPrefixSliceTransform {
    /// Creates a transform whose prefix is `prefix_len` bytes long.
    pub fn new(prefix_len: usize) -> Self {
        Self { prefix_len }
    }
}
impl SliceTransform for FixedPrefixSliceTransform { | |
fn transform<'a>(&mut self, key: &'a [u8]) -> &'a [u8] { | |
&key[..self.prefix_len] | |
} | |
fn in_domain(&mut self, key: &[u8]) -> bool { | |
key.len() >= self.prefix_len | |
} | |
fn in_range(&mut self, _: &[u8]) -> bool { | |
true | |
} | |
} | |
pub fn new_engine_opt(path: &str, opts: Options, cfs_opts: Vec<CFOptions>) -> DB { | |
fs::remove_dir_all(path); | |
let mut cfs = vec![]; | |
let mut cfs_opts_ref = vec![]; | |
if let Some(x) = cfs_opts.iter().find(|x| x.cf == "default") { | |
cfs.push("default"); | |
cfs_opts_ref.push(&x.options); | |
} | |
let mut db = DB::open_cf(opts, path, cfs.as_slice(), cfs_opts_ref.as_slice()).unwrap(); | |
for x in &cfs_opts { | |
if x.cf == "default" { | |
continue; | |
} | |
db.create_cf(x.cf, &x.options).unwrap(); | |
} | |
db | |
} | |
fn get_rocksdb_db_option(allow_concurrent: bool) -> Options { | |
let mut opts = Options::new(); | |
opts.set_max_background_compactions(6); | |
opts.set_max_background_flushes(2); | |
opts.set_base_background_compactions(1); | |
opts.set_max_manifest_file_size(20 * 1024 * 1024); | |
opts.create_if_missing(true); | |
opts.set_max_open_files(40960); | |
opts.enable_statistics(); | |
opts.set_stats_dump_period_sec(600); | |
opts.set_compaction_readahead_size(0); | |
opts.set_max_subcompactions(1); | |
opts.set_writable_file_max_buffer_size(1024 * 1024); | |
opts.set_use_direct_io_for_flush_and_compaction(false); | |
opts.allow_concurrent_memtable_write(allow_concurrent); | |
opts | |
} | |
/// Plain-value description of one column family's tuning parameters.
struct CfOptValues {
    pub block_size: i64,
    pub block_cache_size: i64,
    pub cache_index_and_filter_blocks: bool,
    pub use_bloom_filter: bool,
    pub whole_key_filtering: bool,
    pub bloom_bits_per_key: i64,
    pub block_based_filter: bool,
    pub compression_per_level: String,
    pub write_buffer_size: i64,
    pub max_write_buffer_number: i64,
    pub min_write_buffer_number_to_merge: i64,
    pub max_bytes_for_level_base: i64,
    pub target_file_size_base: i64,
    pub level_zero_file_num_compaction_trigger: i64,
    pub level_zero_slowdown_writes_trigger: i64,
    pub level_zero_stop_writes_trigger: i64,
}

/// Byte-size units (powers of 1024).
const KB: u64 = 1 << 10;
const MB: u64 = KB << 10;
const GB: u64 = MB << 10;

impl Default for CfOptValues {
    /// Baseline values that the per-column-family builders start from.
    fn default() -> Self {
        CfOptValues {
            // Block-based table settings.
            block_size: (64 * KB) as i64,
            block_cache_size: (256 * MB) as i64,
            cache_index_and_filter_blocks: true,

            // Bloom filter settings.
            use_bloom_filter: false,
            whole_key_filtering: true,
            bloom_bits_per_key: 10,
            block_based_filter: false,

            compression_per_level: "no:no:lz4:lz4:lz4:zstd:zstd".to_owned(),

            // Write buffer settings.
            write_buffer_size: (128 * MB) as i64,
            max_write_buffer_number: 5,
            min_write_buffer_number_to_merge: 1,

            // Level sizing and level-zero triggers.
            max_bytes_for_level_base: (512 * MB) as i64,
            target_file_size_base: (32 * MB) as i64,
            level_zero_file_num_compaction_trigger: 4,
            level_zero_slowdown_writes_trigger: 20,
            level_zero_stop_writes_trigger: 36,
        }
    }
}
fn get_rocksdb_cf_option(default_values: CfOptValues) | |
-> Options { | |
let mut block_base_opts = BlockBasedOptions::new(); | |
block_base_opts.set_block_size(default_values.block_size as usize); | |
block_base_opts.set_lru_cache(default_values.block_cache_size as usize); | |
block_base_opts.set_cache_index_and_filter_blocks(default_values.cache_index_and_filter_blocks); | |
if default_values.use_bloom_filter { | |
block_base_opts.set_bloom_filter(default_values.bloom_bits_per_key as i32, default_values.block_based_filter); | |
block_base_opts.set_whole_key_filtering(default_values.whole_key_filtering); | |
} | |
let mut opts = Options::new(); | |
opts.set_block_based_table_factory(&block_base_opts); | |
opts.set_write_buffer_size(default_values.write_buffer_size as u64); | |
opts.set_max_write_buffer_number(default_values.max_write_buffer_number as i32); | |
opts.set_min_write_buffer_number_to_merge(default_values.min_write_buffer_number_to_merge as i32); | |
opts.set_max_bytes_for_level_base(default_values.max_bytes_for_level_base as u64); | |
opts.set_target_file_size_base(default_values.target_file_size_base as u64); | |
opts.set_level_zero_file_num_compaction_trigger(default_values.level_zero_file_num_compaction_trigger as i32); | |
opts.set_level_zero_slowdown_writes_trigger(default_values.level_zero_slowdown_writes_trigger as i32); | |
opts.set_level_zero_stop_writes_trigger(default_values.level_zero_stop_writes_trigger as i32); | |
opts | |
} | |
fn get_rocksdb_default_cf_option() -> Options { | |
let mut default_values = CfOptValues::default(); | |
default_values.block_cache_size = 1 * 1024 * 1024 * 1024; | |
default_values.use_bloom_filter = true; | |
default_values.whole_key_filtering = true; | |
let mut opts = get_rocksdb_cf_option(default_values); | |
opts | |
} | |
fn get_rocksdb_raftlog_cf_option() -> Options { | |
let mut default_values = CfOptValues::default(); | |
default_values.block_cache_size = 1024 * 1024 * 1024 as i64; | |
let mut opts = get_rocksdb_cf_option(default_values); | |
opts.set_memtable_insert_hint_prefix_extractor("RaftPrefixSliceTransform", | |
Box::new(FixedPrefixSliceTransform::new(8))) | |
.unwrap(); | |
opts | |
} | |
fn raft_log_key(region_id: u64, log_id: u64) -> Vec<u8> { | |
let mut key = Vec::with_capacity(17); | |
key.write_u64::<BigEndian>(region_id).unwrap(); | |
key.push(b':'); | |
key.write_u64::<BigEndian>(log_id).unwrap(); | |
key | |
} | |
fn new_bench_engine(db_path: &str, allow_concurrent: bool ) -> Arc<DB> { | |
let db_opts = get_rocksdb_db_option(allow_concurrent); | |
let cfs_opts = | |
vec![CFOptions::new("default", | |
get_rocksdb_default_cf_option()), | |
CFOptions::new("raft", | |
get_rocksdb_raftlog_cf_option())]; | |
let engine = Arc::new(new_engine_opt(db_path, db_opts, cfs_opts)); | |
engine | |
} | |
pub struct KvGenerator { | |
key_len: usize, | |
value_len: usize, | |
rng: ThreadRng, | |
} | |
impl KvGenerator { | |
pub fn new(key_len: usize, value_len: usize) -> KvGenerator { | |
KvGenerator { | |
key_len: key_len, | |
value_len: value_len, | |
rng: rand::thread_rng(), | |
} | |
} | |
} | |
impl Iterator for KvGenerator { | |
type Item = (Vec<u8>, Vec<u8>); | |
fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> { | |
let mut k = vec![0; self.key_len]; | |
self.rng.fill_bytes(&mut k); | |
let mut v = vec![0; self.value_len]; | |
self.rng.fill_bytes(&mut v); | |
Some((k, v)) | |
} | |
} | |
fn bench_raft(db: Arc<DB>) { | |
let region_ids = vec![1, 2, 3, 4, 5]; | |
let mut r = rand::thread_rng(); | |
let mut v = vec![0; 100]; | |
for i in 0..n { | |
let key = raft_log_key(region_ids[i as usize % 5], i); | |
r.fill_bytes(&mut v); | |
let cf = db.cf_handle("raft").unwrap(); | |
db.put_cf(cf, &key, &v); | |
} | |
} | |
fn bench_default(db: Arc<DB>) { | |
let mut g = KvGenerator::new(30, 100); | |
for i in 0..n { | |
let (key, value) = g.next().unwrap(); | |
db.put(&key, &value); | |
} | |
} | |
fn bench(name: &'static str, allow_concurrent: bool, db_raft: Arc<DB>, db_default: Arc<DB>) { | |
let c1 = thread::spawn(move || { | |
//let t = Instant::now(); | |
bench_raft(db_raft); | |
//println!("raft {}, concurrent {}, cost {:?}", name, allow_concurrent, t.elapsed()) | |
}); | |
let c2 = thread::spawn(move || { | |
//let t1 = Instant::now(); | |
bench_default(db_default); | |
//println!("default {}, concurrent {}, cost {:?}", name, allow_concurrent, t1.elapsed()) | |
}); | |
c1.join().unwrap(); | |
c2.join().unwrap(); | |
} | |
fn bench_2(name: &'static str, allow_concurrent: bool, db_raft: Arc<DB>, db_default: Arc<DB>) { | |
let c1 = thread::spawn(move || { | |
//let t = Instant::now(); | |
bench_raft(db_raft); | |
//println!("raft {}-2, concurrent {}, cost {:?}", name, allow_concurrent, t.elapsed()) | |
}); | |
let d1 = db_default.clone(); | |
let c2 = thread::spawn(move || { | |
//let t1 = Instant::now(); | |
bench_default(d1); | |
//println!("default {}-2-1, concurrent {}, cost {:?}", name, allow_concurrent, t1.elapsed()) | |
}); | |
let c3 = thread::spawn(move || { | |
//let t1 = Instant::now(); | |
bench_default(db_default); | |
//println!("default {}-2-2, concurrent {}, cost {:?}", name, allow_concurrent, t1.elapsed()) | |
}); | |
c1.join().unwrap(); | |
c2.join().unwrap(); | |
c3.join().unwrap(); | |
} | |
fn bench_mix(allow_concurrent: bool) { | |
let path = TempDir::new("mix").expect(""); | |
let path_str = path.path().to_str().unwrap(); | |
{ | |
let t = Instant::now(); | |
let db = new_bench_engine(path_str, allow_concurrent); | |
bench("mix", allow_concurrent, db.clone(), db.clone()); | |
println!("1 DB, 1 Raft, 1 Apply, concurrent {}, cost {:?}", allow_concurrent, t.elapsed()); | |
} | |
{ | |
let t = Instant::now(); | |
let db = new_bench_engine(path_str, allow_concurrent); | |
bench_2("mix-2", allow_concurrent, db.clone(), db.clone()); | |
println!("1 DB, 1 Raft, 2 Apply, concurrent {}, cost {:?}", allow_concurrent, t.elapsed()); | |
} | |
} | |
fn bench_separate(allow_concurrent: bool) { | |
let d_path = TempDir::new("default").expect(""); | |
let d_path_str = d_path.path().to_str().unwrap(); | |
let r_path = TempDir::new("raft").expect(""); | |
let r_path_str = r_path.path().to_str().unwrap(); | |
{ | |
let t = Instant::now(); | |
let db_default = new_bench_engine(d_path_str, allow_concurrent); | |
let db_raft = new_bench_engine(r_path_str, allow_concurrent); | |
bench("sep", allow_concurrent, db_raft, db_default); | |
println!("2 DB, 1 Raft, 1 Apply, concurrent {}, cost {:?}", allow_concurrent, t.elapsed()); | |
} | |
{ | |
let t = Instant::now(); | |
let db_default = new_bench_engine(d_path_str, allow_concurrent); | |
let db_raft = new_bench_engine(r_path_str, allow_concurrent); | |
bench_2("sep-2", allow_concurrent, db_raft, db_default); | |
println!("2 DB, 1 Raft, 2 Apply, concurrent {}, cost {:?}", allow_concurrent, t.elapsed()); | |
} | |
} | |
/// Upper bound of the `0..n` write loops in the benchmark writers.
// The lower-case name is kept because the writer loops refer to it as `n`.
#[allow(non_upper_case_globals)]
const n: u64 = 1_000_000;
fn main() { | |
bench_mix(false); | |
bench_mix(true); | |
bench_separate(false); | |
bench_separate(true); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment