Last active
January 24, 2017 19:28
-
-
Save przygienda/f7386a432c876988f35f7e05bfc0c0b3 to your computer and use it in GitHub Desktop.
slog (Rust) filter that can act on the key/value pairs of log entries.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Implements an slog `Drain` that filters log records on the values of
//! selected keys. It relies on a small serializer that turns the values of
//! those keys into strings; the values gathered for each `OwnedKeyValueList`
//! are cached by the filter.
#![feature(drop_types_in_const)] | |
#[macro_use] | |
extern crate slog; | |
extern crate slog_term; | |
extern crate slog_atomic; | |
use std::collections::HashSet; | |
use std::collections::HashMap; | |
use std::collections::hash_map::Entry; | |
use std::iter::FromIterator; | |
use slog::*; | |
use std::fmt; | |
use std::cell::RefCell; | |
use std::sync::Mutex; | |
/// Minimal serializer that captures a single value as a `String`.
///
/// slog does not expose the value behind a key directly, so the filter
/// serializes each value of interest into one of these and reads the
/// captured text back through [`ToStringSerializer::value`].
#[derive(Debug, Default)]
struct ToStringSerializer {
    /// The captured value; `None` until a value has been emitted.
    val: Option<String>,
}

impl ToStringSerializer {
    /// Creates a serializer that has not captured anything yet.
    pub fn new() -> ToStringSerializer {
        ToStringSerializer::default()
    }

    /// Returns the captured value, if any.
    pub fn value(&self) -> &Option<String> {
        &self.val
    }
}
/// Generates a serializer method `$f(&mut self, key, val: $ot)` that captures
/// the value's `Display` text in `self.val`.
///
/// Non-string values such as numbers, bools, chars or formatted arguments
/// can therefore still be matched against the filter's string values (for
/// example, `"thread" => 100` matches an accepted value `"100"`), instead of
/// panicking. The "empty" in the name is historical and is kept so existing
/// call sites keep working.
macro_rules! impl_empty_emit_for {
    ($f:ident, $ot:ty) => {
        fn $f(&mut self, _: &'static str, val: $ot) -> ser::Result {
            self.val = Some(val.to_string());
            Ok(())
        }
    };
}
/// Generates a value-less serializer method `$f(&mut self, key)`.
///
/// Such a value carries nothing to match on, so it is recorded as "no value"
/// (`self.val = None`), the same way `emit_none` records it. Previously the
/// generated method panicked instead.
macro_rules! impl_empty_emit_for_no_param {
    ($f:ident) => {
        fn $f(&mut self, _: &'static str) -> ser::Result {
            self.val = None;
            Ok(())
        }
    };
}
impl Serializer for ToStringSerializer { | |
// do nothing on other emits since you only care about strings | |
fn emit_str(&mut self, _: &'static str, val: &str) -> ser::Result { | |
self.val = Some(String::from(val)); | |
Ok(()) | |
} | |
fn emit_none(&mut self, _: &'static str) -> ser::Result { | |
self.val = None; | |
Ok(()) | |
} | |
impl_empty_emit_for_no_param!(emit_unit); | |
impl_empty_emit_for!(emit_usize, usize); | |
impl_empty_emit_for!(emit_isize, isize); | |
impl_empty_emit_for!(emit_bool, bool); | |
impl_empty_emit_for!(emit_char, char); | |
impl_empty_emit_for!(emit_u8, u8); | |
impl_empty_emit_for!(emit_u16, u16); | |
impl_empty_emit_for!(emit_u32, u32); | |
impl_empty_emit_for!(emit_u64, u64); | |
impl_empty_emit_for!(emit_i8, i8); | |
impl_empty_emit_for!(emit_i16, i16); | |
impl_empty_emit_for!(emit_i32, i32); | |
impl_empty_emit_for!(emit_i64, i64); | |
impl_empty_emit_for!(emit_f32, f32); | |
impl_empty_emit_for!(emit_f64, f64); | |
impl_empty_emit_for!(emit_arguments, &fmt::Arguments); | |
} | |
/// Filter configuration: for every key of interest, the set of string
/// values that are accepted for it.
pub type FilteringMap = HashMap<String, HashSet<String>>;

