Skip to content

Instantly share code, notes, and snippets.

@jan-auer
Created September 13, 2023 13:45
Show Gist options
  • Save jan-auer/d239241b45c359ac5b71cd37044f3988 to your computer and use it in GitHub Desktop.
Save jan-auer/d239241b45c359ac5b71cd37044f3988 to your computer and use it in GitHub Desktop.
dynamic sampling evaluator
//! Types to specify conditions on data.
//!
//! The root type is [`RuleCondition`].
use relay_common::glob3::GlobPatterns;
use relay_protocol::{Getter, Val};
use serde::{Deserialize, Serialize};
use serde_json::{Number, Value};
use crate::utils;
/// Tuning knobs for an [`EqCondition`].
///
/// Serialized with camelCase keys; omitted fields fall back to their defaults.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct EqCondOptions {
    /// Compare string values case-insensitively when `true`.
    ///
    /// Only string comparisons are affected; numbers and booleans are compared as-is.
    #[serde(default)]
    pub ignore_case: bool,
}
/// Checks a field for equality with a fixed value.
///
/// Supported field/value combinations:
/// - a missing field matches a `null` value
/// - booleans
/// - strings, optionally case-insensitive via [`EqCondOptions::ignore_case`]; the value may also
///   be an array of strings, matching if any element equals the field
/// - UUIDs, compared against a string value that parses as a UUID
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EqCondition {
    /// Path of the field to compare.
    pub name: String,
    /// Expected value (or, for string fields, an array of acceptable values).
    pub value: Value,
    /// Additional comparison options; omitted from serialization when left at their defaults.
    #[serde(default, skip_serializing_if = "utils::is_default")]
    pub options: EqCondOptions,
}
impl EqCondition {
    /// Compares two strings, honoring the `ignore_case` option.
    fn cmp(&self, left: &str, right: &str) -> bool {
        match self.options.ignore_case {
            true => unicase::eq(left, right),
            false => left == right,
        }
    }

    /// Returns `true` if the field named by `self.name` equals `self.value`.
    ///
    /// Unsupported field/value type combinations never match.
    fn matches<T>(&self, instance: &T) -> bool
    where
        T: Getter + ?Sized,
    {
        let field = instance.get_value(self.name.as_str());
        match (field, &self.value) {
            // An absent field is considered equal to `null`.
            (None, Value::Null) => true,
            (Some(Val::String(actual)), Value::String(expected)) => self.cmp(actual, expected),
            // Non-string array elements are ignored.
            (Some(Val::String(actual)), Value::Array(candidates)) => candidates
                .iter()
                .filter_map(Value::as_str)
                .any(|candidate| self.cmp(candidate, actual)),
            // An unparseable string never equals a UUID field.
            (Some(Val::Uuid(actual)), Value::String(expected)) => {
                Some(actual) == expected.parse().ok()
            }
            (Some(Val::Bool(actual)), Value::Bool(expected)) => actual == *expected,
            _ => false,
        }
    }
}
macro_rules! impl_cmp_condition {
    ($struct_name:ident, $operator:tt, $doc:literal) => {
        #[doc = $doc]
        ///
        /// String values are explicitly not supported by this comparison.
        #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
        pub struct $struct_name {
            /// Path of the field to compare.
            pub name: String,
            /// Numeric right-hand side of the comparison.
            pub value: Number,
        }

        impl $struct_name {
            /// Returns `true` if the field exists and compares successfully against `value`.
            fn matches<T>(&self, instance: &T) -> bool
            where
                T: Getter + ?Sized,
            {
                let threshold = &self.value;
                instance.get_value(self.name.as_str()).map_or(false, |actual| {
                    // Conversions are tried from cheapest/most likely to least:
                    // - i64 covers most values found in sampling rules, so it often wins early
                    // - u64 is exact, while f64 is more likely to succeed but can lose precision
                    if let (Some(a), Some(b)) = (actual.as_i64(), threshold.as_i64()) {
                        return a $operator b;
                    }
                    if let (Some(a), Some(b)) = (actual.as_u64(), threshold.as_u64()) {
                        return a $operator b;
                    }
                    match (actual.as_f64(), threshold.as_f64()) {
                        (Some(a), Some(b)) => a $operator b,
                        _ => false,
                    }
                })
            }
        }
    };
}

impl_cmp_condition!(GteCondition, >=, "A condition that applies `>=`.");
impl_cmp_condition!(LteCondition, <=, "A condition that applies `<=`.");
impl_cmp_condition!(GtCondition, >, "A condition that applies `>`.");
impl_cmp_condition!(LtCondition, <, "A condition that applies `<`.");
/// A condition that matches a string field against glob patterns.
///
/// Works like [`EqCondition`], except that `value` may contain wildcards. Globs are slightly
/// more expensive to construct and check, so prefer [`EqCondition`] when no wildcard matching
/// is needed.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GlobCondition {
    /// Path of the field to match.
    pub name: String,
    /// The glob patterns to check.
    ///
    /// This must always be a list, even when there is only a single pattern.
    pub value: GlobPatterns,
}

impl GlobCondition {
    /// Returns `true` if the field is a string accepted by the glob patterns.
    ///
    /// Missing and non-string fields never match.
    fn matches<T>(&self, instance: &T) -> bool
    where
        T: Getter + ?Sized,
    {
        matches!(
            instance.get_value(self.name.as_str()),
            Some(Val::String(s)) if self.value.is_match(s)
        )
    }
}
/// Combines multiple conditions with a logical OR.
///
/// Matches if **any** inner condition matches; an empty list therefore never matches.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OrCondition {
    /// The conditions to combine.
    pub inner: Vec<RuleCondition>,
}

