Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(normalization): Remove TransactionsProcessor #2714

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion relay-event-normalization/src/normalize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ use crate::{

pub mod breakdowns;
pub mod nel;
pub(crate) mod processor;
iker-barriocanal marked this conversation as resolved.
Show resolved Hide resolved
pub mod span;
pub mod user_agent;
pub mod utils;

mod contexts;
mod logentry;
mod mechanism;
mod processor;
mod request;
mod stacktrace;

Expand Down
121 changes: 102 additions & 19 deletions relay-event-normalization/src/normalize/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use std::collections::BTreeSet;

use std::hash::{Hash, Hasher};
use std::mem;
use std::ops::Range;

use chrono::{DateTime, Duration, Utc};
use relay_base_schema::metrics::{is_valid_metric_name, DurationUnit, FractionUnit, MetricUnit};
use relay_common::time::UnixTimestamp;
use relay_event_schema::processor::{
self, MaxChars, ProcessValue, ProcessingAction, ProcessingResult, ProcessingState, Processor,
};
Expand All @@ -31,25 +33,42 @@ use crate::span::tag_extraction::{self, extract_span_tags};
use crate::timestamp::TimestampProcessor;
use crate::utils::{self, MAX_DURATION_MOBILE_MS};
use crate::{
breakdowns, schema, span, transactions, trimming, user_agent, BreakdownsConfig,
ClockDriftProcessor, DynamicMeasurementsConfig, GeoIpLookup, PerformanceScoreConfig,
RawUserAgentInfo,
breakdowns, end_all_spans, normalize_transaction_name, schema, set_default_transaction_source,
span, trimming, user_agent, validate_transaction, BreakdownsConfig, ClockDriftProcessor,
DynamicMeasurementsConfig, GeoIpLookup, PerformanceScoreConfig, RawUserAgentInfo,
TransactionNameConfig,
};

use crate::LightNormalizationConfig;

/// Configuration for [`NormalizeProcessor`].
#[derive(Clone, Debug, Default)]
pub struct NormalizeProcessorConfig<'a> {
pub(crate) struct NormalizeProcessorConfig<'a> {
/// Light normalization config.
// XXX(iker): we should split this config appropriately.
light_config: LightNormalizationConfig<'a>,
pub light_config: LightNormalizationConfig<'a>,

/// Configuration to apply to transaction names, especially around sanitizing.
pub transaction_name_config: TransactionNameConfig<'a>,

/// Timestamp range in which a transaction must end.
///
/// Transactions that finish outside of this range are considered invalid.
/// This check is skipped if no range is provided.
pub transaction_range: Option<Range<UnixTimestamp>>,
}

impl<'a> From<LightNormalizationConfig<'a>> for NormalizeProcessorConfig<'a> {
fn from(config: LightNormalizationConfig<'a>) -> Self {
fn from(mut config: LightNormalizationConfig<'a>) -> Self {
// HACK(iker): workaround to avoid cloning of config items. We'll get
// rid of this when we remove light normalization in the next step.
let transaction_name_config = std::mem::take(&mut config.transaction_name_config);
let transaction_range = config.transaction_range.take();

Self {
light_config: config,
transaction_name_config,
transaction_range,
}
}
}
Expand All @@ -62,7 +81,7 @@ impl<'a> From<LightNormalizationConfig<'a>> for NormalizeProcessorConfig<'a> {
/// The returned [`ProcessingResult`] indicates whether the passed event should
/// be ingested or dropped.
#[derive(Debug, Default)]
pub struct NormalizeProcessor<'a> {
pub(crate) struct NormalizeProcessor<'a> {
iker-barriocanal marked this conversation as resolved.
Show resolved Hide resolved
/// Configuration for the normalization steps.
config: NormalizeProcessorConfig<'a>,
}
Expand All @@ -80,23 +99,40 @@ impl<'a> Processor for NormalizeProcessor<'a> {
meta: &mut Meta,
state: &ProcessingState<'_>,
) -> ProcessingResult {
if event.ty.value() == Some(&EventType::Transaction) {
// TODO: Parts of this processor should probably be a filter so we
// can revert some changes to ProcessingAction)

validate_transaction(event, self.config.transaction_range.as_ref())?;
Copy link
Contributor Author

@iker-barriocanal iker-barriocanal Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation happened after normalizing the transaction name, a few lines below. Since validation does not depend on the transaction name, moving it to the top here allows Relay to reject invalid transactions a bit earlier and save some computing. Note the outcome does not change. Happy to move this to another PR if this feels out of scope.


if let Some(trace_context) = event.context_mut::<TraceContext>() {
trace_context.op.get_or_insert_with(|| "default".to_owned());
}
Comment on lines +108 to +110
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was part of validate_transaction, but moving it here allows the validation function to accept a &event instead of &mut event.


// The transaction name is expected to be non-empty by downstream services (e.g. Snuba), but
// Relay doesn't reject events missing the transaction name. Instead, a default transaction
// name is given, similar to how Sentry gives an "<unlabeled event>" title to error events.
// SDKs should avoid sending empty transaction names, setting a more contextual default
// value when possible.
if event.transaction.value().map_or(true, |s| s.is_empty()) {
event
.transaction
.set_value(Some("<unlabeled transaction>".to_owned()))
}
set_default_transaction_source(event);
normalize_transaction_name(event, &self.config.transaction_name_config)?;
end_all_spans(event)?;
}

// XXX(iker): processing child values should be the last step. The logic
// below this call is being moved (WIP) to the processor appropriately.
event.process_child_values(self, state)?;

let config = &self.config.light_config;
if config.is_renormalize {
return Ok(());
}

// Validate and normalize transaction
// (internally noops for non-transaction events).
// TODO: Parts of this processor should probably be a filter so we
// can revert some changes to ProcessingAction)
let mut transactions_processor = transactions::TransactionsProcessor::new(
config.transaction_name_config.clone(),
config.transaction_range.clone(),
);
transactions_processor.process_event(event, meta, ProcessingState::root())?;

// Check for required and non-empty values
schema::SchemaProcessor.process_event(event, meta, ProcessingState::root())?;

Expand Down Expand Up @@ -973,7 +1009,7 @@ mod tests {
use insta::assert_debug_snapshot;
use relay_base_schema::events::EventType;
use relay_base_schema::metrics::{DurationUnit, MetricUnit};
use relay_event_schema::processor::{process_value, ProcessingAction, ProcessingState};
use relay_event_schema::processor::{self, process_value, ProcessingAction, ProcessingState};
use relay_event_schema::protocol::{
Contexts, Csp, DeviceContext, Event, Headers, IpAddr, Measurement, Measurements, Request,
Span, SpanId, Tags, TraceContext, TraceId,
Expand Down Expand Up @@ -1782,7 +1818,7 @@ mod tests {
}

#[test]
fn test_renormalize_is_idempotent() {
fn test_renormalize_spans_is_idempotent() {
let json = r#"{
"start_timestamp": 1,
"timestamp": 2,
Expand All @@ -1809,6 +1845,53 @@ mod tests {
assert_eq!(reprocessed, reprocessed2);
}

#[test]
fn test_renormalize_transactions_is_idempotent() {
let json = r#"{
"event_id": "52df9022835246eeb317dbd739ccd059",
"type": "transaction",
"transaction": "test-transaction",
"start_timestamp": 1,
"timestamp": 2,
"contexts": {
"trace": {
"trace_id": "ff62a8b040f340bda5d830223def1d81",
"span_id": "bd429c44b67a3eb4"
}
}
}"#;

let mut processed = Annotated::<Event>::from_json(json).unwrap();
let processor_config = NormalizeProcessorConfig::default();
let mut processor = NormalizeProcessor::new(processor_config.clone());
process_value(&mut processed, &mut processor, ProcessingState::root()).unwrap();
remove_received_from_event(&mut processed);
let trace_context = get_value!(processed!).context::<TraceContext>().unwrap();
assert_eq!(trace_context.op.value().unwrap(), "default");

let mut reprocess_config = processor_config.clone();
reprocess_config.light_config.is_renormalize = true;
let mut processor = NormalizeProcessor::new(processor_config.clone());

let mut reprocessed = processed.clone();
process_value(&mut reprocessed, &mut processor, ProcessingState::root()).unwrap();
remove_received_from_event(&mut reprocessed);
assert_eq!(processed, reprocessed);

let mut reprocessed2 = reprocessed.clone();
process_value(&mut reprocessed2, &mut processor, ProcessingState::root()).unwrap();
remove_received_from_event(&mut reprocessed2);
assert_eq!(reprocessed, reprocessed2);
}

fn remove_received_from_event(event: &mut Annotated<Event>) {
processor::apply(event, |e, _| {
e.received = Annotated::empty();
Ok(())
})
.unwrap();
}

#[test]
fn test_computed_performance_score() {
let json = r#"
Expand Down
Loading
Loading