Skip to content

Commit

Permalink
Merge pull request prometheus#612 from jhalterman/merge-upstream
Browse files Browse the repository at this point in the history
Merge upstream prometheus/prometheus at 6332248
  • Loading branch information
jhalterman authored Apr 9, 2024
2 parents 99f7a07 + 56bbecc commit efedd71
Show file tree
Hide file tree
Showing 38 changed files with 1,660 additions and 880 deletions.
2 changes: 1 addition & 1 deletion cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)

a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)

promlogflag.AddFlags(a, &cfg.promlogConfig)
Expand Down
14 changes: 0 additions & 14 deletions cmd/promtool/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,3 @@ func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
}
return nil
}

func max(x, y int64) int64 {
if x > y {
return x
}
return y
}

func min(x, y int64) int64 {
if x < y {
return x
}
return y
}
51 changes: 38 additions & 13 deletions discovery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
eps := NewEndpointSlice(
log.With(d.logger, "role", "endpointslice"),
informer,
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf,
d.metrics.eventCount,
)
Expand Down Expand Up @@ -545,8 +545,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
eps := NewEndpoints(
log.With(d.logger, "role", "endpoint"),
d.newEndpointsByNodeInformer(elw),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf,
d.metrics.eventCount,
)
Expand Down Expand Up @@ -602,7 +602,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
svc := NewService(
log.With(d.logger, "role", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, svc)
Expand Down Expand Up @@ -641,7 +641,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options)
},
}
informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
informer = d.mustNewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
} else {
i := d.client.NetworkingV1beta1().Ingresses(namespace)
ilw := &cache.ListWatch{
Expand All @@ -656,7 +656,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return i.Watch(ctx, options)
},
}
informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
informer = d.mustNewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
}
ingress := NewIngress(
log.With(d.logger, "role", "ingress"),
Expand Down Expand Up @@ -747,7 +747,7 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
return d.client.CoreV1().Nodes().Watch(ctx, options)
},
}
return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
return d.mustNewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
}

func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
Expand All @@ -762,7 +762,7 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
}
}

return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
return d.mustNewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
}

func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
Expand All @@ -783,7 +783,7 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
return pods, nil
}
if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
}

indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
Expand All @@ -809,13 +809,13 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
return nodes, nil
}

return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
}

func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node {
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}

indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
Expand Down Expand Up @@ -854,7 +854,32 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
return nodes, nil
}

return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}

func (d *Discovery) informerWatchErrorHandler(r *cache.Reflector, err error) {
d.metrics.failuresCount.Inc()
cache.DefaultWatchErrorHandler(r, err)
}

func (d *Discovery) mustNewSharedInformer(lw cache.ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) cache.SharedInformer {
informer := cache.NewSharedInformer(lw, exampleObject, defaultEventHandlerResyncPeriod)
// Invoking SetWatchErrorHandler should fail only if the informer has been started beforehand.
// Such a scenario would suggest an incorrect use of the API, thus the panic.
if err := informer.SetWatchErrorHandler(d.informerWatchErrorHandler); err != nil {
panic(err)
}
return informer
}

func (d *Discovery) mustNewSharedIndexInformer(lw cache.ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
informer := cache.NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, indexers)
// Invoking SetWatchErrorHandler should fail only if the informer has been started beforehand.
// Such a scenario would suggest an incorrect use of the API, thus the panic.
if err := informer.SetWatchErrorHandler(d.informerWatchErrorHandler); err != nil {
panic(err)
}
return informer
}

func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) {
Expand Down
40 changes: 40 additions & 0 deletions discovery/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"time"

"github.com/go-kit/log"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
kubetesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -314,3 +318,39 @@ func TestCheckNetworkingV1Supported(t *testing.T) {
})
}
}

func TestFailuresCountMetric(t *testing.T) {
tests := []struct {
role Role
minFailedWatches int
}{
{RoleNode, 1},
{RolePod, 1},
{RoleService, 1},
{RoleEndpoint, 3},
{RoleEndpointSlice, 3},
{RoleIngress, 1},
}

for _, tc := range tests {
tc := tc
t.Run(string(tc.role), func(t *testing.T) {
t.Parallel()

n, c := makeDiscovery(tc.role, NamespaceDiscovery{})
// The counter is initialized and no failures at the beginning.
require.Equal(t, float64(0), prom_testutil.ToFloat64(n.metrics.failuresCount))

// Simulate an error on watch requests.
c.Discovery().(*fakediscovery.FakeDiscovery).PrependWatchReactor("*", func(action kubetesting.Action) (bool, watch.Interface, error) {
return true, nil, apierrors.NewUnauthorized("unauthorized")
})

// Start the discovery.
k8sDiscoveryTest{discovery: n}.Run(t)

// At least the errors of the initial watches should be caught (watches are retried on errors).
require.GreaterOrEqual(t, prom_testutil.ToFloat64(n.metrics.failuresCount), float64(tc.minFailedWatches))
})
}
}
13 changes: 12 additions & 1 deletion discovery/kubernetes/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
var _ discovery.DiscovererMetrics = (*kubernetesMetrics)(nil)

type kubernetesMetrics struct {
eventCount *prometheus.CounterVec
eventCount *prometheus.CounterVec
failuresCount prometheus.Counter

metricRegisterer discovery.MetricRegisterer
}
Expand All @@ -37,10 +38,18 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
},
[]string{"role", "event"},
),
failuresCount: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: discovery.KubernetesMetricsNamespace,
Name: "failures_total",
Help: "The number of failed WATCH/LIST requests.",
},
),
}

m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{
m.eventCount,
m.failuresCount,
})

// Initialize metric vectors.
Expand All @@ -61,6 +70,8 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
}
}

m.failuresCount.Add(0)

return m
}