impl OrCondition {
    /// Returns `true` if no inner condition is unsupported.
    fn supported(&self) -> bool {
        !self.inner.iter().any(|condition| !condition.supported())
    }

    /// Returns `true` as soon as one inner condition matches.
    fn matches<T>(&self, value: &T) -> bool
    where
        T: Getter + ?Sized,
    {
        for condition in &self.inner {
            if condition.matches(value) {
                return true;
            }
        }
        false
    }
}
/// Combines multiple conditions with a logical AND.
///
/// Matches only if **all** inner conditions match; an empty list therefore always matches.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AndCondition {
    /// The conditions to combine.
    pub inner: Vec<RuleCondition>,
}

impl AndCondition {
    /// Returns `true` if every inner condition is supported.
    fn supported(&self) -> bool {
        self.inner.iter().all(|condition| condition.supported())
    }

    /// Returns `false` as soon as one inner condition fails to match.
    fn matches<T>(&self, value: &T) -> bool
    where
        T: Getter + ?Sized,
    {
        !self.inner.iter().any(|condition| !condition.matches(value))
    }
}
/// Negates a single condition.
///
/// Matches exactly when the inner condition does not.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NotCondition {
    /// The condition to negate.
    pub inner: Box<RuleCondition>,
}

impl NotCondition {
    /// A negation is supported iff its inner condition is.
    fn supported(&self) -> bool {
        let inner: &RuleCondition = &self.inner;
        inner.supported()
    }

    /// Inverts the result of the inner condition.
    fn matches<T>(&self, value: &T) -> bool
    where
        T: Getter + ?Sized,
    {
        let inner_matches = self.inner.matches(value);
        !inner_matches
    }
}
/// A condition of a sampling rule.
///
/// Serialized as an internally tagged object whose `op` field names the variant in camelCase.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "op")]
pub enum RuleCondition {
    /// Equality check, see [`EqCondition`].
    Eq(EqCondition),
    /// Numeric `>=` check.
    Gte(GteCondition),
    /// Numeric `<=` check.
    Lte(LteCondition),
    /// Numeric `>` check.
    Gt(GtCondition),
    /// Numeric `<` check.
    Lt(LtCondition),
    /// Glob pattern check, see [`GlobCondition`].
    Glob(GlobCondition),
    /// Logical OR over inner conditions.
    Or(OrCondition),
    /// Logical AND over inner conditions.
    And(AndCondition),
    /// Logical NOT of an inner condition.
    Not(NotCondition),
    /// Catch-all for any unknown `op`, kept for forward compatibility.
    #[serde(other)]
    Unsupported,
}
impl RuleCondition {
    /// Returns a condition that matches everything (an empty AND).
    pub fn all() -> Self {
        Self::And(AndCondition { inner: Vec::new() })
    }

    /// Returns `true` if Relay understands this condition, including all nested conditions.
    ///
    /// A condition is unsupported if any part of it was deserialized as
    /// [`RuleCondition::Unsupported`], either because the configuration is faulty or because it
    /// was written for a newer Relay that supports additional condition types.
    pub fn supported(&self) -> bool {
        match self {
            Self::Unsupported => false,
            // Composite conditions are only as supported as their children.
            Self::And(condition) => condition.supported(),
            Self::Or(condition) => condition.supported(),
            Self::Not(condition) => condition.supported(),
            // Leaf conditions are always known.
            Self::Eq(_)
            | Self::Gte(_)
            | Self::Lte(_)
            | Self::Gt(_)
            | Self::Lt(_)
            | Self::Glob(_) => true,
        }
    }

    /// Returns `true` if the condition matches the given instance.
    ///
    /// [`RuleCondition::Unsupported`] never matches.
    pub fn matches<T>(&self, value: &T) -> bool
    where
        T: Getter + ?Sized,
    {
        match self {
            Self::Eq(condition) => condition.matches(value),
            Self::Gte(condition) => condition.matches(value),
            Self::Lte(condition) => condition.matches(value),
            Self::Gt(condition) => condition.matches(value),
            Self::Lt(condition) => condition.matches(value),
            Self::Glob(condition) => condition.matches(value),
            Self::Or(condition) => condition.matches(value),
            Self::And(condition) => condition.matches(value),
            Self::Not(condition) => condition.matches(value),
            Self::Unsupported => false,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Round-trips every condition kind through JSON deserialization and RON serialization.
#[test]
fn deserialize() {
// One rule per operator, including nested `not`/`and`/`or` combinators.
let serialized_rules = r#"[
{
"op":"eq",
"name": "field_1",
"value": ["UPPER","lower"],
"options":{
"ignoreCase": true
}
},
{
"op":"eq",
"name": "field_2",
"value": ["UPPER","lower"]
},
{
"op":"glob",
"name": "field_3",
"value": ["1.2.*","2.*"]
},
{
"op":"not",
"inner": {
"op":"glob",
"name": "field_4",
"value": ["1.*"]
}
},
{
"op":"and",
"inner": [{
"op":"glob",
"name": "field_5",
"value": ["2.*"]
}]
},
{
"op":"or",
"inner": [{
"op":"glob",
"name": "field_6",
"value": ["3.*"]
}]
}
]"#;
let rules: Result<Vec<RuleCondition>, _> = serde_json::from_str(serialized_rules);
assert!(rules.is_ok());
let rules = rules.unwrap();
// Note that `field_2` omits `options` in the snapshot: default options are skipped on serialize.
insta::assert_ron_snapshot!(rules, @r#"
[
EqCondition(
op: "eq",
name: "field_1",
value: [
"UPPER",
"lower",
],
options: EqCondOptions(
ignoreCase: true,
),
),
EqCondition(
op: "eq",
name: "field_2",
value: [
"UPPER",
"lower",
],
),
GlobCondition(
op: "glob",
name: "field_3",
value: [
"1.2.*",
"2.*",
],
),
NotCondition(
op: "not",
inner: GlobCondition(
op: "glob",
name: "field_4",
value: [
"1.*",
],
),
),
AndCondition(
op: "and",
inner: [
GlobCondition(
op: "glob",
name: "field_5",
value: [
"2.*",
],
),
],
),
OrCondition(
op: "or",
inner: [
GlobCondition(
op: "glob",
name: "field_6",
value: [
"3.*",
],
),
],
),
]"#);
}
// An unknown `op` must deserialize into `RuleCondition::Unsupported` instead of failing.
#[test]
fn unsupported_rule_deserialize() {
let bad_json = r#"{
"op": "BadOperator",
"name": "foo",
"value": "bar"
}"#;
let rule: RuleCondition = serde_json::from_str(bad_json).unwrap();
assert!(matches!(rule, RuleCondition::Unsupported));
}
}
//! Functionality for calculating if a trace should be processed or dropped.
use chrono::{DateTime, Utc};
use relay_base_schema::project::ProjectKey;
use relay_event_schema::protocol::Event;
use relay_protocol::Getter;
use relay_sampling::config::{RuleType, SamplingRule};
use relay_sampling::evaluation::{
merge_rules_from_configs, MatchedRuleIds, SamplingEvaluator, SamplingMatch,
};
use relay_sampling::DynamicSamplingContext;
use crate::actors::project::ProjectState;
use crate::envelope::{Envelope, ItemType};
/// The outcome of dynamic sampling for one event.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum SamplingResult {
    /// The event is kept.
    ///
    /// This is the outcome both when the sampling rules decided to keep the event and when the
    /// rules could not be applied. It is also the default.
    #[default]
    Keep,
    /// The event is dropped; carries the identifiers of the rules responsible.
    Drop(MatchedRuleIds),
}
impl SamplingResult {
    /// Turns an optional rule match into a keep/drop decision.
    ///
    /// Without a match the event is kept. With a match, a deterministic pseudo-random number is
    /// derived from the match's seed, and the event is dropped when that number is not below the
    /// sample rate.
    fn determine_from_sampling_match(sampling_match: Option<SamplingMatch>) -> Self {
        let Some(SamplingMatch {
            sample_rate,
            matched_rule_ids,
            seed,
        }) = sampling_match
        else {
            relay_log::trace!("keeping event that didn't match the configuration");
            return SamplingResult::Keep;
        };

        let random_number = relay_sampling::evaluation::pseudo_random_from_uuid(seed);
        relay_log::trace!(
            sample_rate,
            random_number,
            "applying dynamic sampling to matching event"
        );

        // Keep the `>=` form so that a NaN sample rate results in keeping the event.
        let should_drop = random_number >= sample_rate;
        if should_drop {
            relay_log::trace!("dropping event that matched the configuration");
            SamplingResult::Drop(matched_rule_ids)
        } else {
            relay_log::trace!("keeping event that matched the configuration");
            SamplingResult::Keep
        }
    }
}
/// Gets the sampling match result by creating the merged configuration and matching it against
/// the sampling configuration.
fn get_sampling_match_result(
processing_enabled: bool,
project_state: Option<&ProjectState>,
root_project_state: Option<&ProjectState>,
dsc: Option<&DynamicSamplingContext>,
event: Option<&Event>,
now: DateTime<Utc>,
) -> Option<SamplingMatch> {
let mut evaluator = SamplingEvaluator::new(now, |rule_type| match rule_type {
RuleType::Trace => dsc.map(|dsc| dsc as &dyn Getter),
RuleType::Transaction => event.map(|e| e as &dyn Getter),
_ => None,
});
if let Some(state) = project_state {
if let Some(result) = evaluator.evaluate_all(state.iter_rules(RuleType::Transaction)) {
return Some(result);
}
}
if let Some(state) = root_project_state {
if let Some(result) = evaluator.evaluate_all(state.iter_rules(RuleType::Trace)) {
return Some(result);
}
}
evaluator.evaluate_all(rules);
None
}
/// Runs dynamic sampling for an incoming event and/or dynamic sampling context.
///
/// Returns whether the event should be kept or dropped.
pub fn get_sampling_result(
    processing_enabled: bool,
    project_state: Option<&ProjectState>,
    root_project_state: Option<&ProjectState>,
    dsc: Option<&DynamicSamplingContext>,
    event: Option<&Event>,
) -> SamplingResult {
    // Take one snapshot of the current time so that all time-dependent logic downstream observes
    // the same instant.
    let now = Utc::now();

    let sampling_match = get_sampling_match_result(
        processing_enabled,
        project_state,
        root_project_state,
        dsc,
        event,
        now,
    );

    SamplingResult::determine_from_sampling_match(sampling_match)
}
/// Determines whether transactions of the trace described by `dsc` would be kept by dynamic
/// sampling, given the root project's configuration.
///
/// Returns `None` if the dsc or the root project state is missing, or if the dsc does not carry
/// a `sampled` flag.
pub fn is_trace_fully_sampled(
    processing_enabled: bool,
    root_project_state: Option<&ProjectState>,
    dsc: Option<&DynamicSamplingContext>,
) -> Option<bool> {
    let (dsc, root_project_state) = (dsc?, root_project_state?);

    // Without the `sampled` flag we cannot tell whether the client kept or dropped the head of
    // the trace, so we give no verdict rather than risk tagging incorrectly. If the client
    // dropped the head, the trace cannot be fully sampled.
    if !dsc.sampled? {
        return Some(false);
    }

    let sampling_result = get_sampling_result(
        processing_enabled,
        None,
        Some(root_project_state),
        Some(dsc),
        None,
    );

    Some(matches!(sampling_result, SamplingResult::Keep))
}
/// Returns the project key from the `trace` header (dynamic sampling context) of the envelope.
///
/// Returns `None` if:
/// - the envelope headers contain no [`DynamicSamplingContext`];
/// - the envelope contains neither a transaction nor an event item. Trace sampling would be
///   pointless then, so there is no reason to load the root project state.
pub fn get_sampling_key(envelope: &Envelope) -> Option<ProjectKey> {
    envelope
        .get_item_by(|item| {
            let ty = item.ty();
            ty == &ItemType::Transaction || ty == &ItemType::Event
        })
        .and_then(|_| envelope.dsc())
        .map(|dsc| dsc.public_key)
}
#[cfg(test)]
mod tests {
use relay_base_schema::events::EventType;
use relay_event_schema::protocol::{EventId, LenientString};
use relay_protocol::Annotated;
use relay_sampling::condition::{EqCondOptions, EqCondition, RuleCondition};
use relay_sampling::config::{
RuleId, RuleType, SamplingConfig, SamplingMode, SamplingRule, SamplingValue,
};
use similar_asserts::assert_eq;
use uuid::Uuid;
use super::*;
use crate::testutils::project_state_with_config;
// Builds an `eq` condition on `name` matching any of `value`.
fn eq(name: &str, value: &[&str], ignore_case: bool) -> RuleCondition {
RuleCondition::Eq(EqCondition {
name: name.to_owned(),
value: value.iter().map(|s| s.to_string()).collect(),
options: EqCondOptions { ignore_case },
})
}
// Builds an event with a fresh random id, so it always has a sampling seed.
fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
Event {
id: Annotated::new(EventId::new()),
ty: Annotated::new(event_type),
transaction: Annotated::new(transaction.to_string()),
release: Annotated::new(LenientString(release.to_string())),
..Event::default()
}
}
// Builds a DSC with a random trace id and a fixed public key.
fn mocked_simple_dynamic_sampling_context(
sample_rate: Option<f64>,
release: Option<&str>,
transaction: Option<&str>,
environment: Option<&str>,
sampled: Option<bool>,
) -> DynamicSamplingContext {
DynamicSamplingContext {
trace_id: Uuid::new_v4(),
public_key: "12345678901234567890123456789012".parse().unwrap(),
release: release.map(|value| value.to_string()),
environment: environment.map(|value| value.to_string()),
transaction: transaction.map(|value| value.to_string()),
sample_rate,
user: Default::default(),
other: Default::default(),
replay_id: None,
sampled,
}
}
// Builds a match-everything sample-rate rule of the given type.
fn mocked_sampling_rule(id: u32, ty: RuleType, sample_rate: f64) -> SamplingRule {
SamplingRule {
condition: RuleCondition::all(),
sampling_value: SamplingValue::SampleRate { value: sample_rate },
ty,
id: RuleId(id),
time_range: Default::default(),
decaying_fn: Default::default(),
}
}
#[test]
/// Tests that an event is kept when a rule matches with a 100% sample rate.
fn test_get_sampling_result_return_keep_with_match_and_100_sample_rate() {
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Transaction, 1.0)],
mode: SamplingMode::Received,
});
let event = mocked_event(EventType::Transaction, "transaction", "2.0");
let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
assert_eq!(result, SamplingResult::Keep)
}
#[test]
/// Tests that an event is dropped when a rule matches with a 0% sample rate.
fn test_get_sampling_result_return_drop_with_match_and_0_sample_rate() {
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Transaction, 0.0)],
mode: SamplingMode::Received,
});
let event = mocked_event(EventType::Transaction, "transaction", "2.0");
let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
assert_eq!(
result,
SamplingResult::Drop(MatchedRuleIds(vec![RuleId(1)]))
)
}
#[test]
/// Tests that an event is kept when no rule matches.
fn test_get_sampling_result_return_keep_with_no_match() {
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![SamplingRule {
condition: eq("event.transaction", &["foo"], true),
sampling_value: SamplingValue::SampleRate { value: 0.5 },
ty: RuleType::Transaction,
id: RuleId(3),
time_range: Default::default(),
decaying_fn: Default::default(),
}],
mode: SamplingMode::Received,
});
let event = mocked_event(EventType::Transaction, "bar", "2.0");
let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
assert_eq!(result, SamplingResult::Keep)
}
#[test]
/// Tests unsupported rules: a non-processing Relay keeps the event, while a processing Relay
/// ignores the unsupported rule and applies the remaining ones (dropping here).
fn test_get_sampling_result_return_keep_with_unsupported_rule() {
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![
mocked_sampling_rule(1, RuleType::Unsupported, 0.0),
mocked_sampling_rule(2, RuleType::Transaction, 0.0),
],
mode: SamplingMode::Received,
});
let event = mocked_event(EventType::Transaction, "transaction", "2.0");
// Processing disabled: unsupported rules disable sampling entirely.
let result = get_sampling_result(false, Some(&project_state), None, None, Some(&event));
assert_eq!(result, SamplingResult::Keep);
// Processing enabled: only the supported rule (id 2) is reported.
let result = get_sampling_result(true, Some(&project_state), None, None, Some(&event));
assert_eq!(
result,
SamplingResult::Drop(MatchedRuleIds(vec![RuleId(2)]))
)
}
#[test]
/// Tests that an event is kept when a trace rule matches with a 100% sample rate.
fn test_get_sampling_result_with_traces_rules_return_keep_when_match() {
let root_project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
mode: SamplingMode::Received,
});
let dsc = mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, None);
let result = get_sampling_result(true, None, Some(&root_project_state), Some(&dsc), None);
assert_eq!(result, SamplingResult::Keep)
}
#[test]
/// Tests the fully-sampled verdict when both the dsc and the root project state are present.
fn test_is_trace_fully_sampled_with_valid_dsc_and_project_state() {
// We test with `sampled = true` and 100% rule.
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
mode: SamplingMode::Received,
});
let dsc =
mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, Some(true));
let result = is_trace_fully_sampled(true, Some(&project_state), Some(&dsc)).unwrap();
assert!(result);
// We test with `sampled = true` and 0% rule.
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Trace, 0.0)],
mode: SamplingMode::Received,
});
let dsc =
mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, Some(true));
let result = is_trace_fully_sampled(true, Some(&project_state), Some(&dsc)).unwrap();
assert!(!result);
// We test with `sampled = false` and 100% rule.
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
mode: SamplingMode::Received,
});
let dsc =
mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, Some(false));
let result = is_trace_fully_sampled(true, Some(&project_state), Some(&dsc)).unwrap();
assert!(!result);
}
#[test]
/// Tests that no fully-sampled verdict (`None`) is returned when inputs are missing.
fn test_is_trace_fully_sampled_with_invalid_inputs() {
// We test with missing `sampled`.
let project_state = project_state_with_config(SamplingConfig {
rules: vec![],
rules_v2: vec![mocked_sampling_rule(1, RuleType::Trace, 1.0)],
mode: SamplingMode::Received,
});
let dsc = mocked_simple_dynamic_sampling_context(Some(1.0), Some("3.0"), None, None, None);
let result = is_trace_fully_sampled(true, Some(&project_state), Some(&dsc));
assert!(result.is_none());
// We test with missing dsc and project config.
let result = is_trace_fully_sampled(true, None, None);
assert!(result.is_none())
}
}
//! Evaluation of dynamic sampling rules.
use std::fmt;
use std::num::ParseIntError;
use chrono::{DateTime, Utc};
use rand::distributions::Uniform;
use rand::Rng;
use rand_pcg::Pcg32;
use relay_base_schema::events::EventType;
use relay_event_schema::protocol::Event;
use relay_protocol::Getter;
use serde::Serialize;
use uuid::Uuid;
use crate::config::{
DecayingFunction, RuleId, RuleType, SamplingConfig, SamplingMode, SamplingRule, SamplingValue,
TimeRange,
};
use crate::dsc::DynamicSamplingContext;
/// Returns a pseudo-random number in `[0, 1)` derived deterministically from `id`.
///
/// The upper 64 bits of the UUID seed the PCG state and the lower 64 bits select its stream, so
/// the same id always yields the same number.
pub fn pseudo_random_from_uuid(id: Uuid) -> f64 {
    let bits = id.as_u128();
    let (state, stream) = ((bits >> 64) as u64, bits as u64);
    Pcg32::new(state, stream).sample(Uniform::new(0f64, 1f64))
}
/// Returns an iterator of references that chains together and merges rules.
///
/// The chaining logic will take all the non-trace rules from the project and all the trace/unsupported
/// rules from the root project and concatenate them.
pub fn merge_rules_from_configs<'a>(
sampling_config: Option<&'a SamplingConfig>,
root_sampling_config: Option<&'a SamplingConfig>,
) -> impl Iterator<Item = &'a SamplingRule> {
let transaction_rules = sampling_config
.into_iter()
.flat_map(|config| config.rules_v2.iter())
.filter(|&rule| rule.ty == RuleType::Transaction);
let trace_rules = root_sampling_config
.into_iter()
.flat_map(|config| config.rules_v2.iter())
.filter(|&rule| rule.ty == RuleType::Trace);
transaction_rules.chain(trace_rules)
}
/// Decides whether unsupported sampling rules prevent this Relay from sampling.
///
/// Returns `Err(())` when either configuration contains unsupported rules and processing is
/// disabled; the caller then skips sampling (keeping the event). A processing Relay proceeds
/// anyway and ignores the unsupported rules, but logs an error because it is not expected to
/// encounter them.
fn check_unsupported_rules(
    processing_enabled: bool,
    sampling_config: Option<&SamplingConfig>,
    root_sampling_config: Option<&SamplingConfig>,
) -> Result<(), ()> {
    let has_unsupported_rules = [sampling_config, root_sampling_config]
        .into_iter()
        .flatten()
        .any(|config| config.unsupported());

    if !has_unsupported_rules {
        return Ok(());
    }

    if processing_enabled {
        relay_log::error!("found unsupported rules even as processing relay");
        Ok(())
    } else {
        Err(())
    }
}
/// Merges the project and root project configurations and matches the event and/or dynamic
/// sampling context against the resulting rules.
///
/// Returns `None` in any of these cases:
/// - unsupported rules disable sampling on this Relay;
/// - no sample-rate rule matched;
/// - there is no configuration to take the sampling mode from;
/// - the sampling mode is unsupported.
pub fn merge_configs_and_match(
    processing_enabled: bool,
    sampling_config: Option<&SamplingConfig>,
    root_sampling_config: Option<&SamplingConfig>,
    dsc: Option<&DynamicSamplingContext>,
    event: Option<&Event>,
    now: DateTime<Utc>,
) -> Option<SamplingMatch> {
    check_unsupported_rules(processing_enabled, sampling_config, root_sampling_config).ok()?;

    // Multi-matching over the merged rule list; the resulting rate already includes matched
    // factors and decaying functions.
    let merged_rules = merge_rules_from_configs(sampling_config, root_sampling_config);
    let mut sampling_match = SamplingMatch::match_against_rules(merged_rules, event, dsc, now)?;

    // The sampling mode comes from the project's own configuration if present, otherwise from
    // the root project's configuration.
    let primary_config = match sampling_config.or(root_sampling_config) {
        Some(config) => config,
        None => {
            relay_log::error!("cannot sample without at least one sampling config");
            return None;
        }
    };

    let matched_rate = sampling_match.sample_rate;
    let effective_rate = match (&primary_config.mode, dsc) {
        (SamplingMode::Received, _) | (SamplingMode::Total, None) => matched_rate,
        (SamplingMode::Total, Some(dsc)) => dsc.adjusted_sample_rate(matched_rate),
        (SamplingMode::Unsupported, _) => {
            if processing_enabled {
                relay_log::error!("found unsupported sampling mode even as processing Relay");
            }
            return None;
        }
    };
    sampling_match.set_sample_rate(effective_rate);

    // A match was found and its rate is final; hand it over for the sampling decision.
    Some(sampling_match)
}
/// The specification for sampling an incoming event, produced when a sample-rate rule matches.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SamplingMatch {
    /// The sample rate to apply to the incoming event.
    pub sample_rate: f64,
    /// Seed for the random number generator, so that equal seeds produce equal decisions.
    ///
    /// This matters most for trace sampling, although multi-matching can still produce
    /// inconsistent traces.
    pub seed: Uuid,
    /// Identifiers of all rules that matched the event and/or dynamic sampling context.
    pub matched_rule_ids: MatchedRuleIds,
}

impl SamplingMatch {
    /// Replaces the sample rate.
    pub fn set_sample_rate(&mut self, new_sample_rate: f64) {
        self.sample_rate = new_sample_rate;
    }

    /// Matches an event and/or dynamic sampling context against sampling rules.
    ///
    /// Rules are visited in order. Matching factor rules are multiplied into an accumulator
    /// (starting at the neutral `1.0`) until the first matching sample-rate rule. The result is
    /// that rule's rate times the accumulated factors, clamped to `[0, 1]`.
    ///
    /// Trace rules are matched against `dsc`; transaction rules only against events of type
    /// transaction. Rules that are not active at `now` are skipped.
    ///
    /// Returns `None` if no sample-rate rule matches, or if no seed is available at that point.
    pub fn match_against_rules<'a, I>(
        rules: I,
        event: Option<&Event>,
        dsc: Option<&DynamicSamplingContext>,
        now: DateTime<Utc>,
    ) -> Option<SamplingMatch>
    where
        I: Iterator<Item = &'a SamplingRule>,
    {
        // The seed starts as the event id and switches to the trace id once a trace rule matches.
        // Multi-matching across event and trace rules can still make trace sampling inconsistent.
        // Example: /hello -> /world -> /transaction all belong to trace_id = abc, a uniform trace
        // rule of 0.2 matches all of them, and each project has one transaction rule with factor
        // 2, 3 and 4 respectively. All three use seed abc (and thus the same random number), but
        // with sample rates 0.4, 0.6 and 0.8.
        let mut seed = event.and_then(|e| e.id.value()).map(|id| id.0);
        let mut factor = 1.0;
        let mut matched_rule_ids = Vec::new();

        for rule in rules {
            let applies = match rule.ty {
                RuleType::Trace => dsc.map_or(false, |dsc| rule.condition.matches(dsc)),
                RuleType::Transaction => event.map_or(false, |event| {
                    matches!(event.ty.0, Some(EventType::Transaction))
                        && rule.condition.matches(event)
                }),
                _ => false,
            };
            if !applies {
                continue;
            }

            // Rules outside of their active time range do not count as matches.
            let Some(evaluator) = SamplingValueEvaluator::create(rule, now) else {
                continue;
            };

            matched_rule_ids.push(rule.id);
            if let Some(dsc) = dsc.filter(|_| rule.ty == RuleType::Trace) {
                seed = Some(dsc.trace_id);
            }

            let value = evaluator.evaluate(now);
            match rule.sampling_value {
                SamplingValue::Factor { .. } => factor *= value,
                SamplingValue::SampleRate { .. } => {
                    let sample_rate = (value * factor).clamp(0.0, 1.0);
                    // Without a seed no repeatable decision is possible; report no match.
                    let seed = seed?;
                    return Some(SamplingMatch {
                        sample_rate,
                        seed,
                        matched_rule_ids: MatchedRuleIds(matched_rule_ids),
                    });
                }
            }
        }

        // Only factor rules (or nothing) matched.
        None
    }
}
/// The outcome of dynamic sampling for one event.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum SamplingResult {
    /// The event is kept.
    ///
    /// This is the outcome both when the sampling rules decided to keep the event and when the
    /// rules could not be applied. It is also the default.
    #[default]
    Keep,
    /// The event is dropped; carries the identifiers of the rules responsible.
    Drop(MatchedRuleIds),
}
/// Evaluates sampling rules one at a time, accumulating matched rule ids and factors.
///
/// The `getter` callback supplies the instance that rules of a given [`RuleType`] are matched
/// against; returning `None` makes rules of that type never match.
///
/// NOTE(review): work in progress. When a sample-rate rule matches, `evaluate` reaches `todo!()`
/// and panics, and no seed for the sampling decision is tracked yet.
pub struct SamplingEvaluator<F> {
/// Instant used for rule time ranges and decaying functions.
now: DateTime<Utc>,
/// Ids of all rules matched so far; starts empty.
rule_ids: Vec<RuleId>,
/// Product of all matched factor rules so far; starts at `1.0`.
factor: f64,
/// Maps a rule type to the instance its conditions are matched against.
getter: F,
}
impl<'a, F> SamplingEvaluator<F>
where
F: FnMut(RuleType) -> Option<&'a dyn Getter> + 'a,
{
/// Creates an evaluator with no matched rules and a neutral factor of `1.0`.
pub fn new(now: DateTime<Utc>, getter: F) -> Self {
Self {
now,
rule_ids: Vec::new(),
factor: 1.0,
getter,
}
}
/// Evaluates a single rule, updating the accumulated state if it matches and is active.
///
/// Returns `None` if the getter yields no instance for the rule type, the condition does not
/// match, the rule is not active at `now`, or the rule is a factor rule.
///
/// NOTE(review): unlike `SamplingMatch::match_against_rules`, this does not check that
/// transaction rules are only applied to transaction events; presumably the getter is expected
/// to handle that — confirm.
pub fn evaluate(&mut self, rule: &SamplingRule) -> Option<SamplingResult> {
let instance = (self.getter)(rule.ty)?;
if !rule.condition.matches(instance) {
return None;
}
let evaluator = SamplingValueEvaluator::create(rule, self.now)?;
self.rule_ids.push(rule.id);
// TODO: For later
// if rule.ty == RuleType::Trace {
// if let Some(dsc) = dsc {
// seed = Some(dsc.trace_id);
// }
// }
let value = evaluator.evaluate(self.now);
match rule.sampling_value {
SamplingValue::Factor { .. } => self.factor *= value,
SamplingValue::SampleRate { .. } => {
// NOTE(review): panics at runtime. A result would need `self.factor * value`
// plus a seed to derive a keep/drop decision.
return Some(todo!());
}
}
None
}
/// Evaluates rules in order and returns the first result produced by [`Self::evaluate`].
///
/// State (matched rule ids, accumulated factor) persists across calls, so factor rules from
/// earlier calls still apply to later ones.
pub fn evaluate_all<'b, I>(&mut self, mut rules: I) -> Option<SamplingResult>
where
I: Iterator<Item = &'b SamplingRule>,
{
rules.find_map(|rule| self.evaluate(rule))
}
}
/// Represents a list of rule ids which is used for outcomes.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct MatchedRuleIds(pub Vec<RuleId>);
impl MatchedRuleIds {
/// Parses `MatchedRuleIds` from a string with concatenated rule identifiers.
///
/// The format it parses from is:
///
/// ```text
/// rule_id_1,rule_id_2,...
/// ```
pub fn parse(value: &str) -> Result<MatchedRuleIds, ParseIntError> {
let mut rule_ids = vec![];
for rule_id in value.split(',') {
rule_ids.push(RuleId(rule_id.parse()?));
}
Ok(MatchedRuleIds(rule_ids))
}
}
impl fmt::Display for MatchedRuleIds {
    /// Writes the rule ids separated by commas, without spaces.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut rule_ids = self.0.iter();
        if let Some(first) = rule_ids.next() {
            write!(f, "{first}")?;
            for rule_id in rule_ids {
                write!(f, ",{rule_id}")?;
            }
        }
        Ok(())
    }
}
/// Evaluation context for the sampling value of a rule that is active at a given time.
///
/// Instances are produced by [`SamplingValueEvaluator::create`].
#[derive(Debug, Clone, Copy)]
enum SamplingValueEvaluator {
    /// The value stays the same while the rule is active.
    Constant {
        /// The rule's sampling value.
        initial_value: f64,
    },
    /// The value moves linearly from `initial_value` at `start` to `decayed_value` at `end`.
    Linear {
        /// Point in time at which the decay begins.
        start: DateTime<Utc>,
        /// Point in time at which the decay ends.
        end: DateTime<Utc>,
        /// The rule's sampling value at `start`.
        initial_value: f64,
        /// The value reached at `end`.
        decayed_value: f64,
    },
}
impl SamplingValueEvaluator {
    /// Returns a [`SamplingValueEvaluator`] if the rule is active at the given time.
    ///
    /// - For [`DecayingFunction::Linear`], the rule's time range must have both a start and an
    ///   end, the rule's sampling value must be strictly greater than the decayed value, and
    ///   `now` must lie in the right-open range `[start, end)`.
    /// - For [`DecayingFunction::Constant`], the rule's time range must contain `now`.
    ///
    /// Returns `None` in every other case.
    fn create(rule: &SamplingRule, now: DateTime<Utc>) -> Option<Self> {
        let sampling_base_value = rule.sampling_value.value();
        match rule.decaying_fn {
            DecayingFunction::Linear { decayed_value } => {
                if let TimeRange {
                    start: Some(start),
                    end: Some(end),
                } = rule.time_range
                {
                    // As in the TimeRange::contains method we use a right non-inclusive time bound.
                    if sampling_base_value > decayed_value && start <= now && now < end {
                        return Some(Self::Linear {
                            start,
                            end,
                            initial_value: sampling_base_value,
                            decayed_value,
                        });
                    }
                }
            }
            DecayingFunction::Constant => {
                if rule.time_range.contains(now) {
                    return Some(Self::Constant {
                        initial_value: sampling_base_value,
                    });
                }
            }
        }
        None
    }

    /// Evaluates the value of the sampling strategy at the given time.
    ///
    /// For [`Self::Linear`], the value moves from `initial_value` towards `decayed_value` in
    /// proportion to the time elapsed since `start`. Elapsed time is measured in whole seconds
    /// and the progress is clamped to `[0, 1]`. For [`Self::Constant`], `initial_value` is
    /// returned unchanged.
    fn evaluate(&self, now: DateTime<Utc>) -> f64 {
        match self {
            Self::Linear {
                start,
                end,
                initial_value,
                decayed_value,
            } => {
                let now_timestamp = now.timestamp() as f64;
                let start_timestamp = start.timestamp() as f64;
                let end_timestamp = end.timestamp() as f64;
                let span = end_timestamp - start_timestamp;

                let progress_ratio = if span > 0.0 {
                    ((now_timestamp - start_timestamp) / span).clamp(0.0, 1.0)
                } else if now < *end {
                    // `start` and `end` fall into the same whole second (a sub-second range), so
                    // the ratio above would be `0 / 0` = NaN, and `clamp` propagates NaN. Inside
                    // the range no whole second has elapsed yet, so no decay has happened.
                    0.0
                } else {
                    1.0
                };

                // `create` only builds this variant when `initial_value > decayed_value`, so this
                // interval is always negative.
                let interval = decayed_value - initial_value;
                initial_value + (interval * progress_ratio)
            }
            Self::Constant { initial_value } => *initial_value,
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::condition::RuleCondition;

    use super::*;

    #[test]
    /// Test that we get the same sampling decision from the same trace id
    fn test_repeatable_seed() {
        let id = "4a106cf6-b151-44eb-9131-ae7db1a157a3".parse().unwrap();
        let val1 = pseudo_random_from_uuid(id);
        let val2 = pseudo_random_from_uuid(id);
        assert!(val1 + f64::EPSILON > val2 && val2 + f64::EPSILON > val1);
    }

    #[test]
    /// Tests if the MatchedRuleIds struct is displayed correctly as string.
    fn matched_rule_ids_display() {
        let matched_rule_ids = MatchedRuleIds(vec![RuleId(123), RuleId(456)]);
        assert_eq!(matched_rule_ids.to_string(), "123,456");

        let matched_rule_ids = MatchedRuleIds(vec![RuleId(123)]);
        assert_eq!(matched_rule_ids.to_string(), "123");

        let matched_rule_ids = MatchedRuleIds(vec![]);
        assert_eq!(matched_rule_ids.to_string(), "");
    }

    #[test]
    /// Tests if the MatchRuleIds struct is created correctly from its string representation.
    fn matched_rule_ids_parse() {
        assert_eq!(
            MatchedRuleIds::parse("123,456"),
            Ok(MatchedRuleIds(vec![RuleId(123), RuleId(456)]))
        );

        assert_eq!(
            MatchedRuleIds::parse("123"),
            Ok(MatchedRuleIds(vec![RuleId(123)]))
        );

        assert!(MatchedRuleIds::parse("").is_err());

        assert!(MatchedRuleIds::parse(",").is_err());

        assert!(MatchedRuleIds::parse("123.456").is_err());

        assert!(MatchedRuleIds::parse("a,b").is_err());
    }

    // Asserts that the ids of the rules in `$res` equal the ids in `$exc`, in order.
    //
    // `$res` is evaluated exactly once and must provide `iter()` over `SamplingRule`s. `$exc` is
    // an array of numeric rule ids. On mismatch, the failure message shows both id lists.
    macro_rules! assert_rule_ids_eq {
        ($exc:expr, $res:expr) => {{
            let actual: Vec<u32> = $res.iter().map(|rule| rule.id.0).collect();
            assert_eq!(actual, $exc, "The rule ids don't match.");
        }};
    }

    fn mocked_sampling_rule(id: u32, ty: RuleType, sample_rate: f64) -> SamplingRule {
        SamplingRule {
            condition: RuleCondition::all(),
            sampling_value: SamplingValue::SampleRate { value: sample_rate },
            ty,
            id: RuleId(id),
            time_range: Default::default(),
            decaying_fn: Default::default(),
        }
    }

    fn merge_root_and_non_root_configs_with(
        rules: Vec<SamplingRule>,
        root_rules: Vec<SamplingRule>,
    ) -> Vec<SamplingRule> {
        crate::evaluation::merge_rules_from_configs(
            Some(&SamplingConfig {
                rules: vec![],
                rules_v2: rules,
                mode: SamplingMode::Received,
            }),
            Some(&SamplingConfig {
                rules: vec![],
                rules_v2: root_rules,
                mode: SamplingMode::Received,
            }),
        )
        .cloned()
        .collect()
    }

    #[test]
    /// Tests the merged config of the two configs with rules.
    fn test_get_merged_config_with_rules_in_both_project_config_and_root_project_config() {
        assert_rule_ids_eq!(
            [1, 7],
            merge_root_and_non_root_configs_with(
                vec![
                    mocked_sampling_rule(1, RuleType::Transaction, 0.1),
                    mocked_sampling_rule(3, RuleType::Trace, 0.3),
                    mocked_sampling_rule(4, RuleType::Unsupported, 0.1),
                ],
                vec![
                    mocked_sampling_rule(5, RuleType::Transaction, 0.4),
                    mocked_sampling_rule(7, RuleType::Trace, 0.6),
                    mocked_sampling_rule(8, RuleType::Unsupported, 0.1),
                ],
            )
        );
    }

    #[test]
    /// Tests the merged config of the two configs without rules.
    fn test_get_merged_config_with_no_rules_in_both_project_config_and_root_project_config() {
        assert!(merge_root_and_non_root_configs_with(vec![], vec![]).is_empty());
    }

    #[test]
    /// Tests the merged config of the project config with rules and the root project config
    /// without rules.
    fn test_get_merged_config_with_rules_in_project_config_and_no_rules_in_root_project_config() {
        assert_rule_ids_eq!(
            [1],
            merge_root_and_non_root_configs_with(
                vec![
                    mocked_sampling_rule(1, RuleType::Transaction, 0.1),
                    mocked_sampling_rule(3, RuleType::Trace, 0.3),
                    mocked_sampling_rule(4, RuleType::Unsupported, 0.1),
                ],
                vec![],
            )
        );
    }

    #[test]
    /// Tests the merged config of the project config without rules and the root project config
    /// with rules.
    fn test_get_merged_config_with_no_rules_in_project_config_and_with_rules_in_root_project_config(
    ) {
        assert_rule_ids_eq!(
            [6],
            merge_root_and_non_root_configs_with(
                vec![],
                vec![
                    mocked_sampling_rule(4, RuleType::Transaction, 0.4),
                    mocked_sampling_rule(6, RuleType::Trace, 0.6),
                    mocked_sampling_rule(7, RuleType::Unsupported, 0.1),
                ]
            )
        );
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment