Skip to content

Commit

Permalink
node stats: support infinity
Browse files Browse the repository at this point in the history
These values are often float64 but sometimes a string, from the api
docs[1]:

> An "Infinity" value for a given flow window indicates that worker
> millis have been spent without any events completing processing

This adds a custom float64 type to enable the use of a custom
marshaller.

[1]: https://www.elastic.co/guide/en/logstash/current/node-stats-api.html
  • Loading branch information
fbs committed Oct 11, 2024
1 parent 1e60b12 commit 1c71aa6
Showing 1 changed file with 47 additions and 16 deletions.
63 changes: 47 additions & 16 deletions fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package responses

import "time"
import (
"encoding/json"
"fmt"
"math"
"time"
)

type PipelineResponse struct {
Workers int `json:"workers"`
Expand Down Expand Up @@ -87,28 +92,28 @@ type EventsResponse struct {

type FlowResponse struct {
InputThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"input_throughput"`
FilterThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"filter_throughput"`
OutputThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"output_throughput"`
QueueBackpressure struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"queue_backpressure"`
WorkerConcurrency struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"worker_concurrency"`
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"worker_utilization"`
}

Expand Down Expand Up @@ -154,11 +159,11 @@ type SinglePipelineResponse struct {
} `json:"events"`
Flow struct {
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
Current InfinityFloat `json:"current"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"worker_utilization"`
WorkerMillisPerEvent struct {
Lifetime float64 `json:"lifetime"`
Lifetime InfinityFloat `json:"lifetime"`
} `json:"worker_millis_per_event"`
} `json:"flow"`
} `json:"filters"`
Expand Down Expand Up @@ -286,3 +291,29 @@ type NodeStatsResponse struct {

Pipelines map[string]SinglePipelineResponse `json:"pipelines"`
}

// InfinityFloat is a float type that also accepts the string Infinity
type InfinityFloat float64

func (i *InfinityFloat) UnmarshalJSON(data []byte) error {
var s string
err := json.Unmarshal(data, &s)
if err == nil {
if s == "Infinity" {
*i = InfinityFloat(math.Inf(1))
return nil
} else if s == "-Infinity" {
*i = InfinityFloat(math.Inf(-1))
return nil
}
fmt.Errorf("Invalid string value for InfinityFloat: %s", s)
}

var f float64
if err := json.Unmarshal(data, &f); err != nil {
return err
}

*i = InfinityFloat(f)
return nil
}

0 comments on commit 1c71aa6

Please sign in to comment.