@deeso
Created December 11, 2014 22:52
Practical example of how to use redis-rs and Rust tasks to import data into Redis. If you use Cargo, simply add "https://github.com/mitsuhiko/redis-rs.git" as a dependency.
extern crate getopts;
extern crate redis;
extern crate time;
use redis::RedisResult;
use redis::Value as RV;
use redis::Commands;
use std::io::BufferedReader;
use std::char::{is_digit};
use std::collections::HashMap;
use std::io::File;
use getopts::{optopt,optflag,getopts,OptGroup};
use std::os;
use std::num;
use std::iter::AdditiveIterator;
use std::fmt;
use std::result;
use std::sync::{RWLock, Arc};
use std::time::duration::Duration;
use time::{now, strftime};
use std::io::timer::sleep;
type ArcVecSomeObj<'a> = Arc<RWLock<Vec<SomeObj<'a>>>>;
type ArcBool = Arc<RWLock<bool>>;
// Generalized (real-world-ish) example of how to read a file, parse the input,
// and export the data to redis in a multithreaded manner.
// The program ran on a Core i7 with 32GB of RAM and completed the import of
// 33M records in about 90 minutes (roughly 3 minutes per million records).
// The current configuration of the program will spawn 1000 redis export tasks,
// which are responsible for taking a SomeObj from a shared (atomically
// reference-counted, RWLock-protected) vector and inserting it into a redis
// key space. Each SomeObj is derived from a key-value line in a file, and
// once a line is parsed, the result is inserted into the shared vector.
// When the file reading is complete, the reader (producer/main task) will
// signal all the consumer (redis-inserting) tasks, so that once the shared
// vector is empty, they know to quit (instead of waiting). The main task
// will receive a signal in the form of a u32 value via a channel and quit
// once all the redis consumers are done.
// The only other detail that might be of interest is the way the redis tasks
// are monitored for quitting. I could not figure out a clean way to "join" a
// task once it was complete, so I simply used a HashMap, mapping the task
// id to its receive channel. When the file reading is complete, the main task
// polls over these tasks to determine whether or not each has completed. In
// this case, I collect the keys into a vector and then walk over the HashMap,
// rather than iterating over the HashMap directly, because iterating over the
// HashMap (e.g. channel_map.iter()) results in a compiler error at
// "channel_map.remove(id);". This is actually the borrow checker doing its
// job, not a bug: remove() needs a mutable borrow while the iterator still
// holds an immutable one. The error looks like this:
/*
main.rs:492:17: 492:28 error: cannot borrow `channel_map` as mutable because it
is also borrowed as immutable
main.rs:492 channel_map.remove(id);
main.rs:473:31: 440:42 note: previous borrow of `channel_map` occurs here;
the immutable borrow prevents subsequent moves or
mutable borrows of `channel_map`
until the borrow ends
main.rs:473 for (c_id, rx) in channel_map.iter() {
main.rs:473:6: 463:6 note: previous borrow ends here
main.rs:466 while cnt > 0 {
main.rs:500 }
with the following code at 473 instead of what's currently there:
for (c_id, rx) in channel_map.iter() {
let rx = channel_map.get(c_id).unwrap();
let res_recv = rx.try_recv();
match res_recv {
Ok(_) => {
del_ids.push (c_id);
println!("redis thread {} has completed import", c_id);
},
Err (_) => (),
}
}
*/
// This may be a bit over the top,
// but I hope this code is helpful to others - dso
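// The polling workaround described above predates stable Rust: `spawn(proc())`
// returned nothing, so there was no handle to wait on. In modern Rust,
// `std::thread::spawn` returns a `JoinHandle`, so the main task can join each
// worker directly instead of polling a HashMap of channels. A minimal sketch
// (names hypothetical, not the gist's actual code):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Each worker pops shared work until the queue is empty, then returns how
// many items it handled; the caller joins the handles directly.
fn run_workers(items: Vec<u32>, n_workers: usize) -> u32 {
    let queue = Arc::new(Mutex::new(items));
    let mut handles = Vec::new();
    for _ in 0..n_workers {
        let q = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            let mut handled = 0u32;
            loop {
                // the MutexGuard is dropped at the end of this statement
                let item = q.lock().unwrap().pop();
                match item {
                    Some(_) => handled += 1,
                    None => break, // queue drained: exit instead of waiting
                }
            }
            handled
        }));
    }
    // join() replaces the HashMap-of-channels polling loop entirely
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let total = run_workers((0..100).collect(), 4);
    println!("workers handled {} items", total);
}
```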
struct SomeObj<'a> {
    some_field1 : String,
    some_field2 : String,
    some_field3 : String,
    some_field4 : String,
}
impl<'a> SomeObj<'a> {
    // the lifetime comes from the impl; redeclaring <'a> here would shadow it
    pub fn new(some_field1 : &str, some_field2 : &str, inp_some_field3 : &str) -> SomeObj<'a> {
        let some_field4 = "Blah";
        SomeObj{some_field1 : some_field1.to_string(),
                some_field2 : some_field2.to_string(),
                some_field3 : inp_some_field3.to_string(),
                some_field4 : some_field4.to_string()}
    }
}
impl<'a> Clone for SomeObj<'a> {
    fn clone(&self) -> SomeObj<'a> {
        SomeObj{some_field1 : self.some_field1.clone(),
                some_field2 : self.some_field2.clone(),
                some_field3 : self.some_field3.clone(),
                some_field4 : self.some_field4.clone()}
    }
}
impl<'a> fmt::Show for SomeObj<'a> {
    fn fmt(&self, f : &mut fmt::Formatter) -> fmt::Result {
        // the fields are Strings, so plain {} is used ({:08x} only applies
        // to integer types)
        write!(f, "field1: {} field2: {} field3: {} field4: {}",
               self.some_field1, self.some_field2, self.some_field3, self.some_field4)
    }
}
fn get_time() -> String {
    let fmt = "%Y:%m:%d:%H:%M:%S";
    match strftime(fmt, &now()) {
        Ok(s) => s.clone(),
        Err(_) => "ERROR_TIME".to_string(),
    }
}
fn is_service_done(shared_done : &ArcBool) -> bool {
    match shared_done.try_read() {
        Some(v) => v.clone(),
        None => false,
    }
}
fn set_is_service_done(shared_done : &ArcBool, new_value : bool) {
    let mut set_service = false;
    while !set_service {
        match shared_done.try_write() {
            Some(mut v) => {
                *v = new_value;
                set_service = true;
            },
            None => (),
        }
    }
}
fn is_service_active(shared_keep_running : &ArcBool) -> bool {
    match shared_keep_running.try_read() {
        Some(v) => v.clone(),
        None => true,
    }
}
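// The three flag helpers above spin on try_read/try_write until the RWLock is
// free. In modern Rust a plain AtomicBool covers this use case with no lock
// and no retry loop; a sketch with hypothetical helper names:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// A store/load pair replaces the try_write/try_read retry loops:
// neither call can fail or need a retry.
fn set_done(flag: &AtomicBool) {
    flag.store(true, Ordering::Release);
}

fn is_done(flag: &AtomicBool) -> bool {
    flag.load(Ordering::Acquire)
}

fn main() {
    let done = Arc::new(AtomicBool::new(false));
    let done_reader = Arc::clone(&done);
    // consumer spins until the producer publishes the flag
    let consumer = thread::spawn(move || {
        while !is_done(&done_reader) {
            thread::yield_now();
        }
    });
    set_done(&done); // equivalent of set_is_service_done, minus the loop
    consumer.join().unwrap();
    println!("consumer observed the done flag");
}
```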
fn execute_redis_insert_task(namespace : String,
                             host : String,
                             shared_vec : ArcVecSomeObj<'static>,
                             shared_keep_running : ArcBool,
                             shared_done : ArcBool) {
    let uri = format!("redis://{}/", host);
    let client : Result<_, _> = redis::Client::open(uri.as_slice());
    let rcon : Result<_, _> = client.unwrap().get_connection();
    let mut keep_running = is_service_active(&shared_keep_running) && rcon.is_ok();
    if !keep_running {
        return;
    }
    let con = rcon.unwrap();
    let mut cnt : uint = 0;
    while keep_running {
        let opt_obj = match shared_vec.try_write() {
            Some(ref mut vec) => {
                if vec.len() == 0 && is_service_done(&shared_done) {
                    keep_running = false;
                    None
                } else {
                    cnt += 1;
                    if cnt % 10000 == 0 {
                        println!("Inserted {} fields into redis.", cnt);
                    }
                    vec.pop()
                }
            },
            None => None,
        };
        if opt_obj.is_some() {
            let obj = opt_obj.unwrap();
            let _ = do_redis_insert(&con, &obj, &namespace);
        } else {
            let time_sleep = Duration::seconds(1);
            sleep(time_sleep);
        }
        // plain reassignment: a `let` here would shadow keep_running and the
        // loop condition would never see the update
        keep_running = keep_running && is_service_active(&shared_keep_running);
    }
}
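// The worker loop above spin-polls a RWLock-protected Vec and sleeps a full
// second whenever it loses the write race or finds the vector empty. A sketch
// of a channel-based alternative using std::sync::mpsc (all names
// hypothetical; the redis insert is stubbed out as a counter so it runs
// without a server): workers block in recv() and exit when the producer drops
// the sender, which replaces both the busy-wait and the shared_done flag.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for SomeObj
struct Obj { key: String }

fn run_pool(objs: Vec<Obj>, n_workers: usize) -> usize {
    let (tx, rx) = mpsc::channel::<Obj>();
    let rx = Arc::new(Mutex::new(rx)); // mpsc receivers are single-consumer
    let mut handles = Vec::new();
    for _ in 0..n_workers {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || {
            let mut inserted = 0usize;
            loop {
                // recv() blocks; Err means the sender was dropped, i.e. the
                // producer finished -- no done flag or sleep loop needed
                let obj = rx.lock().unwrap().recv();
                match obj {
                    Ok(o) => { let _ = o.key; inserted += 1; }
                    Err(_) => break,
                }
            }
            inserted
        }));
    }
    for o in objs { tx.send(o).unwrap(); }
    drop(tx); // hang up: workers drain the queue and exit
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let objs = (0..1000).map(|i| Obj { key: format!("k{}", i) }).collect();
    println!("inserted {}", run_pool(objs, 8));
}
```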
fn do_redis_insert<'a>(con : &redis::Connection,
                       obj : &SomeObj<'a>,
                       namespace : &String) -> redis::RedisResult<()> {
    let field1_key = "field1";
    let field2_key = "field2";
    let field3_key = "field3";
    let k = format!("{}-obj_data-{}", namespace, obj.some_field1);
    let key = k.as_slice();
    //let mut sa_str : String = format!("0x{:08x}", obj.some_field1);
    let _ : RV = try!(redis::pipe().cmd("HMSET").arg(key)
                      .arg(field1_key).arg(obj.some_field1.clone())
                      .arg(field2_key).arg(obj.some_field2.clone())
                      .arg(field3_key).arg(obj.some_field3.clone()).query(con));
    Ok(())
}
fn do_redis_code<'a>(host : &String,
                     vec_objs : &Vec<SomeObj<'a>>,
                     namespace : &String) -> redis::RedisResult<()> {
    // general connection handling
    let uri = format!("redis://{}/", host);
    let client = try!(redis::Client::open(uri.as_slice()));
    let con = try!(client.get_connection());
    let mut cnt : uint = 0;
    for obj in vec_objs.iter() {
        let k = format!("{}-obj_data-{}", namespace, obj.some_field1);
        // the fields are Strings, so they are inserted as-is ({:08x} only
        // applies to integer types)
        let _ : RedisResult<RV> = con.hset(k.as_slice(), "some_field1",
                                           obj.some_field1.as_slice());
        let _ : RedisResult<RV> = con.hset(k.as_slice(), "some_field2",
                                           obj.some_field2.as_slice());
        let _ : RedisResult<RV> = con.hset(k.as_slice(), "some_field3",
                                           obj.some_field3.as_slice());
        let _ : RedisResult<RV> = con.hset(k.as_slice(), "some_field4",
                                           obj.some_field4.as_slice());
        cnt += 1;
        if cnt % 100000 == 0 {
            println!("Loaded {} obj fields.", cnt);
        }
    }
    Ok(())
}
fn get_keyed_line(line : &String) -> HashMap<String, String> {
    let mut hm = HashMap::new();
    let keys_values : Vec<&str> = line.as_slice().trim().split_str("||").collect();
    for kv_pair in keys_values.iter() {
        //println!("kv_pair: {}", kv_pair);
        let kv_string = kv_pair.to_string();
        let key_value : Vec<&str> = kv_string.as_slice().split_str("::").collect();
        let k = key_value[0].to_string();
        let v = key_value[1].to_string();
        hm.insert(k, v);
    }
    hm
}
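// split_str and as_slice are long gone; the same "||"/"::" parsing in modern
// Rust might look like the sketch below, returning None on a malformed pair
// instead of panicking on key_value[1]:

```rust
use std::collections::HashMap;

// Parse a "key::value||key::value" line; a pair without "::" yields None
// rather than an out-of-bounds panic.
fn get_keyed_line(line: &str) -> Option<HashMap<String, String>> {
    let mut hm = HashMap::new();
    for kv_pair in line.trim().split("||") {
        let (k, v) = kv_pair.split_once("::")?;
        hm.insert(k.to_string(), v.to_string());
    }
    Some(hm)
}

fn main() {
    let hm = get_keyed_line("some_field1::a||some_field2::b||some_field3::c").unwrap();
    println!("{:?}", hm.get("some_field2"));
}
```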
fn read_objs_file<'a>(inputfile : &String, vec_objs : &mut Vec<SomeObj<'a>>) {
    let mut file = BufferedReader::new(File::open(&Path::new(inputfile.as_slice())));
    let mut cnt : uint = 0;
    for line in file.lines() {
        let hm = match line {
            Ok(ref l) => get_keyed_line(l),
            Err(_) => HashMap::new(),
        };
        if hm.len() == 0 {
            break;
        }
        let obj = SomeObj::new(
            hm.get("some_field1").unwrap().as_slice(),
            hm.get("some_field2").unwrap().as_slice(),
            hm.get("some_field3").unwrap().as_slice());
        //println!("{}", obj);
        vec_objs.push(obj);
        cnt += 1;
        if cnt % 100000 == 0 {
            println!("Loaded {} obj fields.", cnt);
        }
    }
}
fn parse_line<'a>(line : &String) -> Option<SomeObj<'a>> {
    let hm = get_keyed_line(line);
    if hm.len() == 0 {
        return None
    }
    Some(SomeObj::new(
        hm.get("some_field1").unwrap().as_slice(),
        hm.get("some_field2").unwrap().as_slice(),
        hm.get("some_field3").unwrap().as_slice()))
}
fn queue_obj<'a>(obj : &SomeObj<'a>, shared_vec : ArcVecSomeObj<'static>) {
    let mut inserted_obj = false;
    while !inserted_obj {
        match shared_vec.try_write() {
            Some(ref mut vec) => {
                vec.push(obj.clone());
                inserted_obj = true;
            },
            None => (),
        }
    }
}
fn read_objs_file_mt<'a>(inputfile : &String, shared_vec : ArcVecSomeObj<'static>) {
    let mut file = BufferedReader::new(File::open(&Path::new(inputfile.as_slice())));
    let mut cnt : uint = 0;
    for line in file.lines() {
        let ls = match line {
            Ok(ref l) => l.clone(),
            Err(_) => String::new(),
        };
        if ls.len() == 0 {
            continue;
        }
        let t_shared_obj = shared_vec.clone();
        spawn(proc() {
            //println!("{}", obj);
            let t_shared_obj = t_shared_obj;
            let opt_obj = parse_line(&ls);
            match opt_obj {
                Some(obj) => { queue_obj(&obj, t_shared_obj); },
                None => (),
            };
        });
        cnt += 1;
        if cnt % 100000 == 0 {
            println!("Read {} lines from the file: {}.", cnt, get_time());
            // pause briefly so the parse/insert tasks can catch up
            let time_sleep = Duration::seconds(2);
            sleep(time_sleep);
        }
    }
}
fn print_usage(program : &str, _opts : &[OptGroup]) {
    println!("Usage: {} [options]", program);
    println!("-f\t\tfile containing identified objs");
    println!("-H\t\tredis host");
    println!("-n\t\tnamespace for the data");
    println!("-h --help\tUsage");
}
fn main() {
    let args: Vec<String> = os::args();
    let program = args[0].clone();
    let NUM_REDIS_TASKS = 1000;
    //let mut obj_vec = Vec::new();
    let shared_vec : ArcVecSomeObj<'static> = Arc::new(RWLock::new(Vec::new()));
    let shared_done = Arc::new(RWLock::new(false));
    let shared_keep_running = Arc::new(RWLock::new(true));
    let opts = &[
        optopt("f", "", "input file of objs", "NAME"),
        optopt("H", "", "redis host", "NAME"),
        optopt("n", "", "namespace for the data", "NAME"),
        optflag("h", "help", "print this help menu")
    ];
    let matches = match getopts(args.tail(), opts) {
        Ok(m) => m,
        Err(f) => panic!(f.to_string()),
    };
    if matches.opt_present("h") {
        print_usage(program.as_slice(), opts);
        return;
    }
    let inputfile = matches.opt_str("f").unwrap();
    let namespace = matches.opt_str("n").unwrap();
    let redishost = matches.opt_str("H").unwrap();
    println!("Reading objs from: {}", inputfile);
    println!("Using namespace: {}", namespace);
    println!("Sending objs to redis host: {}", redishost);
    println!("Starting the redis consumer pool now: {}", get_time());
    // clone this proc's values so that the clones can be moved
    // into the spawned task
    let mut cnt : u32 = 0;
    let mut channel_map = HashMap::new();
    while cnt < NUM_REDIS_TASKS {
        let (tx, rx) : (Sender<u32>, Receiver<u32>) = channel();
        let t_shared_vec = shared_vec.clone();
        let t_shared_done = shared_done.clone();
        let t_shared_keep_running = shared_keep_running.clone();
        let t_namespace = namespace.clone();
        let t_host = redishost.clone();
        channel_map.insert(cnt, rx);
        cnt += 1;
        spawn(proc() {
            let t_shared_vec = t_shared_vec;
            let t_shared_done = t_shared_done;
            let t_shared_keep_running = t_shared_keep_running;
            let t_namespace = t_namespace;
            let t_host = t_host;
            let child_tx = tx.clone();
            execute_redis_insert_task(t_namespace, t_host, t_shared_vec,
                                      t_shared_keep_running, t_shared_done);
            child_tx.send(0);
        });
    }
    // Old slow way:
    // 1) Read in all data from the file into a vector
    // 2) Write to redis field by field and return
    //    (ran for 12+ hours and did not finish)
    //read_objs_file(&inputfile, &mut obj_vec);
    //do_redis_code (&redishost, &obj_vec, &namespace);
    // return
    // New way:
    // 1) Read a line and spawn a short-lived task to parse it and
    //    insert the result into the shared vector
    // 2) The 1000 consumer tasks submit the data in the background
    //    as the parsing takes place
    println!("Consuming the file now: {}", get_time());
    read_objs_file_mt(&inputfile, shared_vec.clone());
    println!("Completed consuming the file now: {}", get_time());
    set_is_service_done(&shared_done, true);
    println!("Waiting on the insertion tasks to finish");
    while cnt > 0 {
        let mut del_ids = Vec::new();
        let mut keys = Vec::new();
        {
            for k in channel_map.keys() {
                keys.push(*k);
            }
            for c_id in keys.iter() {
                let rx = channel_map.get(c_id).unwrap();
                let res_recv = rx.try_recv();
                match res_recv {
                    Ok(_) => {
                        del_ids.push(c_id);
                        println!("redis thread {} has completed import", c_id);
                    },
                    Err(_) => (),
                }
            }
        }
        if del_ids.len() == 0 {
            let time_sleep = Duration::seconds(1);
            sleep(time_sleep);
        } else {
            for c_id in del_ids.iter() {
                let id = c_id.clone();
                channel_map.remove(id);
            }
        }
        cnt = channel_map.len() as u32;
    }
    println!("Completed All processing: {}", get_time());
}
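// The collect-keys-then-remove dance above (and the borrow-checker error
// quoted in the header comment) is exactly what HashMap::retain handles in
// modern Rust: entries can be dropped during a single mutable traversal, so
// no second pass is needed. A sketch with hypothetical names:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, TryRecvError, channel};
use std::thread;

// Drop every worker entry whose completion channel has fired or hung up.
// retain() takes &mut self exactly once, so no immutable iterator borrow is
// left alive to conflict with the removal.
fn drain_completed(map: &mut HashMap<u32, Receiver<u32>>) {
    map.retain(|id, rx| match rx.try_recv() {
        Ok(_) => {
            println!("redis thread {} has completed import", id);
            false // remove the finished worker
        }
        Err(TryRecvError::Empty) => true,         // still running, keep it
        Err(TryRecvError::Disconnected) => false, // sender gone, remove it
    });
}

fn main() {
    // Map of worker id -> completion channel, as in main() above.
    let mut channel_map: HashMap<u32, Receiver<u32>> = HashMap::new();
    for id in 0..4u32 {
        let (tx, rx) = channel();
        channel_map.insert(id, rx);
        thread::spawn(move || { tx.send(0).unwrap(); });
    }
    while !channel_map.is_empty() {
        drain_completed(&mut channel_map);
        thread::yield_now();
    }
    println!("Completed all processing");
}
```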