Created
September 13, 2023 13:45
-
-
Save jan-auer/d239241b45c359ac5b71cd37044f3988 to your computer and use it in GitHub Desktop.
dynamic sampling evaluator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Types to specify conditions on data. | |
//! | |
//! The root type is [`RuleCondition`]. | |
use relay_common::glob3::GlobPatterns; | |
use relay_protocol::{Getter, Val}; | |
use serde::{Deserialize, Serialize}; | |
use serde_json::{Number, Value}; | |
use crate::utils; | |
/// Options for [`EqCondition`]. | |
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] | |
#[serde(rename_all = "camelCase")] | |
pub struct EqCondOptions { | |
/// If `true`, string values are compared in case-insensitive mode. | |
/// | |
/// This has no effect on numeric or boolean comparisons. | |
#[serde(default)] | |
pub ignore_case: bool, | |
} | |
/// A condition that compares values for equality. | |
/// | |
/// This operator supports: | |
/// - boolean | |
/// - strings (with `ignore_case` flag) | |
/// - UUIDs | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
#[serde(rename_all = "camelCase")] | |
pub struct EqCondition { | |
/// Path of the field that should match the value. | |
pub name: String, | |
/// The value to check against. | |
/// | |
/// When comparing with a string field, this value can be an array. The condition matches if any | |
/// of the provided values matches the field. | |
pub value: Value, | |
/// Configuration options for the condition. | |
#[serde(default, skip_serializing_if = "utils::is_default")] | |
pub options: EqCondOptions, | |
} | |
impl EqCondition { | |
fn cmp(&self, left: &str, right: &str) -> bool { | |
if self.options.ignore_case { | |
unicase::eq(left, right) | |
} else { | |
left == right | |
} | |
} | |
fn matches<T>(&self, instance: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
match (instance.get_value(self.name.as_str()), &self.value) { | |
(None, Value::Null) => true, | |
(Some(Val::String(f)), Value::String(ref val)) => self.cmp(f, val), | |
(Some(Val::String(f)), Value::Array(ref arr)) => arr | |
.iter() | |
.filter_map(|v| v.as_str()) | |
.any(|v| self.cmp(v, f)), | |
(Some(Val::Uuid(f)), Value::String(ref val)) => Some(f) == val.parse().ok(), | |
(Some(Val::Bool(f)), Value::Bool(v)) => f == *v, | |
_ => false, | |
} | |
} | |
} | |
macro_rules! impl_cmp_condition { | |
($struct_name:ident, $operator:tt, $doc:literal) => { | |
#[doc = $doc] | |
/// | |
/// Strings are explicitly not supported by this. | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
pub struct $struct_name { | |
/// Path of the field that should match the value. | |
pub name: String, | |
/// The numeric value to check against. | |
pub value: Number, | |
} | |
impl $struct_name { | |
fn matches<T>(&self, instance: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
let Some(value) = instance.get_value(self.name.as_str()) else { | |
return false; | |
}; | |
// Try various conversion functions in order of expensiveness and likelihood | |
// - as_i64 is not really fast, but most values in sampling rules can be i64, so we | |
// could return early | |
// - f64 is more likely to succeed than u64, but we might lose precision | |
if let (Some(a), Some(b)) = (value.as_i64(), self.value.as_i64()) { | |
a $operator b | |
} else if let (Some(a), Some(b)) = (value.as_u64(), self.value.as_u64()) { | |
a $operator b | |
} else if let (Some(a), Some(b)) = (value.as_f64(), self.value.as_f64()) { | |
a $operator b | |
} else { | |
false | |
} | |
} | |
} | |
} | |
} | |
impl_cmp_condition!(GteCondition, >=, "A condition that applies `>=`."); | |
impl_cmp_condition!(LteCondition, <=, "A condition that applies `<=`."); | |
impl_cmp_condition!(GtCondition, >, "A condition that applies `>`."); | |
impl_cmp_condition!(LtCondition, <, "A condition that applies `<`."); | |
/// A condition that uses glob matching. | |
/// | |
/// This is similar to [`EqCondition`], but it allows for wildcards in `value`. This is slightly | |
/// more expensive to construct and check, so preferrably use [`EqCondition`] when no wildcard | |
/// matching is needed. | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
pub struct GlobCondition { | |
/// Path of the field that should match the value. | |
pub name: String, | |
/// A list of glob patterns to check. | |
/// | |
/// Note that this cannot be a single value, it must be a list of values. | |
pub value: GlobPatterns, | |
} | |
impl GlobCondition { | |
fn matches<T>(&self, instance: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
match instance.get_value(self.name.as_str()) { | |
Some(Val::String(s)) => self.value.is_match(s), | |
_ => false, | |
} | |
} | |
} | |
/// Combines multiple conditions using logical OR. | |
/// | |
/// This condition matches if **any** of the inner conditions matches. | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
pub struct OrCondition { | |
/// Inner rules to combine. | |
pub inner: Vec<RuleCondition>, | |
} | |
impl OrCondition { | |
fn supported(&self) -> bool { | |
self.inner.iter().all(RuleCondition::supported) | |
} | |
fn matches<T>(&self, value: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
self.inner.iter().any(|cond| cond.matches(value)) | |
} | |
} | |
/// Combines multiple conditions using logical AND. | |
/// | |
/// This condition matches if **all** of the inner conditions matches. | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
pub struct AndCondition { | |
/// Inner rules to combine. | |
pub inner: Vec<RuleCondition>, | |
} | |
impl AndCondition { | |
fn supported(&self) -> bool { | |
self.inner.iter().all(RuleCondition::supported) | |
} | |
fn matches<T>(&self, value: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
self.inner.iter().all(|cond| cond.matches(value)) | |
} | |
} | |
/// Applies logical NOT to a condition. | |
/// | |
/// This condition matches if the inner condition does not match. | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
pub struct NotCondition { | |
/// An inner rule to negate. | |
pub inner: Box<RuleCondition>, | |
} | |
impl NotCondition { | |
fn supported(&self) -> bool { | |
self.inner.supported() | |
} | |
fn matches<T>(&self, value: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
!self.inner.matches(value) | |
} | |
} | |
/// A condition from a sampling rule. | |
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |
#[serde(rename_all = "camelCase", tag = "op")] | |
pub enum RuleCondition { | |
/// A condition that compares values for equality. | |
Eq(EqCondition), | |
/// A condition that applies `>=`. | |
Gte(GteCondition), | |
/// A condition that applies `<=`. | |
Lte(LteCondition), | |
/// A condition that applies `>`. | |
Gt(GtCondition), | |
/// A condition that applies `<`. | |
Lt(LtCondition), | |
/// A condition that uses glob matching. | |
Glob(GlobCondition), | |
/// Combines multiple conditions using logical OR. | |
Or(OrCondition), | |
/// Combines multiple conditions using logical AND. | |
And(AndCondition), | |
/// Applies logical NOT to a condition. | |
Not(NotCondition), | |
/// An unsupported condition for future compatibility. | |
#[serde(other)] | |
Unsupported, | |
} | |
impl RuleCondition { | |
/// Returns a condition that matches everything. | |
pub fn all() -> Self { | |
Self::And(AndCondition { inner: Vec::new() }) | |
} | |
/// Checks if Relay supports this condition (in other words if the condition had any unknown configuration | |
/// which was serialized as "Unsupported" (because the configuration is either faulty or was created for a | |
/// newer relay that supports some other condition types) | |
pub fn supported(&self) -> bool { | |
match self { | |
RuleCondition::Unsupported => false, | |
// we have a known condition | |
RuleCondition::Gte(_) | |
| RuleCondition::Lte(_) | |
| RuleCondition::Gt(_) | |
| RuleCondition::Lt(_) | |
| RuleCondition::Eq(_) | |
| RuleCondition::Glob(_) => true, | |
// dig down for embedded conditions | |
RuleCondition::And(rules) => rules.supported(), | |
RuleCondition::Or(rules) => rules.supported(), | |
RuleCondition::Not(rule) => rule.supported(), | |
} | |
} | |
/// Returns `true` if the rule matches the given value instance. | |
pub fn matches<T>(&self, value: &T) -> bool | |
where | |
T: Getter + ?Sized, | |
{ | |
match self { | |
RuleCondition::Eq(condition) => condition.matches(value), | |
RuleCondition::Lte(condition) => condition.matches(value), | |
RuleCondition::Gte(condition) => condition.matches(value), | |
RuleCondition::Gt(condition) => condition.matches(value), | |
RuleCondition::Lt(condition) => condition.matches(value), | |
RuleCondition::Glob(condition) => condition.matches(value), | |
RuleCondition::And(conditions) => conditions.matches(value), | |
RuleCondition::Or(conditions) => conditions.matches(value), | |
RuleCondition::Not(condition) => condition.matches(value), | |
RuleCondition::Unsupported => false, | |
} | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Every known condition type deserializes and round-trips into the expected shape.
    #[test]
    fn deserialize() {
        let serialized_rules = r#"[
            {
                "op":"eq",
                "name": "field_1",
                "value": ["UPPER","lower"],
                "options":{
                    "ignoreCase": true
                }
            },
            {
                "op":"eq",
                "name": "field_2",
                "value": ["UPPER","lower"]
            },
            {
                "op":"glob",
                "name": "field_3",
                "value": ["1.2.*","2.*"]
            },
            {
                "op":"not",
                "inner": {
                    "op":"glob",
                    "name": "field_4",
                    "value": ["1.*"]
                }
            },
            {
                "op":"and",
                "inner": [{
                    "op":"glob",
                    "name": "field_5",
                    "value": ["2.*"]
                }]
            },
            {
                "op":"or",
                "inner": [{
                    "op":"glob",
                    "name": "field_6",
                    "value": ["3.*"]
                }]
            }
        ]"#;

        let rules: Vec<RuleCondition> = serde_json::from_str(serialized_rules)
            .expect("all conditions in the fixture should deserialize");

        insta::assert_ron_snapshot!(rules, @r#"
        [
          EqCondition(
            op: "eq",
            name: "field_1",
            value: [
              "UPPER",
              "lower",
            ],
            options: EqCondOptions(
              ignoreCase: true,
            ),
          ),
          EqCondition(
            op: "eq",
            name: "field_2",
            value: [
              "UPPER",
              "lower",
            ],
          ),
          GlobCondition(
            op: "glob",
            name: "field_3",
            value: [
              "1.2.*",
              "2.*",
            ],
          ),
          NotCondition(
            op: "not",
            inner: GlobCondition(
              op: "glob",
              name: "field_4",
              value: [
                "1.*",
              ],
            ),
          ),
          AndCondition(
            op: "and",
            inner: [
              GlobCondition(
                op: "glob",
                name: "field_5",
                value: [
                  "2.*",
                ],
              ),
            ],
          ),
          OrCondition(
            op: "or",
            inner: [
              GlobCondition(
                op: "glob",
                name: "field_6",
                value: [
                  "3.*",
                ],
              ),
            ],
          ),
        ]"#);
    }

    /// An unknown `op` falls back to `RuleCondition::Unsupported` instead of failing.
    #[test]
    fn unsupported_rule_deserialize() {
        let bad_json = r#"{
            "op": "BadOperator",
            "name": "foo",
            "value": "bar"
        }"#;

        let rule = serde_json::from_str::<RuleCondition>(bad_json).unwrap();
        assert!(matches!(rule, RuleCondition::Unsupported));
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Functionality for calculating if a trace should be processed or dropped. | |
use chrono::{DateTime, Utc}; | |
use relay_base_schema::project::ProjectKey; | |
use relay_event_schema::protocol::Event; | |
use relay_protocol::Getter; | |
use relay_sampling::config::{RuleType, SamplingRule}; | |
use relay_sampling::evaluation::{ | |
merge_rules_from_configs, MatchedRuleIds, SamplingEvaluator, SamplingMatch, | |
}; | |
use relay_sampling::DynamicSamplingContext; | |
use crate::actors::project::ProjectState; | |
use crate::envelope::{Envelope, ItemType}; | |
/// The result of a sampling operation. | |
#[derive(Debug, Default, Clone, PartialEq, Eq)] | |
pub enum SamplingResult { | |
/// Keep the event. | |
/// | |
/// Relay either applied sampling rules and decided to keep the event, or was unable to parse | |
/// the rules. | |
#[default] | |
Keep, | |
/// Drop the event, due to a list of rules with provided identifiers. | |
Drop(MatchedRuleIds), | |
} | |
impl SamplingResult { | |
fn determine_from_sampling_match(sampling_match: Option<SamplingMatch>) -> Self { | |
match sampling_match { | |
Some(SamplingMatch { | |
sample_rate, | |
matched_rule_ids, | |
seed, | |
}) => { | |
let random_number = relay_sampling::evaluation::pseudo_random_from_uuid(seed); | |
relay_log::trace!( | |
sample_rate, | |
random_number, | |
"applying dynamic sampling to matching event" | |
); | |
if random_number >= sample_rate { | |
relay_log::trace!("dropping event that matched the configuration"); | |
SamplingResult::Drop(matched_rule_ids) | |
} else { | |
relay_log::trace!("keeping event that matched the configuration"); | |
SamplingResult::Keep | |
} | |
} | |
None => { | |
relay_log::trace!("keeping event that didn't match the configuration"); | |
SamplingResult::Keep | |
} | |
} | |
} | |
} | |
/// Gets the sampling match result by creating the merged configuration and matching it against | |
/// the sampling configuration. | |
fn get_sampling_match_result( | |
processing_enabled: bool, | |
project_state: Option<&ProjectState>, | |
root_project_state: Option<&ProjectState>, | |
dsc: Option<&DynamicSamplingContext>, | |
event: Option<&Event>, | |
now: DateTime<Utc>, | |
) -> Option<SamplingMatch> { | |
let mut evaluator = SamplingEvaluator::new(now, |rule_type| match rule_type { | |
RuleType::Trace => dsc.map(|dsc| dsc as &dyn Getter), | |
RuleType::Transaction => event.map(|e| e as &dyn Getter), | |
_ => None, | |
}); | |
if let Some(state) = project_state { | |
if let Some(result) = evaluator.evaluate_all(state.iter_rules(RuleType::Transaction)) { | |
return Some(result); | |
} | |
} | |
if let Some(state) = root_project_state { | |
if let Some(result) = evaluator.evaluate_all(state.iter_rules(RuleType::Trace)) { | |
return Some(result); | |
} | |
} | |
evaluator.evaluate_all(rules); | |
None | |
} | |
/// Runs dynamic sampling on an incoming event/dsc and returns whether or not the event should be | |
/// kept or dropped. | |
pub fn get_sampling_result( | |
processing_enabled: bool, | |
project_state: Option<&ProjectState>, | |
root_project_state: Option<&ProjectState>, | |
dsc: Option<&DynamicSamplingContext>, | |
event: Option<&Event>, | |
) -> SamplingResult { | |
let sampling_result = get_sampling_match_result( | |
processing_enabled, | |
project_state, | |
root_project_state, | |
dsc, | |
event, | |
// For consistency reasons we take a snapshot in time and use that time across all code that | |
// requires it. | |
Utc::now(), | |
); | |
SamplingResult::determine_from_sampling_match(sampling_result) | |
} | |
/// Runs dynamic sampling if the dsc and root project state are not None and returns whether the | |
/// transactions received with such dsc and project state would be kept or dropped by dynamic | |
/// sampling. | |
pub fn is_trace_fully_sampled( | |
processing_enabled: bool, | |
root_project_state: Option<&ProjectState>, | |
dsc: Option<&DynamicSamplingContext>, | |
) -> Option<bool> { | |
let dsc = dsc?; | |
let root_project_state = root_project_state?; | |
// If the sampled field is not set, we prefer to not tag the error since we have no clue on | |
// whether the head of the trace was kept or dropped on the client side. | |
// In addition, if the head of the trace was dropped on the client we will immediately mark | |
// the trace as not fully sampled. | |
if !(dsc.sampled?) { | |
return Some(false); | |
} | |
let sampling_result = get_sampling_result( | |
processing_enabled, | |
None, | |
Some(root_project_state), | |
Some(dsc), | |
None, | |
); | |
let sampled = match sampling_result { | |
SamplingResult::Keep => true, | |
SamplingResult::Drop(_) => false, | |
}; | |
Some(sampled) | |
} | |
/// Returns the project key defined in the `trace` header of the envelope. | |
/// | |
/// This function returns `None` if: | |
/// - there is no [`DynamicSamplingContext`] in the envelope headers. | |
/// - there are no transactions or events in the envelope, since in this case sampling by trace is redundant. | |
pub fn get_sampling_key(envelope: &Envelope) -> Option<ProjectKey> { | |
// If the envelope item is not of type transaction or event, we will not return a sampling key | |
// because it doesn't make sense to load the root project state if we don't perform trace | |
// sampling. | |
envelope | |
.get_item_by(|item| item.ty() == &ItemType::Transaction || item.ty() == &ItemType::Event)?; | |
envelope.dsc().map(|dsc| dsc.public_key) | |
} | |
#[cfg(test)]
mod tests {
    use relay_base_schema::events::EventType;
    use relay_event_schema::protocol::{EventId, LenientString};
    use relay_protocol::Annotated;
    use relay_sampling::condition::{EqCondOptions, EqCondition, RuleCondition};
    use relay_sampling::config::{
        RuleId, RuleType, SamplingConfig, SamplingMode, SamplingRule, SamplingValue,
    };
    use similar_asserts::assert_eq;
    use uuid::Uuid;

    use super::*;
    use crate::testutils::project_state_with_config;

    /// Builds an `eq` condition on `name` that matches any of `value`.
    fn eq(name: &str, value: &[&str], ignore_case: bool) -> RuleCondition {
        RuleCondition::Eq(EqCondition {
            name: name.to_owned(),
            value: value.iter().map(|&s| s.to_owned()).collect(),
            options: EqCondOptions { ignore_case },
        })
    }

    /// Builds an event with a fresh id and the given type, transaction name and release.
    fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
        Event {
            id: Annotated::new(EventId::new()),
            ty: Annotated::new(event_type),
            transaction: Annotated::new(transaction.to_owned()),
            release: Annotated::new(LenientString(release.to_owned())),
            ..Default::default()
        }
    }

    /// Builds a dsc with a random trace id and a fixed public key.
    fn mocked_simple_dynamic_sampling_context(
        sample_rate: Option<f64>,
        release: Option<&str>,
        transaction: Option<&str>,
        environment: Option<&str>,
        sampled: Option<bool>,
    ) -> DynamicSamplingContext {
        DynamicSamplingContext {
            trace_id: Uuid::new_v4(),
            public_key: "12345678901234567890123456789012".parse().unwrap(),
            release: release.map(str::to_owned),
            environment: environment.map(str::to_owned),
            transaction: transaction.map(str::to_owned),
            sample_rate,
            user: Default::default(),
            other: Default::default(),
            replay_id: None,
            sampled,
        }
    }

    /// Builds a match-all sample-rate rule of the given type.
    fn mocked_sampling_rule(id: u32, ty: RuleType, sample_rate: f64) -> SamplingRule {
        SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: sample_rate },
            ty,
            id: RuleId(id),
            time_range: Default::default(),
            decaying_fn: Default::default(),
        }
    }

    /// Wraps `rules_v2` in a config without legacy rules, using `SamplingMode::Received`.
    fn received_config(rules_v2: Vec<SamplingRule>) -> SamplingConfig {
        SamplingConfig {
            rules: vec![],
            rules_v2,
            mode: SamplingMode::Received,
        }
    }

    /// Tests that an event is kept when there is a match and we have 100% sample rate.
    #[test]
    fn test_get_sampling_result_return_keep_with_match_and_100_sample_rate() {
        let project_state = project_state_with_config(received_config(vec![
            mocked_sampling_rule(1, RuleType::Transaction, 1.0),
        ]));
        let event = mocked_event(EventType::Transaction, "transaction", "2.0");

        let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
        assert_eq!(result, SamplingResult::Keep);
    }

    /// Tests that an event is dropped when there is a match and we have 0% sample rate.
    #[test]
    fn test_get_sampling_result_return_drop_with_match_and_0_sample_rate() {
        let project_state = project_state_with_config(received_config(vec![
            mocked_sampling_rule(1, RuleType::Transaction, 0.0),
        ]));
        let event = mocked_event(EventType::Transaction, "transaction", "2.0");

        let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
        assert_eq!(
            result,
            SamplingResult::Drop(MatchedRuleIds(vec![RuleId(1)]))
        );
    }

    /// Tests that an event is kept when there is no match.
    #[test]
    fn test_get_sampling_result_return_keep_with_no_match() {
        let non_matching_rule = SamplingRule {
            condition: eq("event.transaction", &["foo"], true),
            sampling_value: SamplingValue::SampleRate { value: 0.5 },
            ty: RuleType::Transaction,
            id: RuleId(3),
            time_range: Default::default(),
            decaying_fn: Default::default(),
        };
        let project_state = project_state_with_config(received_config(vec![non_matching_rule]));
        let event = mocked_event(EventType::Transaction, "bar", "2.0");

        let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
        assert_eq!(result, SamplingResult::Keep);
    }

    /// Tests that an event is kept when there are unsupported rules with no processing and vice versa.
    #[test]
    fn test_get_sampling_result_return_keep_with_unsupported_rule() {
        let project_state = project_state_with_config(received_config(vec![
            mocked_sampling_rule(1, RuleType::Unsupported, 0.0),
            mocked_sampling_rule(2, RuleType::Transaction, 0.0),
        ]));
        let event = mocked_event(EventType::Transaction, "transaction", "2.0");

        let without_processing =
            get_sampling_result(false, Some(&project_state), None, None, Some(&event));
        assert_eq!(without_processing, SamplingResult::Keep);

        let with_processing =
            get_sampling_result(true, Some(&project_state), None, None, Some(&event));
        assert_eq!(
            with_processing,
            SamplingResult::Drop(MatchedRuleIds(vec![RuleId(2)]))
        );
    }

    /// Tests that an event is kept when there is a trace match and we have 100% sample rate.
    #[test]
    fn test_get_sampling_result_with_traces_rules_return_keep_when_match() {
        let root_project_state = project_state_with_config(received_config(vec![
            mocked_sampling_rule(1, RuleType::Trace, 1.0),
        ]));
        let dsc = mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, None);

        let result = get_sampling_result(true, None, Some(&root_project_state), Some(&dsc), None);
        assert_eq!(result, SamplingResult::Keep);
    }

    /// Tests that a trace is marked as fully sampled correctly when dsc and project state are set.
    #[test]
    fn test_is_trace_fully_sampled_with_valid_dsc_and_project_state() {
        // (dsc `sampled` flag, trace rule sample rate, expected "fully sampled")
        let cases = [
            // `sampled = true` and 100% rule.
            (true, 1.0, true),
            // `sampled = true` and 0% rule.
            (true, 0.0, false),
            // `sampled = false` and 100% rule.
            (false, 1.0, false),
        ];

        for (sampled, sample_rate, expected) in cases {
            let project_state = project_state_with_config(received_config(vec![
                mocked_sampling_rule(1, RuleType::Trace, sample_rate),
            ]));
            let dsc = mocked_simple_dynamic_sampling_context(
                Some(1.0),
                Some("3.0"),
                None,
                None,
                Some(sampled),
            );

            let result = is_trace_fully_sampled(true, Some(&project_state), Some(&dsc)).unwrap();
            assert_eq!(result, expected);
        }
    }

    /// Tests that a trace is not marked as fully sampled or not if inputs are invalid.
    #[test]
    fn test_is_trace_fully_sampled_with_invalid_inputs() {
        // Missing `sampled` flag.
        let project_state = project_state_with_config(received_config(vec![
            mocked_sampling_rule(1, RuleType::Trace, 1.0),
        ]));
        let dsc = mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, None);
        assert!(is_trace_fully_sampled(true, Some(&project_state), Some(&dsc)).is_none());

        // Missing dsc and project config.
        assert!(is_trace_fully_sampled(true, None, None).is_none());
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Evaluation of dynamic sampling rules. | |
use std::fmt; | |
use std::num::ParseIntError; | |
use chrono::{DateTime, Utc}; | |
use rand::distributions::Uniform; | |
use rand::Rng; | |
use rand_pcg::Pcg32; | |
use relay_base_schema::events::EventType; | |
use relay_event_schema::protocol::Event; | |
use relay_protocol::Getter; | |
use serde::Serialize; | |
use uuid::Uuid; | |
use crate::config::{ | |
DecayingFunction, RuleId, RuleType, SamplingConfig, SamplingMode, SamplingRule, SamplingValue, | |
TimeRange, | |
}; | |
use crate::dsc::DynamicSamplingContext; | |
/// Generates a pseudo random number by seeding the generator with the given id. | |
/// | |
/// The return is deterministic, always generates the same number from the same id. | |
pub fn pseudo_random_from_uuid(id: Uuid) -> f64 { | |
let big_seed = id.as_u128(); | |
let mut generator = Pcg32::new((big_seed >> 64) as u64, big_seed as u64); | |
let dist = Uniform::new(0f64, 1f64); | |
generator.sample(dist) | |
} | |
/// Returns an iterator of references that chains together and merges rules. | |
/// | |
/// The chaining logic will take all the non-trace rules from the project and all the trace/unsupported | |
/// rules from the root project and concatenate them. | |
pub fn merge_rules_from_configs<'a>( | |
sampling_config: Option<&'a SamplingConfig>, | |
root_sampling_config: Option<&'a SamplingConfig>, | |
) -> impl Iterator<Item = &'a SamplingRule> { | |
let transaction_rules = sampling_config | |
.into_iter() | |
.flat_map(|config| config.rules_v2.iter()) | |
.filter(|&rule| rule.ty == RuleType::Transaction); | |
let trace_rules = root_sampling_config | |
.into_iter() | |
.flat_map(|config| config.rules_v2.iter()) | |
.filter(|&rule| rule.ty == RuleType::Trace); | |
transaction_rules.chain(trace_rules) | |
} | |
/// Checks whether unsupported rules result in a direct keep of the event or depending on the | |
/// type of Relay an ignore of unsupported rules. | |
fn check_unsupported_rules( | |
processing_enabled: bool, | |
sampling_config: Option<&SamplingConfig>, | |
root_sampling_config: Option<&SamplingConfig>, | |
) -> Result<(), ()> { | |
// When we have unsupported rules disable sampling for non processing relays. | |
if sampling_config.map_or(false, |config| config.unsupported()) | |
|| root_sampling_config.map_or(false, |config| config.unsupported()) | |
{ | |
if !processing_enabled { | |
return Err(()); | |
} else { | |
relay_log::error!("found unsupported rules even as processing relay"); | |
} | |
} | |
Ok(()) | |
} | |
/// Gets the sampling match result by creating the merged configuration and matching it against | |
/// the sampling configuration. | |
pub fn merge_configs_and_match( | |
processing_enabled: bool, | |
sampling_config: Option<&SamplingConfig>, | |
root_sampling_config: Option<&SamplingConfig>, | |
dsc: Option<&DynamicSamplingContext>, | |
event: Option<&Event>, | |
now: DateTime<Utc>, | |
) -> Option<SamplingMatch> { | |
// We check if there are unsupported rules in any of the two configurations. | |
check_unsupported_rules(processing_enabled, sampling_config, root_sampling_config).ok()?; | |
// We perform the rule matching with the multi-matching logic on the merged rules. | |
let rules = merge_rules_from_configs(sampling_config, root_sampling_config); | |
let mut match_result = SamplingMatch::match_against_rules(rules, event, dsc, now)?; | |
// If we have a match, we will try to derive the sample rate based on the sampling mode. | |
// | |
// Keep in mind that the sample rate received here has already been derived by the matching | |
// logic, based on multiple matches and decaying functions. | |
// | |
// The determination of the sampling mode occurs with the following priority: | |
// 1. Non-root project sampling mode | |
// 2. Root project sampling mode | |
let Some(primary_config) = sampling_config.or(root_sampling_config) else { | |
relay_log::error!("cannot sample without at least one sampling config"); | |
return None; | |
}; | |
let sample_rate = match primary_config.mode { | |
SamplingMode::Received => match_result.sample_rate, | |
SamplingMode::Total => match dsc { | |
Some(dsc) => dsc.adjusted_sample_rate(match_result.sample_rate), | |
None => match_result.sample_rate, | |
}, | |
SamplingMode::Unsupported => { | |
if processing_enabled { | |
relay_log::error!("found unsupported sampling mode even as processing Relay"); | |
} | |
return None; | |
} | |
}; | |
match_result.set_sample_rate(sample_rate); | |
// Only if we arrive at this stage, it means that we have found a match and we want to prepare | |
// the data for making the sampling decision. | |
Some(match_result) | |
} | |
/// Represents the specification for sampling an incoming event.
///
/// Constructed by `SamplingMatch::match_against_rules` once a sample rate rule matches; the
/// keep/drop decision itself is made by the caller from `sample_rate` and `seed`.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SamplingMatch {
    /// The sample rate to use for the incoming event.
    ///
    /// `match_against_rules` stores the matched rate multiplied by any accumulated factors,
    /// clamped to `[0.0, 1.0]`. It can be overwritten afterwards via `set_sample_rate`.
    pub sample_rate: f64,
    /// The seed to feed to the random number generator which allows the same number to be
    /// generated given the same seed.
    ///
    /// This is especially important for trace sampling, even though we can have inconsistent
    /// traces due to multi-matching.
    pub seed: Uuid,
    /// The list of rule ids that have matched the incoming event and/or dynamic sampling context.
    pub matched_rule_ids: MatchedRuleIds,
}
impl SamplingMatch { | |
/// Setter for `sample_rate`. | |
pub fn set_sample_rate(&mut self, new_sample_rate: f64) { | |
self.sample_rate = new_sample_rate; | |
} | |
/// Matches an event and/or dynamic sampling context against the rules of the sampling configuration. | |
/// | |
/// The multi-matching algorithm used iterates by collecting and multiplying factor rules until | |
/// it finds a sample rate rule. Once a sample rate rule is found, the final sample rate is | |
/// computed by multiplying it with the previously accumulated factors. | |
/// | |
/// The default accumulated factors equal to 1 because it is the identity of the multiplication | |
/// operation, thus in case no factor rules are matched, the final result will just be the | |
/// sample rate of the matching rule. | |
/// | |
/// In case no sample rate rule is matched, we are going to return a None, signaling that no | |
/// match has been found. | |
pub fn match_against_rules<'a, I>( | |
rules: I, | |
event: Option<&Event>, | |
dsc: Option<&DynamicSamplingContext>, | |
now: DateTime<Utc>, | |
) -> Option<SamplingMatch> | |
where | |
I: Iterator<Item = &'a SamplingRule>, | |
{ | |
let mut matched_rule_ids = vec![]; | |
// Even though this seed is changed based on whether we match event or trace rules, we will | |
// still incur in inconsistent trace sampling because of multi-matching of rules across event | |
// and trace rules. | |
// | |
// An example of inconsistent trace sampling could be: | |
// /hello -> /world -> /transaction belong to trace_id = abc | |
// * /hello has uniform rule with 0.2 sample rate which will match all the transactions of the trace | |
// * each project has a single transaction rule with different factors (2, 3, 4) | |
// | |
// 1. /hello is matched with a transaction rule with a factor of 2 and uses as seed abc -> 0.2 * 2 = 0.4 sample rate | |
// 2. /world is matched with a transaction rule with a factor of 3 and uses as seed abc -> 0.2 * 3 = 0.6 sample rate | |
// 3. /transaction is matched with a transaction rule with a factor of 4 and uses as seed abc -> 0.2 * 4 = 0.8 sample rate | |
// | |
// We can see that we have 3 different samples rates but given the same seed, the random number generated will be the same. | |
let mut seed = event.and_then(|e| e.id.value()).map(|id| id.0); | |
let mut accumulated_factors = 1.0; | |
for rule in rules { | |
let matches = match rule.ty { | |
RuleType::Trace => match dsc { | |
Some(dsc) => rule.condition.matches(dsc), | |
_ => false, | |
}, | |
RuleType::Transaction => event.map_or(false, |event| match event.ty.0 { | |
Some(EventType::Transaction) => rule.condition.matches(event), | |
_ => false, | |
}), | |
_ => false, | |
}; | |
if matches { | |
if let Some(evaluator) = SamplingValueEvaluator::create(rule, now) { | |
matched_rule_ids.push(rule.id); | |
if rule.ty == RuleType::Trace { | |
if let Some(dsc) = dsc { | |
seed = Some(dsc.trace_id); | |
} | |
} | |
let value = evaluator.evaluate(now); | |
match rule.sampling_value { | |
SamplingValue::Factor { .. } => accumulated_factors *= value, | |
SamplingValue::SampleRate { .. } => { | |
return Some(SamplingMatch { | |
sample_rate: (value * accumulated_factors).clamp(0.0, 1.0), | |
seed: match seed { | |
Some(seed) => seed, | |
// In case we are not able to generate a seed, we will return a no | |
// match. | |
None => return None, | |
}, | |
matched_rule_ids: MatchedRuleIds(matched_rule_ids), | |
}); | |
} | |
} | |
} | |
} | |
} | |
// In case no match is available, we won't return any specification. | |
None | |
} | |
} | |
/// The result of a sampling operation. | |
#[derive(Debug, Default, Clone, PartialEq, Eq)] | |
pub enum SamplingResult { | |
/// Keep the event. | |
/// | |
/// Relay either applied sampling rules and decided to keep the event, or was unable to parse | |
/// the rules. | |
#[default] | |
Keep, | |
/// Drop the event, due to a list of rules with provided identifiers. | |
Drop(MatchedRuleIds), | |
} | |
pub struct SamplingEvaluator<F> { | |
now: DateTime<Utc>, | |
rule_ids: Vec<RuleId>, // vec![] | |
factor: f64, // 1 | |
getter: F, | |
} | |
impl<'a, F> SamplingEvaluator<F> | |
where | |
F: FnMut(RuleType) -> Option<&'a dyn Getter> + 'a, | |
{ | |
pub fn new(now: DateTime<Utc>, getter: F) -> Self { | |
Self { | |
now, | |
rule_ids: Vec::new(), | |
factor: 1.0, | |
getter, | |
} | |
} | |
pub fn evaluate(&mut self, rule: &SamplingRule) -> Option<SamplingResult> { | |
let instance = (self.getter)(rule.ty)?; | |
if !rule.condition.matches(instance) { | |
return None; | |
} | |
let evaluator = SamplingValueEvaluator::create(rule, self.now)?; | |
self.rule_ids.push(rule.id); | |
// TODO: For later | |
// if rule.ty == RuleType::Trace { | |
// if let Some(dsc) = dsc { | |
// seed = Some(dsc.trace_id); | |
// } | |
// } | |
let value = evaluator.evaluate(self.now); | |
match rule.sampling_value { | |
SamplingValue::Factor { .. } => self.factor *= value, | |
SamplingValue::SampleRate { .. } => { | |
return Some(todo!()); | |
} | |
} | |
None | |
} | |
pub fn evaluate_all<'b, I>(&mut self, mut rules: I) -> Option<SamplingResult> | |
where | |
I: Iterator<Item = &'b SamplingRule>, | |
{ | |
rules.find_map(|rule| self.evaluate(rule)) | |
} | |
} | |
/// Represents a list of rule ids which is used for outcomes. | |
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] | |
pub struct MatchedRuleIds(pub Vec<RuleId>); | |
impl MatchedRuleIds { | |
/// Parses `MatchedRuleIds` from a string with concatenated rule identifiers. | |
/// | |
/// The format it parses from is: | |
/// | |
/// ```text | |
/// rule_id_1,rule_id_2,... | |
/// ``` | |
pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> { | |
let mut rule_ids = vec![]; | |
for rule_id in value.split(',') { | |
rule_ids.push(RuleId(rule_id.parse()?)); | |
} | |
Ok(MatchedRuleIds(rule_ids)) | |
} | |
} | |
impl fmt::Display for MatchedRuleIds { | |
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
for (i, rule_id) in self.0.iter().enumerate() { | |
if i > 0 { | |
write!(f, ",")?; | |
} | |
write!(f, "{rule_id}")?; | |
} | |
Ok(()) | |
} | |
} | |
/// A struct representing the evaluation context of a sample rate. | |
#[derive(Debug, Clone, Copy)] | |
enum SamplingValueEvaluator { | |
Linear { | |
start: DateTime<Utc>, | |
end: DateTime<Utc>, | |
initial_value: f64, | |
decayed_value: f64, | |
}, | |
Constant { | |
initial_value: f64, | |
}, | |
} | |
impl SamplingValueEvaluator { | |
/// Returns a [`SamplingValueEvaluator`] if the rule is active at the given time. | |
fn create(rule: &SamplingRule, now: DateTime<Utc>) -> Option<Self> { | |
let sampling_base_value = rule.sampling_value.value(); | |
match rule.decaying_fn { | |
DecayingFunction::Linear { decayed_value } => { | |
if let TimeRange { | |
start: Some(start), | |
end: Some(end), | |
} = rule.time_range | |
{ | |
// As in the TimeRange::contains method we use a right non-inclusive time bound. | |
if sampling_base_value > decayed_value && start <= now && now < end { | |
return Some(Self::Linear { | |
start, | |
end, | |
initial_value: sampling_base_value, | |
decayed_value, | |
}); | |
} | |
} | |
} | |
DecayingFunction::Constant => { | |
if rule.time_range.contains(now) { | |
return Some(Self::Constant { | |
initial_value: sampling_base_value, | |
}); | |
} | |
} | |
} | |
None | |
} | |
/// Evaluates the value of the sampling strategy given a the current time. | |
fn evaluate(&self, now: DateTime<Utc>) -> f64 { | |
match self { | |
Self::Linear { | |
start, | |
end, | |
initial_value, | |
decayed_value, | |
} => { | |
let now_timestamp = now.timestamp() as f64; | |
let start_timestamp = start.timestamp() as f64; | |
let end_timestamp = end.timestamp() as f64; | |
let progress_ratio = ((now_timestamp - start_timestamp) | |
/ (end_timestamp - start_timestamp)) | |
.clamp(0.0, 1.0); | |
// This interval will always be < 0. | |
let interval = decayed_value - initial_value; | |
initial_value + (interval * progress_ratio) | |
} | |
Self::Constant { initial_value } => *initial_value, | |
} | |
} | |
} | |
#[cfg(test)]
mod tests {
    use crate::condition::RuleCondition;

    use super::*;

    /// Tests that the same trace id always yields the same pseudo-random value, and therefore the
    /// same sampling decision.
    #[test]
    fn test_repeatable_seed() {
        let id = "4a106cf6-b151-44eb-9131-ae7db1a157a3".parse().unwrap();
        let val1 = pseudo_random_from_uuid(id);
        let val2 = pseudo_random_from_uuid(id);
        assert!(val1 + f64::EPSILON > val2 && val2 + f64::EPSILON > val1);
    }

    /// Tests that `MatchedRuleIds` is displayed correctly as a string.
    #[test]
    fn matched_rule_ids_display() {
        let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
        assert_eq!(matched_rule_ids.to_string(), "123,456");

        let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
        assert_eq!(matched_rule_ids.to_string(), "123");

        let matched_rule_ids = MatchedRuleIds(vec![]);
        assert_eq!(matched_rule_ids.to_string(), "")
    }

    /// Tests that `MatchedRuleIds` is parsed correctly from its string representation.
    #[test]
    fn matched_rule_ids_parse() {
        assert_eq!(
            MatchedRuleIds::parse("123,456"),
            Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
        );

        assert_eq!(
            MatchedRuleIds::parse("123"),
            Ok(MatchedRuleIds(vec![RuleId(123)]))
        );

        assert!(MatchedRuleIds::parse("").is_err());

        assert!(MatchedRuleIds::parse(",").is_err());

        assert!(MatchedRuleIds::parse("123.456").is_err());

        assert!(MatchedRuleIds::parse("a,b").is_err());
    }

    /// Asserts that the rules in `$res` carry exactly the ids in `$exc`, in order.
    ///
    /// `$res` is evaluated exactly once. The previous version expanded it twice, which ran the
    /// config merge twice. On mismatch, both id lists are printed.
    macro_rules! assert_rule_ids_eq {
        ($exc:expr, $res:expr) => {{
            let rule_ids: Vec<_> = $res.iter().map(|rule| rule.id.0).collect();
            assert_eq!(rule_ids, $exc, "The rule ids don't match.");
        }};
    }

    fn mocked_sampling_rule(id: u32, ty: RuleType, sample_rate: f64) -> SamplingRule {
        SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: sample_rate },
            ty,
            id: RuleId(id),
            time_range: Default::default(),
            decaying_fn: Default::default(),
        }
    }

    fn merge_root_and_non_root_configs_with(
        rules: Vec<SamplingRule>,
        root_rules: Vec<SamplingRule>,
    ) -> Vec<SamplingRule> {
        crate::evaluation::merge_rules_from_configs(
            Some(&SamplingConfig {
                rules: vec![],
                rules_v2: rules,
                mode: SamplingMode::Received,
            }),
            Some(&SamplingConfig {
                rules: vec![],
                rules_v2: root_rules,
                mode: SamplingMode::Received,
            }),
        )
        .cloned()
        .collect()
    }

    /// Tests the merged config of the two configs with rules.
    #[test]
    fn test_get_merged_config_with_rules_in_both_project_config_and_root_project_config() {
        assert_rule_ids_eq!(
            [1, 7],
            merge_root_and_non_root_configs_with(
                vec![
                    mocked_sampling_rule(1, RuleType::Transaction, 0.1),
                    mocked_sampling_rule(3, RuleType::Trace, 0.3),
                    mocked_sampling_rule(4, RuleType::Unsupported, 0.1),
                ],
                vec![
                    mocked_sampling_rule(5, RuleType::Transaction, 0.4),
                    mocked_sampling_rule(7, RuleType::Trace, 0.6),
                    mocked_sampling_rule(8, RuleType::Unsupported, 0.1),
                ],
            )
        );
    }

    /// Tests the merged config of the two configs without rules.
    #[test]
    fn test_get_merged_config_with_no_rules_in_both_project_config_and_root_project_config() {
        assert!(merge_root_and_non_root_configs_with(vec![], vec![]).is_empty());
    }

    /// Tests the merged config of the project config with rules and the root project config
    /// without rules.
    #[test]
    fn test_get_merged_config_with_rules_in_project_config_and_no_rules_in_root_project_config() {
        assert_rule_ids_eq!(
            [1],
            merge_root_and_non_root_configs_with(
                vec![
                    mocked_sampling_rule(1, RuleType::Transaction, 0.1),
                    mocked_sampling_rule(3, RuleType::Trace, 0.3),
                    mocked_sampling_rule(4, RuleType::Unsupported, 0.1),
                ],
                vec![],
            )
        );
    }

    /// Tests the merged config of the project config without rules and the root project config
    /// with rules.
    #[test]
    fn test_get_merged_config_with_no_rules_in_project_config_and_with_rules_in_root_project_config(
    ) {
        assert_rule_ids_eq!(
            [6],
            merge_root_and_non_root_configs_with(
                vec![],
                vec![
                    mocked_sampling_rule(4, RuleType::Transaction, 0.4),
                    mocked_sampling_rule(6, RuleType::Trace, 0.6),
                    mocked_sampling_rule(7, RuleType::Unsupported, 0.1),
                ]
            )
        );
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment