@przygienda
Last active January 24, 2017 19:28
SLOG Rust filter that can act on key/values of the log entries ...
//! Implements a slog `Drain` filter that can filter log entries on the
//! values of keys. Relies on a serializer that writes out and caches the
//! values of keys found in an `OwnedKeyValueList`.
#![feature(drop_types_in_const)]
#[macro_use]
extern crate slog;
extern crate slog_term;
extern crate slog_atomic;
use std::collections::HashSet;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::iter::FromIterator;
use slog::*;
use std::fmt;
use std::cell::RefCell;
use std::sync::Mutex;
/// Very simple serializer that just stores the value of a key as a
/// string, since it is otherwise not accessible.
struct ToStringSerializer {
val: Option<String>,
}
impl ToStringSerializer {
pub fn new() -> ToStringSerializer {
ToStringSerializer { val: None }
}
pub fn value(&self) -> &Option<String> {
&self.val
}
}
macro_rules! impl_empty_emit_for {
($f:ident, $ot:ty) => {
fn $f(&mut self, _: &'static str, _: $ot)
-> ser::Result {
unreachable!();
}
};
}
macro_rules! impl_empty_emit_for_no_param {
($f:ident) => {
fn $f(&mut self, _: &'static str)
-> ser::Result {
unreachable!();
}
};
}
impl Serializer for ToStringSerializer {
// only string values are stored; the other emit_* methods generated below
// are never expected to be called for the filtered keys
fn emit_str(&mut self, _: &'static str, val: &str) -> ser::Result {
self.val = Some(String::from(val));
Ok(())
}
fn emit_none(&mut self, _: &'static str) -> ser::Result {
self.val = None;
Ok(())
}
impl_empty_emit_for_no_param!(emit_unit);
impl_empty_emit_for!(emit_usize, usize);
impl_empty_emit_for!(emit_isize, isize);
impl_empty_emit_for!(emit_bool, bool);
impl_empty_emit_for!(emit_char, char);
impl_empty_emit_for!(emit_u8, u8);
impl_empty_emit_for!(emit_u16, u16);
impl_empty_emit_for!(emit_u32, u32);
impl_empty_emit_for!(emit_u64, u64);
impl_empty_emit_for!(emit_i8, i8);
impl_empty_emit_for!(emit_i16, i16);
impl_empty_emit_for!(emit_i32, i32);
impl_empty_emit_for!(emit_i64, i64);
impl_empty_emit_for!(emit_f32, f32);
impl_empty_emit_for!(emit_f64, f64);
impl_empty_emit_for!(emit_arguments, &fmt::Arguments);
}
/// Map of acceptable string values per key name; a map of this type is
/// installed into the filter to decide which log entries pass.
pub type FilteringMap = HashMap<String, HashSet<String>>;
/// Used to cache the values gathered for the keys of interest,
/// keyed by the id of the value chain and then by key name.
type FilteringHash = Mutex<RefCell<HashMap<usize, FilteringMap>>>;
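// Illustrative sketch only: one way to build a `FilteringMap`, mirroring the
// map used in the test at the bottom of this file. The function name and the
// keys "thread"/"direction" with their values are example data, not part of
// the API.
#[allow(dead_code)]
fn example_filtering_map() -> FilteringMap {
    let mut filters = FilteringMap::new();
    // accept entries whose "thread" is "100" or "200" ...
    filters.insert("thread".to_string(),
                   HashSet::from_iter(vec!["100".to_string(), "200".to_string()]));
    // ... and whose "direction" is "send" or "receive"
    filters.insert("direction".to_string(),
                   HashSet::from_iter(vec!["send".to_string(), "receive".to_string()]));
    filters
}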
/// This `Drain` filters a log entry on a filtermap
/// that holds the key name in question and acceptable values
/// Key values are gathered up the whole hierarchy of inherited
/// loggers.
///
/// Example
/// =======
///
/// Logger( ... ; o!("thread" => "100");
/// log( ... ; "packet" => "send");
/// log( ... ; "packet" => "receive");
///
/// can be filtered on a map containing the "thread" key. If the allowed
/// values contain "100" the log entry is output, otherwise it is filtered.
/// The filtering map can contain a further key "packet" with the value
/// "send"; with that, the "receive" entry above would be filtered out.
///
/// More precisely, a key is ignored unless it is present in the filtering
/// map, and an entry must match one of the allowed values for every key
/// that is present in the map to pass unfiltered. Refer to the test for an
/// example.
///
/// Usage
/// =====
///
/// Useful for filtering in large systems that run multiple threads of the
/// same code, or that have functionality of interest spread across many
/// components and modules, e.g. "sending packets" or "running an FSM".
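///
/// Example
/// =======
///
/// Minimal usage sketch (mirrors the test at the bottom of this file;
/// `some_drain` and the key names are illustrative placeholders, not part
/// of the API):
///
/// ```ignore
/// let filters: FilteringMap = HashMap::from_iter(vec![
///     ("thread".to_string(),
///      HashSet::from_iter(vec!["100".to_string(), "200".to_string()]))]);
/// let root = Logger::root(
///     AttributeValuesFilter::new(some_drain, Level::Warning, filters).fuse(),
///     o!());
/// info!(root, "passes the filter"; "thread" => "100");
/// info!(root, "filtered out"; "thread" => "300");
/// ```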
pub struct AttributeValuesFilter<D: Drain> {
drain: D,
/// cache of values found per value_chain id
cache: FilteringHash,
filters: FilteringMap,
level: Level,
keys: HashSet<String>,
}
impl<'a, D: Drain> AttributeValuesFilter<D> {
/// Create an `AttributeValuesFilter`
/// * `drain` - drain that accepted records are sent to
/// * `level` - records at or above this level always pass; less severe
///   records are subject to key/value filtering
/// * `filters` - map of keys with the sets of allowed values
pub fn new(drain: D, level: Level, filters: FilteringMap) -> Self {
let k = HashSet::from_iter(filters.keys().cloned());
AttributeValuesFilter {
drain: drain,
level: level,
filters: filters,
keys: k,
cache: Mutex::new(RefCell::new(HashMap::new())),
}
}
fn gather_key_values(&self,
info: &Record,
logger_values_ref: &OwnedKeyValueList)
-> Vec<(&str, Option<String>)> {
let mut res = vec![];
// walk all the values: first recurse into the parent chain, then
// run this link of the chain looking for the keys we're interested
// in, serializing each value found
if let &Some(ref logger_values_ref) = logger_values_ref.parent() {
res.extend(self.gather_key_values(info, &logger_values_ref));
}
res.extend(logger_values_ref.iter()
.filter_map(|(k, v)| {
// println!("{} {:?}", id, k);
if self.keys.contains(k) {
let mut s = ToStringSerializer::new();
v.serialize(info, k, &mut s).ok();
// println!("{} {:?} -> {:?} ", id, k, s.value());
Some((k, s.value().clone()))
} else {
None
}
})
.filter(|&(_, ref v)| v.is_some()));
res
}
/// filters a record, returning the set of filter keys that are satisfied
/// by the given record values
fn filter_record(&self, record_values: &FilteringMap) -> HashSet<String> {
// println!("filter recs: map {:?} v: {:?}", self.filters, record_values);
HashSet::from_iter(self.filters
.iter()
.flat_map(|(key, valuechain)| -> Option<String> {
match record_values.get(key) {
None => None,
Some(v) => {
if valuechain.is_disjoint(v) {
None
} else {
Some(key.clone())
}
}
}
}))
}
fn update_cache_and_filter(&'a self, info: &Record, logger_values: &OwnedKeyValueList) -> bool {
let mut filterit = false;
if !info.level().is_at_least(self.level) {
let filter;
// get the loggers key/values possibly from cache
match self.cache
.lock()
.unwrap()
.get_mut()
.entry(logger_values.id()) {
Entry::Vacant(e) => {
// gather all the values for the keys we're interested in
// up the chain once
let mut newhash = HashMap::new();
let values = self.gather_key_values(info, logger_values);
// println!("key/values: {:#?}", values);
for k in self.keys
.iter() {
newhash.insert(k.clone().to_string(),
HashSet::from_iter(values.iter()
.filter_map(|&(ref _k, ref v)| -> Option<String> {
if k == _k && v.is_some() {
Some(v.clone().unwrap().to_string())
} else {
None
}
})));
}
// println!("key/values: {:#?}", newhash);
filter = self.filter_record(&newhash);
e.insert(newhash);
}
Entry::Occupied(e) => {
filter = self.filter_record(&e.get());
}
}
filterit = filter != self.keys;
if filterit {
// if we don't yet have a logger match for all keys, additionally
// check any key/values borrowed on the record (`info`) itself
let missing_keys = HashSet::<&String>::from_iter(self.keys
.symmetric_difference(&filter));
if missing_keys.len() > 0 {
let locallyfound = HashSet::from_iter(info.values()
.iter()
.filter_map(|&(k, v)| -> Option<&'a String> {
if missing_keys.contains(&k.to_string()) {
match self.filters.get(k) {
None => None,
Some(ivalues) => {
let mut s = ToStringSerializer::new();
v.serialize(info, k, &mut s).ok();
if ivalues.contains(&s.value().clone().unwrap()) {
self.keys.get(k)
} else {
None
}
}
}
} else {
None
}
}));
// println!("{} locally found {:?} missing keys {:?}",
// id,locallyfound,missing_keys);
// locally found is superset or equal which means it passes
filterit = !locallyfound.is_superset(&missing_keys);
}
}
// println!("{} filter {:?}",id,filter);
}
filterit
}
}
impl<D: Drain> Drain for AttributeValuesFilter<D> {
type Error = D::Error;
fn log(&self, info: &Record, logger_values: &OwnedKeyValueList) -> Result<(), Self::Error> {
if !self.update_cache_and_filter(info, logger_values) {
self.drain.log(info, logger_values)
} else {
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use super::AttributeValuesFilter;
use slog::*;
use std::collections::{HashSet, HashMap};
use std::iter::FromIterator;
use std::cell::RefCell;
use std::sync::Mutex;
use std::fmt::Display;
use std::fmt::Formatter;
use std::fmt::Result as FmtResult;
use std::io;
const YES: &'static str = "YES";
const NO: &'static str = "NO";
#[derive(Debug)]
struct StringDrain<'a> {
output: Mutex<RefCell<&'a mut Option<Vec<String>>>>,
cnt: Mutex<RefCell<&'a mut i32>>,
}
impl<'a> Drain for StringDrain<'a> {
type Error = io::Error;
fn log(&self, info: &Record, _: &OwnedKeyValueList) -> io::Result<()> {
let lo = self.output.lock().unwrap();
let mut lob = lo.borrow_mut();
let fmt = format!("{:?}", info.msg());
assert!(fmt.contains(YES) && !fmt.contains(NO));
match *lob {
&mut None => {},
&mut Some(ref mut c) => c.push(fmt),
}
self.cnt.lock()
.and_then(|e| {
**e.borrow_mut() += 1;
Ok(())
})
.ok();
Ok(())
}
}
impl<'a> Display for StringDrain<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "none")
}
}
static mut CNT: i32 = 0;
static mut OUT: Option<Vec<String>> = None;
#[test]
/// Builds an asserting drain and a couple of derived loggers that carry
/// different key/values, and checks whether filtering is applied properly
/// on the derived `Logger` copies.
/// @note: unfortunately an ugly unsafe block is needed to gather data for
/// the test in the background via statics, since the drain itself is moved
/// into the filter and then into the logger. Channels would be cleaner,
/// but it's just a test.
fn nodecomponentlogfilter() {
{
unsafe {
OUT = Some(Vec::new());
let drain = StringDrain {
output: Mutex::new(RefCell::new(&mut OUT)),
cnt: Mutex::new(RefCell::new(&mut CNT)),
};
// build some small filter
let filter = AttributeValuesFilter::new(drain,
Level::Warning,
HashMap::from_iter(vec![("thread".to_string(),
HashSet::from_iter(vec!["100".to_string(), "200".to_string()])),
("direction".to_string(),
HashSet::from_iter(vec!["send".to_string(), "receive".to_string()]))]));
// Get a root logger that will log into a given drain.
let mainlog = Logger::root(filter.fuse(), o!("version" => env!("CARGO_PKG_VERSION")));
info!(mainlog, "NO: filtered");
info!(mainlog, "YES: unfiltered";
"thread" => "100", "direction" => "send");
warn!(mainlog, "YES: unfiltered"); // level high enough
info!(mainlog, "NO: filtered"; "thread" => "300", "direction" => "send");
let wrongthread = mainlog.new(o!("thread" => "100", "sub" => "sub"));
info!(wrongthread, "NO: filtered");
let sublog = mainlog.new(o!("thread" => "200", "sub" => "sub"));
info!(sublog, "NO: filtered sublog");
info!(sublog, "YES: unfiltered sublog"; "direction" => "receive");
let subsublog = sublog.new(o!("direction" => "send"));
let subsubsublog = subsublog.new(o!());
info!(subsubsublog, "YES: unfiltered subsubsublog");
// test that the same key appearing twice in the chain, with one matching value, still satisfies the filter
let stackedthreadslog = wrongthread.new(o!("thread" => "200"));
info!(stackedthreadslog,
"YES: unfiltered since one of the threads matches"; "direction" => "send");
println!("{:#?}", OUT);
assert!(CNT == 5);
}
}
}
}