Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Forward received_at field to Kafka #3561

Merged
merged 24 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improve
  • Loading branch information
iambriccardo committed May 10, 2024
commit b5ecdb9e4e2a3313b09e1a1beb94a854def4adc3
3 changes: 3 additions & 0 deletions tests/integration/asserts/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def __eq__(self, other):
assert isinstance(other, int)
return self._lower_bound <= other <= self._upper_bound

def __str__(self):
return f"{self._lower_bound} <= x <= {self._upper_bound}"


def time_after(lower_bound):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't assert after, but after until now. I think making the upper bound on time_within optional and defaulting to now is fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was indeed thinking about a better name.

upper_bound = int(datetime.now(tz=timezone.utc).timestamp())
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ def test_global_metrics_no_config(mini_sentry, relay):
timestamp = int(datetime.now(tz=timezone.utc).timestamp())
metrics = [
{
"timestamp": time_after(timestamp),
"timestamp": timestamp,
"width": 1,
"name": "c:transactions/foo@none",
"value": 17.0,
Expand Down Expand Up @@ -695,11 +695,12 @@ def test_session_metrics_processing(
metrics = metrics_by_name(metrics_consumer, 2)

now_timestamp = int(now.timestamp())
started_timestamp = int(started.timestamp())
assert metrics["c:sessions/session@none"] == {
"org_id": 1,
"project_id": 42,
"retention_days": 90,
"timestamp": time_after(now_timestamp),
"timestamp": time_after(started_timestamp),
"name": "c:sessions/session@none",
"type": "c",
"value": 1.0,
Expand All @@ -716,7 +717,7 @@ def test_session_metrics_processing(
"org_id": 1,
"project_id": 42,
"retention_days": 90,
"timestamp": time_after(now_timestamp),
"timestamp": time_after(started_timestamp),
"name": "s:sessions/user@none",
"type": "s",
"value": [1617781333],
Expand Down
19 changes: 17 additions & 2 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from sentry_sdk.envelope import Envelope, Item, PayloadRef

from .asserts import time_after
from .test_store import make_transaction
from .test_metrics import TEST_CONFIG

Expand Down Expand Up @@ -442,7 +443,8 @@ def test_span_ingestion(
)

duration = timedelta(milliseconds=500)
end = datetime.now(timezone.utc) - timedelta(seconds=1)
now = datetime.now(timezone.utc)
end = now - timedelta(seconds=1)
start = end - duration

# 1 - Send OTel span and sentry span via envelope
Expand Down Expand Up @@ -638,11 +640,11 @@ def test_span_ingestion(
metrics.sort(key=lambda m: (m["name"], sorted(m["tags"].items()), m["timestamp"]))
for metric in metrics:
try:
del metric["received_at"]
metric["value"].sort()
except AttributeError:
pass

now_timestamp = int(now.timestamp())
expected_timestamp = int(end.timestamp())
expected_span_metrics = [
{
Expand All @@ -654,6 +656,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp,
"type": "c",
"value": 3.0,
"received_at": time_after(now_timestamp),
},
{
"name": "c:spans/usage@none",
Expand All @@ -664,6 +667,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp + 1,
"type": "c",
"value": 3.0,
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/duration@millisecond",
Expand All @@ -681,6 +685,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp + 1,
"type": "d",
"value": [1500.0],
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/duration@millisecond",
Expand All @@ -694,6 +699,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp,
"type": "d",
"value": [500.0],
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/duration@millisecond",
Expand All @@ -706,6 +712,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp,
"type": "d",
"value": [500.0, 500.0],
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/duration@millisecond",
Expand All @@ -716,6 +723,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp + 1,
"type": "d",
"value": [1500.0, 1500.0],
"received_at": time_after(now_timestamp),
},
{
"org_id": 1,
Expand All @@ -733,6 +741,7 @@ def test_span_ingestion(
"span.op": "resource.script",
},
"retention_days": 90,
"received_at": time_after(now_timestamp),
},
{
"org_id": 1,
Expand All @@ -743,6 +752,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp,
"type": "d",
"value": [500.0],
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/exclusive_time@millisecond",
Expand All @@ -753,6 +763,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp,
"type": "d",
"value": [500.0, 500.0],
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/exclusive_time@millisecond",
Expand All @@ -763,6 +774,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp + 1,
"type": "d",
"value": [345.0, 345.0],
"received_at": time_after(now_timestamp),
},
{
"org_id": 1,
Expand All @@ -780,6 +792,7 @@ def test_span_ingestion(
"span.op": "resource.script",
},
"retention_days": 90,
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/exclusive_time_light@millisecond",
Expand All @@ -790,6 +803,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp,
"type": "d",
"value": [500.0],
"received_at": time_after(now_timestamp),
},
{
"name": "d:spans/webvital.score.total@ratio",
Expand All @@ -800,6 +814,7 @@ def test_span_ingestion(
"timestamp": expected_timestamp + 1,
"type": "d",
"value": [0.12121616],
"received_at": time_after(now_timestamp),
},
]
assert [m for m in metrics if ":spans/" in m["name"]] == expected_span_metrics
Expand Down
26 changes: 21 additions & 5 deletions tests/integration/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from requests.exceptions import HTTPError
from sentry_sdk.envelope import Envelope

from .asserts import time_after, time_within_delta


def test_store(mini_sentry, relay_chain):
relay = relay_chain()
Expand Down Expand Up @@ -660,10 +662,12 @@ def generate_ticks():
tick = generate_ticks()

def make_bucket(name, type_, values):
timestamp_ = next(tick)
print(f"TIMESTAMP {timestamp_}")
return {
"org_id": 1,
"project_id": project_id,
"timestamp": next(tick),
"timestamp": timestamp_,
"name": name,
"type": type_,
"value": values,
Expand All @@ -674,6 +678,8 @@ def send_buckets(buckets):
relay.send_metrics_buckets(project_id, buckets)
sleep(0.2)

timestamp = int(datetime.utcnow().timestamp())

# NOTE: Sending these buckets in multiple envelopes because the order of flushing
# and also the order of rate limiting is not deterministic.
send_buckets(
Expand Down Expand Up @@ -723,10 +729,6 @@ def send_buckets(buckets):

# Sort buckets to prevent ordering flakiness:
produced_buckets.sort(key=lambda b: (b["name"], b["value"]))
for bucket in produced_buckets:
del bucket["timestamp"]
del bucket["received_at"]

assert produced_buckets == [
{
"name": "d:sessions/duration@second",
Expand All @@ -736,6 +738,8 @@ def send_buckets(buckets):
"tags": {},
"type": "d",
"value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
{
"name": "d:sessions/session@none",
Expand All @@ -745,6 +749,8 @@ def send_buckets(buckets):
"tags": {},
"type": "c",
"value": 1.0,
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
{
"name": "d:sessions/session@user",
Expand All @@ -754,6 +760,8 @@ def send_buckets(buckets):
"tags": {},
"type": "s",
"value": [1254],
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
{
"name": "d:transactions/duration@millisecond",
Expand All @@ -763,6 +771,8 @@ def send_buckets(buckets):
"tags": {},
"type": "d",
"value": [1.0, 2.0, 3.0],
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
{
"name": "d:transactions/duration@millisecond",
Expand All @@ -772,6 +782,8 @@ def send_buckets(buckets):
"tags": {},
"type": "d",
"value": violating_bucket,
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
{
"name": "d:transactions/measurements.lcp@millisecond",
Expand All @@ -781,6 +793,8 @@ def send_buckets(buckets):
"tags": {},
"type": "d",
"value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
{
"name": "d:transactions/measurements.lcp@millisecond",
Expand All @@ -790,6 +804,8 @@ def send_buckets(buckets):
"tags": {},
"type": "d",
"value": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0],
"timestamp": time_within_delta(timestamp),
"received_at": time_within_delta(timestamp),
},
]

Expand Down
Loading