/// Cache of gathered values, keyed by a logger's `OwnedKeyValueList` id.
/// Each entry maps a filter key to all values found for that key along the
/// logger's chain.
///
/// NOTE(review): the `Mutex` already provides interior mutability, so the
/// inner `RefCell` is redundant. It is kept here because the code building
/// and locking the cache relies on it.
type FilteringHash = Mutex<RefCell<HashMap<usize, FilteringMap>>>;
/// This `Drain` filters a log entry on a filtermap | |
/// that holds the key name in question and acceptable values | |
/// Key values are gathered up the whole hierarchy of inherited | |
/// loggers. | |
/// | |
/// Example | |
/// ======= | |
/// | |
/// Logger( ... ; o!("thread" => "100"); | |
/// log( ... ; "packet" => "send"); | |
/// log( ... ; "packet" => "receive"); | |
/// | |
/// can be filtered on a map containing "thread" key component. If the | |
/// values contain "100" the log will be output, otherwise filtered. | |
/// The filtering map can contain further key "packet" and value "send". | |
/// With that the output for "receive" would be filtered. | |
/// | |
/// More precisely, a key is ignored until present and an entry must | |
/// match for all the keys present any of the values to be left | |
/// unfiltered. Refer to the test for an example. | |
/// | |
/// Usage | |
/// ===== | |
/// | |
/// Filtering in large systems that run multiple threads of same | |
/// code or have functionality of interest across many components, | |
/// modules, such as e.g. "sending packets" or "running FSM" | |
pub struct AttributeValuesFilter<D: Drain> { | |
drain: D, | |
/// cache of values found per value_chain id | |
cache: FilteringHash, | |
filters: FilteringMap, | |
level: Level, | |
keys: HashSet<String>, | |
} | |
impl<'a, D: Drain> AttributeValuesFilter<D> { | |
/// Create `NodeComponentLevelFilter` | |
/// * `drain` - drain to be sent to | |
/// * `level` - maximum level filtered, higher levels pass by | |
/// * `filters` - Hashmap of keys with lists of allowed values | |
pub fn new(drain: D, level: Level, filters: FilteringMap) -> Self { | |
let k = HashSet::from_iter(filters.keys().cloned()); | |
AttributeValuesFilter { | |
drain: drain, | |
level: level, | |
filters: filters, | |
keys: k, | |
cache: Mutex::new(RefCell::new(HashMap::new())), | |
} | |
} | |
fn gather_key_values(&self, | |
info: &Record, | |
logger_values_ref: &OwnedKeyValueList) | |
-> Vec<(&str, Option<String>)> { | |
let mut res = vec![]; | |
// dense, walk all the values, first recursively, then | |
// run the chain looking for keys we're interested in, | |
// serializing the value | |
if let &Some(ref logger_values_ref) = logger_values_ref.parent() { | |
res.extend(self.gather_key_values(info, &logger_values_ref)); | |
} | |
res.extend(logger_values_ref.iter() | |
.filter_map(|(k, v)| { | |
// println!("{} {:?}", id, k); | |
if self.keys.contains(k) { | |
let mut s = ToStringSerializer::new(); | |
v.serialize(info, k, &mut s).ok(); | |
// println!("{} {:?} -> {:?} ", id, k, s.value()); | |
Some((k, s.value().clone())) | |
} else { | |
None | |
} | |
}) | |
.filter(|&(_, ref v)| v.is_some())); | |
res | |
} | |
/// filters record returning all matching keys that have been satisfied | |
fn filter_record(&self, record_values: &FilteringMap) -> HashSet<String> { | |
// println!("filter recs: map {:?} v: {:?}", self.filters, record_values); | |
HashSet::from_iter(self.filters | |
.iter() | |
.flat_map(|(key, valuechain)| -> Option<String> { | |
match record_values.get(key) { | |
None => None, | |
Some(v) => { | |
if valuechain.is_disjoint(v) { | |
None | |
} else { | |
Some(key.clone()) | |
} | |
} | |
} | |
})) | |
} | |
fn update_cache_and_filter(&'a self, info: &Record, logger_values: &OwnedKeyValueList) -> bool { | |
let mut filterit = false; | |
if !info.level().is_at_least(self.level) { | |
let filter; | |
// get the loggers key/values possibly from cache | |
match self.cache | |
.lock() | |
.unwrap() | |
.get_mut() | |
.entry(logger_values.id()) { | |
Entry::Vacant(e) => { | |
// gather all the values for the keys we're interested in | |
// up the chain once | |
let mut newhash = HashMap::new(); | |
let values = self.gather_key_values(info, logger_values); | |
// println!("key/values: {:#?}", values); | |
for k in self.keys | |
.iter() { | |
newhash.insert(k.clone().to_string(), | |
HashSet::from_iter(values.iter() | |
.filter_map(|&(ref _k, ref v)| -> Option<String> { | |
if k == _k && v.is_some() { | |
Some(v.clone().unwrap().to_string()) | |
} else { | |
None | |
} | |
}))); | |
} | |
// println!("key/values: {:#?}", newhash); | |
filter = self.filter_record(&newhash); | |
e.insert(newhash); | |
} | |
Entry::Occupied(e) => { | |
filter = self.filter_record(&e.get()); | |
} | |
} | |
filterit = filter != self.keys; | |
if filterit { | |
/// if any borrowed values on info and we don't have a | |
/// logger match for all keys yet, | |
/// check those additionally | |
let missing_keys = HashSet::<&String>::from_iter(self.keys | |
.symmetric_difference(&filter)); | |
if missing_keys.len() > 0 { | |
let locallyfound = HashSet::from_iter(info.values() | |
.iter() | |
.filter_map(|&(k, v)| -> Option<&'a String> { | |
if missing_keys.contains(&k.to_string()) { | |
match self.filters.get(k) { | |
None => None, | |
Some(ivalues) => { | |
let mut s = ToStringSerializer::new(); | |
v.serialize(info, k, &mut s).ok(); | |
if ivalues.contains(&s.value().clone().unwrap()) { | |
self.keys.get(k) | |
} else { | |
None | |
} | |
} | |
} | |
} else { | |
None | |
} | |
})); | |
// println!("{} locally found {:?} missing keys {:?}", | |
// id,locallyfound,missing_keys); | |
// locally found is superset or equal which means it passes | |
filterit = !locallyfound.is_superset(&missing_keys); | |
} | |
} | |
// println!("{} filter {:?}",id,filter); | |
} | |
filterit | |
} | |
} | |
impl<'a, D: Drain> Drain for AttributeValuesFilter<D> { | |
type Error = D::Error; | |
fn log(&self, info: &Record, logger_values: &OwnedKeyValueList) -> Result<(), Self::Error> { | |
if !self.update_cache_and_filter(info, logger_values) { | |
self.drain.log(info, logger_values) | |
} else { | |
Ok(()) | |
} | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::AttributeValuesFilter;
    use slog::*;
    use std::collections::{HashSet, HashMap};
    use std::iter::FromIterator;
    use std::sync::{Arc, Mutex};
    use std::fmt::Display;
    use std::fmt::Formatter;
    use std::fmt::Result as FmtResult;
    use std::io;

    const YES: &'static str = "YES";
    const NO: &'static str = "NO";

    /// Test drain that collects every message reaching it. It asserts that
    /// only messages tagged `YES` get through the filter.
    ///
    /// The output is shared through `Arc<Mutex<..>>`, so the test can inspect
    /// it after the drain has been moved into the filter and the logger.
    /// This replaces the previous `static mut` + `unsafe` scheme, which
    /// aliased mutable statics.
    #[derive(Debug, Clone)]
    struct StringDrain {
        output: Arc<Mutex<Vec<String>>>,
    }

    impl Drain for StringDrain {
        type Error = io::Error;
        fn log(&self, info: &Record, _: &OwnedKeyValueList) -> io::Result<()> {
            let msg = format!("{:?}", info.msg());
            assert!(msg.contains(YES) && !msg.contains(NO),
                    "record should have been filtered: {}",
                    msg);
            self.output.lock().unwrap().push(msg);
            Ok(())
        }
    }

    impl Display for StringDrain {
        fn fmt(&self, f: &mut Formatter) -> FmtResult {
            write!(f, "none")
        }
    }

    #[test]
    /// Builds a filter on "thread" and "direction" and logs through several
    /// derived loggers. Only the five `YES` messages may reach the drain.
    fn nodecomponentlogfilter() {
        let output = Arc::new(Mutex::new(Vec::new()));
        let drain = StringDrain { output: output.clone() };
        // build some small filter
        let filter = AttributeValuesFilter::new(
            drain,
            Level::Warning,
            HashMap::from_iter(vec![
                ("thread".to_string(),
                 HashSet::from_iter(vec!["100".to_string(), "200".to_string()])),
                ("direction".to_string(),
                 HashSet::from_iter(vec!["send".to_string(), "receive".to_string()])),
            ]));
        // Get a root logger that will log into the given drain.
        let mainlog = Logger::root(filter.fuse(), o!("version" => env!("CARGO_PKG_VERSION")));
        info!(mainlog, "NO: filtered");
        info!(mainlog, "YES: unfiltered"; "thread" => "100", "direction" => "send");
        warn!(mainlog, "YES: unfiltered"); // level high enough
        info!(mainlog, "NO: filtered"; "thread" => "300", "direction" => "send");
        let wrongthread = mainlog.new(o!("thread" => "100", "sub" => "sub"));
        info!(wrongthread, "NO: filtered");
        let sublog = mainlog.new(o!("thread" => "200", "sub" => "sub"));
        info!(sublog, "NO: filtered sublog");
        info!(sublog, "YES: unfiltered sublog"; "direction" => "receive");
        let subsublog = sublog.new(o!("direction" => "send"));
        let subsubsublog = subsublog.new(o!());
        info!(subsubsublog, "YES: unfiltered subsubsublog");
        // the same key set twice along the chain matches if either value is accepted
        let stackedthreadslog = wrongthread.new(o!("thread" => "200"));
        info!(stackedthreadslog,
              "YES: unfiltered since one of the threads matches"; "direction" => "send");

        let passed = output.lock().unwrap();
        println!("{:#?}", *passed);
        assert_eq!(passed.len(), 5);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment