-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metrics - Cloudwatch v2 #668
Changes from 1 commit
7702703
ebc199f
f261280
7ba11af
9c0b674
904b106
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
// Package cloudwatch2 emits all data as a StatisticsSet (rather than | ||
// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK. | ||
package cloudwatch2 | ||
|
||
import ( | ||
"math" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/metrics" | ||
"github.com/go-kit/kit/metrics/convert" | ||
"github.com/go-kit/kit/metrics/internal/lv" | ||
) | ||
|
||
const ( | ||
maxConcurrentRequests = 20 | ||
) | ||
|
||
// CloudWatch receives metrics observations and forwards them to CloudWatch. | ||
// Create a CloudWatch object, use it to create metrics, and pass those metrics as | ||
// dependencies to the components that will use them. | ||
// | ||
// To regularly report metrics to CloudWatch, use the WriteLoop helper method. | ||
type CloudWatch struct { | ||
mtx sync.RWMutex | ||
sem chan struct{} | ||
namespace string | ||
svc cloudwatchiface.CloudWatchAPI | ||
counters *lv.Space | ||
logger log.Logger | ||
numConcurrentRequests int | ||
} | ||
|
||
type option func(*CloudWatch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is returned by exported functions, it's nicer to also make it exported. (And commented, etc.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do. |
||
|
||
func (cw *CloudWatch) apply(opt option) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this method is never used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh, yup. That's what I get for doin' a copypasta. |
||
if opt != nil { | ||
opt(cw) | ||
} | ||
} | ||
|
||
// WithLogger sets the Logger that will recieve error messages generated | ||
// during the WriteLoop | ||
func WithLogger(logger log.Logger) option { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find it's nice if functional options describe the default behavior if the option isn't invoked. In this case, it might be nice to add: "By default, no logger is used." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixing... |
||
return func(cw *CloudWatch) { | ||
cw.logger = logger | ||
} | ||
} | ||
|
||
// WithConcurrentRequests sets the upper limit on how many | ||
// cloudwatch.PutMetricDataRequest may be under way at any | ||
// given time. If n is greater than 20, 20 is used. | ||
func WithConcurrentRequests(n int) option { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly: "By default, 10 concurrent requests are used." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding... |
||
return func(cw *CloudWatch) { | ||
if n > maxConcurrentRequests { | ||
n = maxConcurrentRequests | ||
} | ||
cw.numConcurrentRequests = n | ||
} | ||
} | ||
|
||
// New returns a CloudWatch object that may be used to create metrics. | ||
// Namespace is applied to all created metrics and maps to the CloudWatch namespace. | ||
// Callers must ensure that regular calls to Send are performed, either | ||
// manually or with one of the helper methods. | ||
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch { | ||
cw := &CloudWatch{ | ||
sem: nil, // set below | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this field be omitted since it's set below? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do! |
||
namespace: namespace, | ||
svc: svc, | ||
counters: lv.NewSpace(), | ||
numConcurrentRequests: 10, | ||
logger: log.NewLogfmtLogger(os.Stderr), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
} | ||
|
||
for _, optFunc := range options { | ||
optFunc(cw) | ||
} | ||
|
||
cw.sem = make(chan struct{}, cw.numConcurrentRequests) | ||
|
||
return cw | ||
} | ||
|
||
// NewCounter returns a counter. Observations are aggregated and emitted once | ||
// per write invocation. | ||
func (cw *CloudWatch) NewCounter(name string) metrics.Counter { | ||
return &Counter{ | ||
name: name, | ||
obs: cw.counters.Observe, | ||
} | ||
} | ||
|
||
// NewGauge returns an gauge. Under the covers, there is no distinctions | ||
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this | ||
// just wraps a cloudwatch2.Counter. | ||
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { | ||
return convert.NewCounterAsGauge(cw.NewCounter(name)) | ||
} | ||
|
||
// NewHistogram returns a histogram. Under the covers, there is no distinctions | ||
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this | ||
// just wraps a cloudwatch2.Counter. | ||
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram { | ||
return convert.NewCounterAsHistogram(cw.NewCounter(name)) | ||
} | ||
|
||
// WriteLoop is a helper method that invokes Send every time the passed | ||
// channel fires. This method blocks until the channel is closed, so clients | ||
// probably want to run it in its own goroutine. For typical usage, create a | ||
// time.Ticker and pass its C channel to this method. | ||
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { | ||
for range c { | ||
if err := cw.Send(); err != nil { | ||
cw.logger.Log("during", "Send", "err", err) | ||
} | ||
} | ||
} | ||
|
||
// Send will fire an API request to CloudWatch with the latest stats for | ||
// all metrics. It is preferred that the WriteLoop method is used. | ||
func (cw *CloudWatch) Send() error { | ||
cw.mtx.RLock() | ||
defer cw.mtx.RUnlock() | ||
now := time.Now() | ||
|
||
var datums []cloudwatch.MetricDatum | ||
|
||
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { | ||
datums = append(datums, cloudwatch.MetricDatum{ | ||
MetricName: aws.String(name), | ||
Dimensions: makeDimensions(lvs...), | ||
StatisticValues: stats(values), | ||
Timestamp: aws.Time(now), | ||
}) | ||
return true | ||
}) | ||
|
||
var batches [][]cloudwatch.MetricDatum | ||
for len(datums) > 0 { | ||
var batch []cloudwatch.MetricDatum | ||
lim := len(datums) | ||
if lim > maxConcurrentRequests { | ||
lim = maxConcurrentRequests | ||
} | ||
batch, datums = datums[:lim], datums[lim:] | ||
batches = append(batches, batch) | ||
} | ||
|
||
var errors = make(chan error, len(batches)) | ||
for _, batch := range batches { | ||
go func(batch []cloudwatch.MetricDatum) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the goroutines could run as an errorgroup There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oooh, cool! I hadn't yet added |
||
cw.sem <- struct{}{} | ||
defer func() { | ||
<-cw.sem | ||
}() | ||
req := cw.svc.PutMetricDataRequest(&cloudwatch.PutMetricDataInput{ | ||
Namespace: aws.String(cw.namespace), | ||
MetricData: batch, | ||
}) | ||
_, err := req.Send() | ||
errors <- err | ||
}(batch) | ||
} | ||
var firstErr error | ||
for i := 0; i < cap(errors); i++ { | ||
if err := <-errors; err != nil && firstErr != nil { | ||
firstErr = err | ||
} | ||
} | ||
|
||
return firstErr | ||
} | ||
|
||
var zero = float64(0.0) | ||
var zeros = cloudwatch.StatisticSet{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is only used once, any reason it should be a package var? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My intent was to avoid paying the "construction costs" every time a zero set is needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... Should I do something different to make that apparent? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe just a light comment could work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure thing! |
||
Maximum: &zero, | ||
Minimum: &zero, | ||
Sum: &zero, | ||
SampleCount: &zero, | ||
} | ||
|
||
func stats(a []float64) *cloudwatch.StatisticSet { | ||
count := float64(len(a)) | ||
if count == 0 { | ||
return &zeros | ||
} | ||
|
||
var sum float64 | ||
var min = math.MaxFloat64 | ||
var max = math.MaxFloat64 * -1 | ||
for _, f := range a { | ||
sum += f | ||
if f < min { | ||
min = f | ||
} | ||
if f > max { | ||
max = f | ||
} | ||
} | ||
|
||
return &cloudwatch.StatisticSet{ | ||
Maximum: &max, | ||
Minimum: &min, | ||
Sum: &sum, | ||
SampleCount: &count, | ||
} | ||
} | ||
|
||
func makeDimensions(labelValues ...string) []cloudwatch.Dimension { | ||
dimensions := make([]cloudwatch.Dimension, len(labelValues)/2) | ||
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ha! Wild. Nice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't take credit for this... I lifted this directly from the existing |
||
dimensions[j] = cloudwatch.Dimension{ | ||
Name: aws.String(labelValues[i]), | ||
Value: aws.String(labelValues[i+1]), | ||
} | ||
} | ||
return dimensions | ||
} | ||
|
||
type observeFunc func(name string, lvs lv.LabelValues, value float64) | ||
|
||
// Counter is a counter. Observations are forwarded to a node | ||
// object, and aggregated per timeseries. | ||
type Counter struct { | ||
name string | ||
lvs lv.LabelValues | ||
obs observeFunc | ||
} | ||
|
||
// With implements metrics.Counter. | ||
func (c *Counter) With(labelValues ...string) metrics.Counter { | ||
return &Counter{ | ||
name: c.name, | ||
lvs: c.lvs.With(labelValues...), | ||
obs: c.obs, | ||
} | ||
} | ||
|
||
// Add implements metrics.Counter. | ||
func (c *Counter) Add(delta float64) { | ||
c.obs(c.name, c.lvs, delta) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
package cloudwatch2 | ||
|
||
import ( | ||
"strings" | ||
"testing" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch" | ||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" | ||
) | ||
|
||
func TestStats(t *testing.T) { | ||
testCases := []struct { | ||
name string | ||
vals []float64 | ||
xMin float64 | ||
xMax float64 | ||
xSum float64 | ||
xCt float64 | ||
}{ | ||
{ | ||
"empty", | ||
[]float64{}, | ||
0.0, | ||
0.0, | ||
0.0, | ||
0.0, | ||
}, | ||
{ | ||
"single", | ||
[]float64{3.1416}, | ||
3.1416, | ||
3.1416, | ||
3.1416, | ||
1.0, | ||
}, | ||
{ | ||
"double", | ||
[]float64{1.0, 9.0}, | ||
1.0, | ||
9.0, | ||
10.0, | ||
2.0, | ||
}, | ||
{ | ||
"multiple", | ||
[]float64{5.0, 1.0, 9.0, 5.0}, | ||
1.0, | ||
9.0, | ||
20.0, | ||
4.0, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
s := stats(tc.vals) | ||
if tc.xMin != *s.Minimum { | ||
t.Errorf("expected [%f]: %f\n", tc.xMin, *s.Minimum) | ||
} | ||
if tc.xMax != *s.Maximum { | ||
t.Errorf("expected [%f]: %f\n", tc.xMax, *s.Maximum) | ||
} | ||
if tc.xSum != *s.Sum { | ||
t.Errorf("expected [%f]: %f\n", tc.xSum, *s.Sum) | ||
} | ||
if tc.xCt != *s.SampleCount { | ||
t.Errorf("expected [%f]: %f\n", tc.xCt, *s.SampleCount) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
type mockCloudWatch struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😊 |
||
cloudwatchiface.CloudWatchAPI | ||
latestName string | ||
latestData []cloudwatch.MetricDatum | ||
} | ||
|
||
func (mcw *mockCloudWatch) PutMetricDataRequest(in *cloudwatch.PutMetricDataInput) cloudwatch.PutMetricDataRequest { | ||
mcw.latestName = *in.Namespace | ||
mcw.latestData = in.MetricData | ||
return cloudwatch.PutMetricDataRequest{ | ||
// To mock the V2 API, most of the functions spit | ||
// out structs that you need to call Send() on. | ||
// The non-intuitive thing is that to get the Send() to avoid actually | ||
// going across the wire, you just create a dumb aws.Request with either | ||
// aws.Request.Data defined (for succes) or with aws.Request.Error | ||
// to simulate an Error. | ||
Request: &aws.Request{Data: &cloudwatch.PutMetricDataOutput{}}, | ||
Input: in, | ||
} | ||
} | ||
|
||
func TestSend(t *testing.T) { | ||
ns := "example-namespace" | ||
svc := &mockCloudWatch{} | ||
cw := New(ns, svc) | ||
|
||
c := cw.NewCounter("c").With("charlie", "cat") | ||
h := cw.NewHistogram("h").With("hotel", "horse") | ||
g := cw.NewGauge("g").With("golf", "giraffe") | ||
|
||
c.Add(4.0) | ||
c.Add(5.0) | ||
c.Add(6.0) | ||
h.Observe(3.0) | ||
h.Observe(5.0) | ||
h.Observe(7.0) | ||
g.Set(2.0) | ||
g.Set(5.0) | ||
g.Set(8.0) | ||
|
||
err := cw.Send() | ||
if err != nil { | ||
t.Fatalf("unexpected: %v\n", err) | ||
} | ||
|
||
if ns != svc.latestName { | ||
t.Errorf("expected namespace %q; not %q\n", ns, svc.latestName) | ||
} | ||
|
||
if len(svc.latestData) != 3 { | ||
t.Errorf("expected 3 datums: %v\n", svc.latestData) | ||
} | ||
for _, datum := range svc.latestData { | ||
initial := *datum.MetricName | ||
if len(datum.Dimensions) != 1 { | ||
t.Errorf("expected 1 dimension: %v\n", datum) | ||
} | ||
if !strings.HasPrefix(*datum.Dimensions[0].Name, initial) { | ||
t.Errorf("expected %q in Name of %v\n", initial, datum.Dimensions) | ||
} | ||
if !strings.HasPrefix(*datum.Dimensions[0].Value, initial) { | ||
t.Errorf("expected %q in Value of %v\n", initial, datum.Dimensions) | ||
} | ||
if datum.StatisticValues == nil { | ||
t.Errorf("expected StatisticValues in %v\n", datum) | ||
} | ||
if *datum.StatisticValues.Sum != 15.0 { | ||
t.Errorf("expected 15.0 for Sum in %v\n", datum) | ||
} | ||
if *datum.StatisticValues.SampleCount != 3.0 { | ||
t.Errorf("expected 3.0 for SampleCount in %v\n", datum) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package cloudwatchiface 🤦♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yah. It's supposed to be in support of mocking out their service... But it feels so icky.