Expand Down
51 changes: 44 additions & 7 deletions discovery/linode/linode.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,22 @@ const (
linodeLabelSpecsVCPUs = linodeLabel + "specs_vcpus"
linodeLabelSpecsTransferBytes = linodeLabel + "specs_transfer_bytes"
linodeLabelExtraIPs = linodeLabel + "extra_ips"
linodeLabelIPv6Ranges = linodeLabel + "ipv6_ranges"

// This is our events filter; when polling for changes, we care only about
// events since our last refresh.
// Docs: https://www.linode.com/docs/api/account/#events-list
// Docs: https://www.linode.com/docs/api/account/#events-list.
filterTemplate = `{"created": {"+gte": "%s"}}`

// Optional region filtering.
regionFilterTemplate = `{"region": "%s"}`
)

// DefaultSDConfig is the default Linode SD configuration.
var DefaultSDConfig = SDConfig{
TagSeparator: ",",
Port: 80,
Region: "",
RefreshInterval: model.Duration(60 * time.Second),
HTTPClientConfig: config.DefaultHTTPClientConfig,
}
Expand All @@ -85,6 +90,7 @@ type SDConfig struct {
RefreshInterval model.Duration `yaml:"refresh_interval"`
Port int `yaml:"port"`
TagSeparator string `yaml:"tag_separator,omitempty"`
Region string `yaml:"region,omitempty"`
}

// NewDiscovererMetrics implements discovery.Config.
Expand Down Expand Up @@ -122,6 +128,7 @@ type Discovery struct {
*refresh.Discovery
client *linodego.Client
port int
region string
tagSeparator string
lastRefreshTimestamp time.Time
pollCount int
Expand All @@ -139,6 +146,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.Discovere

d := &Discovery{
port: conf.Port,
region: conf.Region,
tagSeparator: conf.TagSeparator,
pollCount: 0,
lastRefreshTimestamp: time.Now().UTC(),
Expand Down Expand Up @@ -224,16 +232,31 @@ func (d *Discovery) refreshData(ctx context.Context) ([]*targetgroup.Group, erro
tg := &targetgroup.Group{
Source: "Linode",
}
opts := linodego.ListOptions{
PageSize: 500,
}

// If region filter provided, use it to constrain results.
if d.region != "" {
opts.Filter = fmt.Sprintf(regionFilterTemplate, d.region)
}

// Gather all linode instances.
instances, err := d.client.ListInstances(ctx, &linodego.ListOptions{PageSize: 500})
instances, err := d.client.ListInstances(ctx, &opts)
if err != nil {
d.metrics.failuresCount.Inc()
return nil, err
}

// Gather detailed IP address info for all IPs on all linode instances.
detailedIPs, err := d.client.ListIPAddresses(ctx, &linodego.ListOptions{PageSize: 500})
detailedIPs, err := d.client.ListIPAddresses(ctx, &opts)
if err != nil {
d.metrics.failuresCount.Inc()
return nil, err
}

// Gather detailed IPv6 Range info for all linode instances.
ipv6RangeList, err := d.client.ListIPv6Ranges(ctx, &opts)
if err != nil {
d.metrics.failuresCount.Inc()
return nil, err
Expand All @@ -248,7 +271,7 @@ func (d *Discovery) refreshData(ctx context.Context) ([]*targetgroup.Group, erro
privateIPv4, publicIPv4, publicIPv6 string
privateIPv4RDNS, publicIPv4RDNS, publicIPv6RDNS string
backupsStatus string
extraIPs []string
extraIPs, ipv6Ranges []string
)

for _, ip := range instance.IPv4 {
Expand Down Expand Up @@ -276,17 +299,23 @@ func (d *Discovery) refreshData(ctx context.Context) ([]*targetgroup.Group, erro
}

if instance.IPv6 != "" {
slaac := strings.Split(instance.IPv6, "/")[0]
for _, detailedIP := range detailedIPs {
if detailedIP.Address != strings.Split(instance.IPv6, "/")[0] {
if detailedIP.Address != slaac {
continue
}

publicIPv6 = detailedIP.Address

if detailedIP.RDNS != "" && detailedIP.RDNS != "null" {
publicIPv6RDNS = detailedIP.RDNS
}
}
for _, ipv6Range := range ipv6RangeList {
if ipv6Range.RouteTarget != slaac {
continue
}
ipv6Ranges = append(ipv6Ranges, fmt.Sprintf("%s/%d", ipv6Range.Range, ipv6Range.Prefix))
}
}

if instance.Backups.Enabled {
Expand Down Expand Up @@ -330,12 +359,20 @@ func (d *Discovery) refreshData(ctx context.Context) ([]*targetgroup.Group, erro

if len(extraIPs) > 0 {
// This instance has more than one of at least one type of IP address (public, private,
// IPv4, IPv6, etc. We provide those extra IPs found here just like we do for instance
// IPv4,etc. We provide those extra IPs found here just like we do for instance
// tags, we surround a separated list with the tagSeparator config.
ips := d.tagSeparator + strings.Join(extraIPs, d.tagSeparator) + d.tagSeparator
labels[linodeLabelExtraIPs] = model.LabelValue(ips)
}

if len(ipv6Ranges) > 0 {
// This instance has more than one IPv6 Ranges routed to it we provide these
// Ranges found here just like we do for instance tags, we surround a separated
// list with the tagSeparator config.
ips := d.tagSeparator + strings.Join(ipv6Ranges, d.tagSeparator) + d.tagSeparator
labels[linodeLabelIPv6Ranges] = model.LabelValue(ips)
}

tg.Targets = append(tg.Targets, labels)
}
return []*targetgroup.Group{tg}, nil
Expand Down
Loading

0 comments on commit efedd71

Please sign in to comment.