diff --git a/CHANGELOG.md b/CHANGELOG.md index aeddf928..6c7a18bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - Add platform field to android methods to align with the sample format ([#442](https://github.com/getsentry/vroom/pull/442)) - Exclude unsymbolicated frames from metrics ingestion ([#441](https://github.com/getsentry/vroom/pull/441)) - Filter out system frames when ingesting functions into generic metrics ([#444](https://github.com/getsentry/vroom/pull/444)) +- Store profile chunks. ([#463](https://github.com/getsentry/vroom/pull/463)) **Bug Fixes**: diff --git a/cmd/vroom/chunk.go b/cmd/vroom/chunk.go new file mode 100644 index 00000000..ac5923fb --- /dev/null +++ b/cmd/vroom/chunk.go @@ -0,0 +1,127 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "strconv" + + "github.com/getsentry/sentry-go" + "github.com/segmentio/kafka-go" + "gocloud.dev/gcerrors" + + "github.com/getsentry/vroom/internal/chunk" + "github.com/getsentry/vroom/internal/storageutil" +) + +func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + hub := sentry.GetHubFromContext(ctx) + + s := sentry.StartSpan(ctx, "processing") + s.Description = "Read HTTP body" + body, err := io.ReadAll(r.Body) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusBadRequest) + return + } + defer r.Body.Close() + + c := new(chunk.Chunk) + s = sentry.StartSpan(ctx, "json.unmarshal") + s.Description = "Unmarshal profile" + err = json.Unmarshal(body, c) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + hub.Scope().SetContext("Profile metadata", map[string]interface{}{ + "chunk_id": c.ID, + "organization_id": strconv.FormatUint(c.OrganizationID, 10), + "profiler_id": c.ProfilerID, + "project_id": strconv.FormatUint(c.ProjectID, 10), + "size": len(body), + }) + + hub.Scope().SetTags(map[string]string{ + "platform": string(c.Platform), + }) + + s = sentry.StartSpan(ctx, "gcs.write") + s.Description = "Write profile to GCS" + err = storageutil.CompressedWrite(ctx, env.storage, c.StoragePath(), body) + s.Finish() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + // This is a transient error, we'll retry + w.WriteHeader(http.StatusTooManyRequests) + } else { + // These errors won't be retried + hub.CaptureException(err) + if code := gcerrors.Code(err); code == gcerrors.FailedPrecondition { + w.WriteHeader(http.StatusPreconditionFailed) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + } + return + } + + s = sentry.StartSpan(ctx, "json.marshal") + s.Description = "Marshal chunk Kafka message" + b, err := json.Marshal(buildChunkKafkaMessage(c)) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "processing") + s.Description = "Send chunk to Kafka" + err = env.profilingWriter.WriteMessages(ctx, kafka.Message{ + Topic: env.config.ProfileChunksKafkaTopic, + Value: b, + }) + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s.Finish() + + w.WriteHeader(http.StatusNoContent) +} + +type ( + ChunkKafkaMessage struct { + ProjectID uint64 `json:"project_id"` + ProfilerID string `json:"profiler_id"` + ChunkID string `json:"chunk_id"` + + StartTimestamp float64 `json:"start_timestamp"` + EndTimestamp float64 `json:"end_timestamp"` + + Received float64 `json:"received"` + RetentionDays int `json:"retention_days"` + } +) + +func buildChunkKafkaMessage(c *chunk.Chunk) *ChunkKafkaMessage { + start, end := c.StartEndTimestamps() + return &ChunkKafkaMessage{ + ChunkID: c.ID, + ProjectID: c.ProjectID, + ProfilerID: c.ProfilerID, + StartTimestamp: start, + EndTimestamp: end, + Received: c.Received, + RetentionDays: c.RetentionDays, + } +} diff --git a/cmd/vroom/config.go b/cmd/vroom/config.go index 34e7af22..8920f3a6 100644 --- a/cmd/vroom/config.go +++ b/cmd/vroom/config.go @@ -8,14 +8,14 @@ type ( SentryDSN string `env:"SENTRY_DSN"` OccurrencesKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_OCCURRENCES" env-default:"localhost:9092"` - OccurrencesKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_OCCURRENCES" env-default:"ingest-occurrences"` + ProfilingKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_PROFILING" env-default:"localhost:9092"` + SpansKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_SPANS" env-default:"localhost:9092"` - SpansKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_SPANS" env-default:"localhost:9092"` - MetricsSummaryKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_METRICS_SUMMARY" env-default:"snuba-metrics-summaries"` - - ProfilingKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_PROFILING" env-default:"localhost:9092"` - CallTreesKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_CALL_TREES" env-default:"profiles-call-tree"` - ProfilesKafkaTopic string `env:"SENTRY_KAKFA_TOPIC_PROFILES" env-default:"processed-profiles"` + CallTreesKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_CALL_TREES" env-default:"profiles-call-tree"` + MetricsSummaryKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_METRICS_SUMMARY" env-default:"snuba-metrics-summaries"` + OccurrencesKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_OCCURRENCES" env-default:"ingest-occurrences"` + ProfileChunksKafkaTopic string `env:"SENTRY_KAFKA_TOPIC_PROFILE_CHUNKS" env-default:"snuba-profile-chunks"` + ProfilesKafkaTopic string `env:"SENTRY_KAKFA_TOPIC_PROFILES" env-default:"processed-profiles"` SnubaHost string `env:"SENTRY_SNUBA_HOST" env-default:"http://localhost:1218"` diff --git a/cmd/vroom/main.go b/cmd/vroom/main.go index 4480a641..a31524cd 100644 --- a/cmd/vroom/main.go +++ b/cmd/vroom/main.go @@ -45,9 +45,7 @@ type environment struct { metricsClient *http.Client } -var ( - release string -) +var release string const ( KiB int64 = 1024 @@ -65,8 +63,8 @@ func newEnvironment() (*environment, error) { if err != nil { return nil, err } - ctx := context.Background() + ctx := context.Background() e.storage, err = blob.OpenBucket(ctx, e.config.BucketURL) if err != nil { return nil, err @@ -175,6 +173,7 @@ func (e *environment) newRouter() (*httprouter.Router, error) { e.postFlamegraphFromProfileIDs, }, {http.MethodGet, "/health", e.getHealth}, + {http.MethodPost, "/chunk", e.postChunk}, {http.MethodPost, "/profile", e.postProfile}, {http.MethodPost, "/regressed", e.postRegressed}, } diff --git a/cmd/vroom/profile.go b/cmd/vroom/profile.go index e61a146b..ec1b9d7f 100644 --- a/cmd/vroom/profile.go +++ b/cmd/vroom/profile.go @@ -40,10 +40,7 @@ func (env *environment) postProfile(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - - hub.Scope().SetContext("Profile metadata", map[string]interface{}{ - "Size": len(body), - }) + defer r.Body.Close() var p profile.Profile s = sentry.StartSpan(ctx, "json.unmarshal") @@ -55,17 +52,20 @@ func (env *environment) postProfile(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - defer r.Body.Close() orgID := p.OrganizationID() - profilePlatform := p.Platform() - - hub.Scope().SetTags(map[string]string{ + hub.Scope().SetContext("Profile metadata", map[string]interface{}{ "organization_id": strconv.FormatUint(orgID, 10), - "platform": string(profilePlatform), "profile_id": p.ID(), "project_id": strconv.FormatUint(p.ProjectID(), 10), + "size": len(body), + }) + + profilePlatform := p.Platform() + + hub.Scope().SetTags(map[string]string{ + "platform": string(profilePlatform), }) s = sentry.StartSpan(ctx, "processing") diff --git a/internal/chunk/chunk.go b/internal/chunk/chunk.go new file mode 100644 index 00000000..cd106812 --- /dev/null +++ b/internal/chunk/chunk.go @@ -0,0 +1,66 @@ +package chunk + +import ( + "encoding/json" + "fmt" + + "github.com/getsentry/vroom/internal/debugmeta" + "github.com/getsentry/vroom/internal/frame" + "github.com/getsentry/vroom/internal/platform" +) + +type ( + // Chunk is an implementation of the Sample V2 format. + Chunk struct { + ID string `json:"chunk_id"` + ProfilerID string `json:"profiler_id"` + + DebugMeta debugmeta.DebugMeta `json:"debug_meta"` + + Environment string `json:"environment"` + Platform platform.Platform `json:"platform"` + Release string `json:"release"` + + Version string `json:"version"` + + Profile Data `json:"profile"` + + OrganizationID uint64 `json:"organization_id"` + ProjectID uint64 `json:"project_id"` + Received float64 `json:"received"` + RetentionDays int `json:"retention_days"` + + Measurements json.RawMessage + } + + Data struct { + Frames []frame.Frame + Samples []Sample + Stacks [][]int + ThreadMetadata map[string]map[string]string `json:"thread_metadata"` + } + + Sample struct { + StackID int `json:"stack_id"` + ThreadID string `json:"thread_id"` + Timestamp float64 + } +) + +func (c *Chunk) StoragePath() string { + return fmt.Sprintf( + "%d/%d/%s/%s", + c.OrganizationID, + c.ProjectID, + c.ProfilerID, + c.ID, + ) +} + +func (c *Chunk) StartEndTimestamps() (float64, float64) { + count := len(c.Profile.Samples) + if count == 0 { + return 0, 0 + } + return c.Profile.Samples[0].Timestamp, c.Profile.Samples[count-1].Timestamp +} diff --git a/internal/profile/legacy.go b/internal/profile/legacy.go index 783143b9..6ac1c2fc 100644 --- a/internal/profile/legacy.go +++ b/internal/profile/legacy.go @@ -22,11 +22,16 @@ import ( "github.com/getsentry/vroom/internal/utils" ) -const maxProfileDurationForCallTrees = 15 * time.Second +const ( + maxProfileDurationForCallTrees = 15 * time.Second +) + +var ( + ErrProfileHasNoTrace = errors.New("profile has no trace") + ErrReactHasInvalidJsTrace = errors.New("react-android profile has invalid js trace") -var ErrProfileHasNoTrace = errors.New("profile has no trace") -var ErrReactHasInvalidJsTrace = errors.New("react-android profile has invalid js trace") -var member void + member void +) type ( void struct{}