feat(metrics): Forward received_at field to Kafka #3561

Merged
merged 24 commits on May 22, 2024
Changes from 1 commit
Improve
iambriccardo committed May 8, 2024
commit def59a069740ef9344a6d661286e57de6fbd44e1
3 changes: 2 additions & 1 deletion tests/integration/asserts/__init__.py
@@ -1,6 +1,7 @@
from .time import time_within, time_within_delta
from .time import time_after, time_within, time_within_delta

__all__ = [
"time_after",
"time_within",
"time_within_delta"
]
10 changes: 6 additions & 4 deletions tests/integration/asserts/time.py
@@ -12,10 +12,12 @@ def __eq__(self, other):
return self._lower_bound <= other <= self._upper_bound


def time_within(lower_bound, upper_bound=None):
if upper_bound is None:
upper_bound = int(datetime.now(tz=timezone.utc).timestamp())

def time_after(lower_bound):
Member

This doesn't assert after, but after until now. I think making the upper bound on time_within optional and defaulting to now is fine.

Member Author

Yeah, I was indeed thinking about a better name.

upper_bound = int(datetime.now(tz=timezone.utc).timestamp())
return time_within(lower_bound, upper_bound)


def time_within(lower_bound, upper_bound):
assert lower_bound <= upper_bound
return _WithinBounds(lower_bound, upper_bound)

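For reference, a minimal, self-contained sketch of how the new `time_after` helper composes with `time_within`. It mirrors the code added in this file; the `_WithinBounds` constructor is outside this hunk, so its body below is an assumption based on how it is used.

```python
from datetime import datetime, timezone


class _WithinBounds:
    # Constructor assumed from usage; only __eq__ appears in the hunk above.
    def __init__(self, lower_bound, upper_bound):
        self._lower_bound = lower_bound
        self._upper_bound = upper_bound

    def __eq__(self, other):
        # Compares equal to any timestamp inside the inclusive bounds.
        return self._lower_bound <= other <= self._upper_bound


def time_after(lower_bound):
    # "After" means after `lower_bound` but no later than now, as pointed out
    # in the review comment above.
    upper_bound = int(datetime.now(tz=timezone.utc).timestamp())
    return time_within(lower_bound, upper_bound)


def time_within(lower_bound, upper_bound):
    assert lower_bound <= upper_bound
    return _WithinBounds(lower_bound, upper_bound)


# Usage in a test assertion: the dicts compare equal as long as both fields
# fall between `sent_at` and the moment the assertion is evaluated.
sent_at = int(datetime.now(tz=timezone.utc).timestamp())
metric = {"timestamp": sent_at, "received_at": sent_at}
assert metric == {"timestamp": time_after(sent_at), "received_at": time_after(sent_at)}
```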
91 changes: 44 additions & 47 deletions tests/integration/test_metrics.py
@@ -13,7 +13,7 @@
import queue

from .test_envelope import generate_transaction_item
from .asserts import time_within, time_within_delta
from .asserts import time_after, time_within, time_within_delta

TEST_CONFIG = {
"aggregator": {
@@ -204,14 +204,14 @@ def test_metrics(mini_sentry, relay):
)
assert received_metrics == [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:transactions/bar@none",
"value": 17.0,
"type": "c",
},
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:transactions/foo@none",
"value": 42.0,
@@ -241,7 +241,7 @@ def test_metrics_backdated(mini_sentry, relay):
)
assert received_metrics == [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:transactions/foo@none",
"value": 42.0,
@@ -385,14 +385,14 @@ def test_global_metrics(mini_sentry, relay):
metrics = metrics_without_keys(metrics_batch[public_key], keys={"metadata"})
assert metrics == [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:transactions/bar@none",
"value": 17.0,
"type": "c",
},
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:transactions/foo@none",
"value": 42.0,
@@ -411,7 +411,7 @@ def test_global_metrics_no_config(mini_sentry, relay):
timestamp = int(datetime.now(tz=timezone.utc).timestamp())
metrics = [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:transactions/foo@none",
"value": 17.0,
@@ -466,7 +466,7 @@ def test_global_metrics_batching(mini_sentry, relay):

assert metrics_without_keys(batch1[public_key], keys={"metadata"}) == [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "d:transactions/foo@none",
"value": [float(i) for i in range(1, 16)],
@@ -476,7 +476,7 @@

assert metrics_without_keys(batch2[public_key], keys={"metadata"}) == [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "d:transactions/foo@none",
"value": [16.0, 17.0],
@@ -510,8 +510,8 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con
"tags": {},
"value": 42.0,
"type": "c",
"timestamp": timestamp,
"received_at": time_within_delta(timestamp),
"timestamp": time_after(timestamp),
"received_at": time_after(timestamp)
}

assert metrics["headers"]["c:custom/bar@second"] == [("namespace", b"custom")]
@@ -523,8 +523,8 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con
"tags": {},
"value": 17.0,
"type": "c",
"timestamp": timestamp,
"received_at": time_within_delta(timestamp),
"timestamp": time_after(timestamp),
"received_at": time_after(timestamp)
}


@@ -563,8 +563,8 @@ def test_global_metrics_with_processing(
"tags": {},
"value": 42.0,
"type": "c",
"timestamp": timestamp,
"received_at": time_within_delta(timestamp),
"timestamp": time_after(timestamp),
"received_at": time_after(timestamp)
}

assert metrics["headers"]["c:custom/bar@second"] == [("namespace", b"custom")]
@@ -576,8 +576,8 @@ def test_global_metrics_with_processing(
"tags": {},
"value": 17.0,
"type": "c",
"timestamp": timestamp,
"received_at": time_within_delta(timestamp),
"timestamp": time_after(timestamp),
"received_at": time_after(timestamp)
}


@@ -616,8 +616,8 @@ def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consume
"tags": {},
"value": 15.0,
"type": "c",
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp)
"timestamp": time_after(timestamp),
"received_at": time_after(timestamp)
}

metrics_consumer.assert_empty()
@@ -696,12 +696,11 @@ def test_session_metrics_processing(
metrics = metrics_by_name(metrics_consumer, 2)

now_timestamp = int(now.timestamp())
started_timestamp = int(started.timestamp())
assert metrics["c:sessions/session@none"] == {
"org_id": 1,
"project_id": 42,
"retention_days": 90,
"timestamp": started_timestamp,
"timestamp": time_after(now_timestamp),
"name": "c:sessions/session@none",
"type": "c",
"value": 1.0,
@@ -711,14 +710,14 @@
"release": "sentry-test@1.0.0",
"session.status": "init",
},
"received_at": time_within_delta(now_timestamp),
"received_at": time_after(now_timestamp),
}

assert metrics["s:sessions/user@none"] == {
"org_id": 1,
"project_id": 42,
"retention_days": 90,
"timestamp": started_timestamp,
"timestamp": time_after(now_timestamp),
"name": "s:sessions/user@none",
"type": "s",
"value": [1617781333],
@@ -727,7 +726,7 @@
"environment": "production",
"release": "sentry-test@1.0.0",
},
"received_at": time_within_delta(now_timestamp),
"received_at": time_after(now_timestamp)
}


@@ -854,7 +853,7 @@ def test_transaction_metrics(

timestamp = int(timestamp.timestamp())
common = {
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"org_id": 1,
"project_id": 42,
"retention_days": 90,
@@ -863,7 +862,7 @@
"platform": "other",
"transaction.status": "unknown",
},
"received_at": time_within_delta(timestamp),
"received_at": time_after(timestamp)
}

assert metrics["c:spans/usage@none"]["value"] == 2
@@ -900,7 +899,7 @@ def test_transaction_metrics(
"value": [9.910106, 9.910106],
}
assert metrics["c:transactions/count_per_root_project@none"] == {
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"org_id": 1,
"project_id": 42,
"retention_days": 90,
@@ -911,7 +910,7 @@
"name": "c:transactions/count_per_root_project@none",
"type": "c",
"value": 2.0,
"received_at": time_within_delta(timestamp),
"received_at": time_after(timestamp)
}


@@ -970,26 +969,26 @@ def test_transaction_metrics_count_per_root_project(

timestamp = int(timestamp.timestamp())
assert metrics_by_project[41]["c:transactions/count_per_root_project@none"] == {
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"org_id": 1,
"project_id": 41,
"retention_days": 90,
"tags": {"decision": "keep", "transaction": "test"},
"name": "c:transactions/count_per_root_project@none",
"type": "c",
"value": 1.0,
"received_at": time_within_delta(timestamp),
"received_at": time_after(timestamp)
}
assert metrics_by_project[42]["c:transactions/count_per_root_project@none"] == {
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"org_id": 1,
"project_id": 42,
"retention_days": 90,
"tags": {"decision": "keep"},
"name": "c:transactions/count_per_root_project@none",
"type": "c",
"value": 2.0,
"received_at": time_within_delta(timestamp),
"received_at": time_after(timestamp),
}


@@ -1282,14 +1281,14 @@ def test_graceful_shutdown(mini_sentry, relay):
)
assert received_metrics == [
{
"timestamp": future_timestamp,
"timestamp": time_within_delta(future_timestamp, timedelta(seconds=1)),
"width": 1,
"name": "c:transactions/future@none",
"value": 17.0,
"type": "c",
},
{
"timestamp": past_timestamp,
"timestamp": time_within_delta(past_timestamp, timedelta(seconds=1)),
"width": 1,
"name": "c:transactions/past@none",
"value": 42.0,
@@ -1496,7 +1495,7 @@ def test_generic_metric_extraction(mini_sentry, relay):
json.loads(item.get_bytes().decode()), keys={"metadata"}
)
assert {
"timestamp": int(timestamp.timestamp()),
"timestamp": time_after(int(timestamp.timestamp())),
"width": 1,
"name": "c:transactions/on_demand@none",
"type": "c",
@@ -1584,9 +1583,6 @@ def test_span_metrics_secondary_aggregator(
processing.send_transaction(project_id, transaction)

metrics = metrics_consumer.get_metrics()
for metric in metrics:
del metric["received_at"]

# Transaction metrics are still aggregated:
assert all([m[0]["name"].startswith("spans", 2) for m in metrics])

@@ -1596,6 +1592,7 @@ def test_span_metrics_secondary_aggregator(
if metric["name"] == "d:spans/exclusive_time@millisecond"
]
span_metrics.sort(key=lambda m: m[0]["tags"]["span.op"])
timestamp = int(timestamp.timestamp())
assert span_metrics == [
(
{
@@ -1610,9 +1607,10 @@
"span.domain": ",foo,",
"span.op": "db",
},
"timestamp": int(timestamp.timestamp()),
"timestamp": time_after(timestamp),
"type": "d",
"value": [123.0],
"received_at": time_after(timestamp),
},
[("namespace", b"spans")],
),
@@ -1623,9 +1621,10 @@
"project_id": 42,
"retention_days": 90,
"tags": {"span.op": "default"},
"timestamp": int(timestamp.timestamp()),
"timestamp": time_after(timestamp),
"type": "d",
"value": [3.0],
"received_at": time_after(timestamp),
},
[("namespace", b"spans")],
),
@@ -1847,14 +1846,14 @@ def test_profiles_metrics(mini_sentry, relay):
)
assert received_metrics == [
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:profiles/bar@none",
"value": 17.0,
"type": "c",
},
{
"timestamp": timestamp,
"timestamp": time_after(timestamp),
"width": 1,
"name": "c:profiles/foo@none",
"value": 42.0,
@@ -1921,20 +1920,18 @@ def test_metrics_received_at(
"organizations:custom-metrics",
]

before = int(datetime.now(tz=timezone.utc).timestamp())

timestamp = int(datetime.now(tz=timezone.utc).timestamp())
relay.send_metrics(project_id, "custom/foo:1337|d")

metric, _ = metrics_consumer.get_metric()

assert metric == {
"org_id": 0,
"project_id": 42,
"name": "d:custom/foo@none",
"type": "d",
"value": [1337.0],
"timestamp": time_within(before),
"timestamp": time_after(timestamp),
"tags": {},
"retention_days": 90,
"received_at": time_within(before),
"received_at": time_after(timestamp),
}