Skip to content

Commit

Permalink
Collect command metrics in telemetry (#1327)
Browse files Browse the repository at this point in the history
Co-authored-by: w84thesun <dmitry.eremenko@ferretdb.io>
  • Loading branch information
AlekSi and w84thesun authored Oct 26, 2022
1 parent 97d8407 commit 6ffe677
Show file tree
Hide file tree
Showing 24 changed files with 323 additions and 231 deletions.
18 changes: 15 additions & 3 deletions cmd/envtool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/FerretDB/FerretDB/internal/handlers/pg/pgdb"
"github.com/FerretDB/FerretDB/internal/util/debug"
"github.com/FerretDB/FerretDB/internal/util/logging"
"github.com/FerretDB/FerretDB/internal/util/state"
"github.com/FerretDB/FerretDB/internal/util/version"
)

Expand Down Expand Up @@ -85,8 +86,13 @@ func waitForPostgresPort(ctx context.Context, logger *zap.SugaredLogger, port ui
}

for ctx.Err() == nil {
var pgPool *pgdb.Pool
pgPool, err := pgdb.NewPool(ctx, fmt.Sprintf("postgres://postgres@127.0.0.1:%d/ferretdb", port), logger.Desugar(), false)
p, err := state.NewProvider("")
if err != nil {
return err
}

connString := fmt.Sprintf("postgres://postgres@127.0.0.1:%d/ferretdb", port)
pgPool, err := pgdb.NewPool(ctx, connString, logger.Desugar(), false, p)
if err == nil {
pgPool.Close()
return nil
Expand Down Expand Up @@ -195,7 +201,13 @@ func setupPostgres(ctx context.Context, logger *zap.SugaredLogger) error {
return err
}

pgPool, err := pgdb.NewPool(ctx, "postgres://postgres@127.0.0.1:5432/ferretdb", logger.Desugar(), false)
p, err := state.NewProvider("")
if err != nil {
return err
}

connString := "postgres://postgres@127.0.0.1:5432/ferretdb"
pgPool, err := pgdb.NewPool(ctx, connString, logger.Desugar(), false, p)
if err != nil {
return err
}
Expand Down
11 changes: 3 additions & 8 deletions cmd/ferretdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"log"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
Expand All @@ -31,11 +30,9 @@ import (
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/maps"

"github.com/FerretDB/FerretDB/internal/clientconn"
"github.com/FerretDB/FerretDB/internal/clientconn/connmetrics"
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/registry"
"github.com/FerretDB/FerretDB/internal/util/debug"
"github.com/FerretDB/FerretDB/internal/util/logging"
Expand Down Expand Up @@ -242,6 +239,8 @@ func run() {
debug.RunHandler(ctx, cli.DebugAddr, metricsRegisterer, logger.Named("debug"))
}()

metrics := connmetrics.NewListenerMetrics()

wg.Add(1)

go func() {
Expand All @@ -254,6 +253,7 @@ func run() {
DNT: os.Getenv("DO_NOT_TRACK"),
ExecName: os.Args[0],
P: stateProvider,
ConnMetrics: metrics.ConnMetrics,
L: logger.Named("telemetry"),
UndecidedDelay: cli.Test.Telemetry.UndecidedDelay,
ReportInterval: cli.Test.Telemetry.ReportInterval,
Expand All @@ -262,11 +262,6 @@ func run() {
)
}()

cmdsList := maps.Keys(common.Commands)
sort.Strings(cmdsList)

metrics := connmetrics.NewListenerMetrics(cmdsList)

h, err := registry.NewHandler(cli.Handler, &registry.NewHandlerOpts{
Ctx: ctx,
Logger: logger,
Expand Down
8 changes: 1 addition & 7 deletions ferretdb/ferretdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (
"errors"
"fmt"
"net/url"
"sort"

"go.uber.org/zap"
"golang.org/x/exp/maps"

"github.com/FerretDB/FerretDB/internal/clientconn"
"github.com/FerretDB/FerretDB/internal/clientconn/connmetrics"
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/registry"
"github.com/FerretDB/FerretDB/internal/util/logging"
"github.com/FerretDB/FerretDB/internal/util/state"
Expand Down Expand Up @@ -76,10 +73,7 @@ func New(config *Config) (*FerretDB, error) {
return nil, fmt.Errorf("failed to construct handler: %s", err)
}

cmdsList := maps.Keys(common.Commands)
sort.Strings(cmdsList)

metrics := connmetrics.NewListenerMetrics(cmdsList)
metrics := connmetrics.NewListenerMetrics()

h, err := registry.NewHandler(config.Handler, &registry.NewHandlerOpts{
Ctx: context.Background(),
Expand Down
27 changes: 10 additions & 17 deletions integration/commands_administration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,34 +843,30 @@ func TestCommandsAdministrationServerStatusMetrics(t *testing.T) {
t.Parallel()

for name, tc := range map[string]struct {
cmds []bson.D
metricsPath types.Path
expectedFields []string
expectedNoZero []string
cmds []bson.D
metricsPath types.Path
expectedNonZero []string
}{
"BasicCmd": {
cmds: []bson.D{
{{"ping", int32(1)}},
},
metricsPath: types.NewPath([]string{"metrics", "commands", "ping"}),
expectedFields: []string{"total", "failed"},
expectedNoZero: []string{"total"},
metricsPath: types.NewPath([]string{"metrics", "commands", "ping"}),
expectedNonZero: []string{"total"},
},
"UpdateCmd": {
cmds: []bson.D{
{{"update", "values"}, {"updates", bson.A{bson.D{{"q", bson.D{{"v", "foo"}}}}}}},
},
metricsPath: types.NewPath([]string{"metrics", "commands", "update"}),
expectedFields: []string{"arrayFilters", "failed", "pipeline", "total"},
expectedNoZero: []string{"total"},
metricsPath: types.NewPath([]string{"metrics", "commands", "update"}),
expectedNonZero: []string{"total"},
},
"UpdateCmdFailed": {
cmds: []bson.D{
{{"update", int32(1)}},
},
metricsPath: types.NewPath([]string{"metrics", "commands", "update"}),
expectedFields: []string{"arrayFilters", "failed", "pipeline", "total"},
expectedNoZero: []string{"failed", "total"},
metricsPath: types.NewPath([]string{"metrics", "commands", "update"}),
expectedNonZero: []string{"failed", "total"},
},
// TODO: https://github.com/FerretDB/FerretDB/issues/9
} {
Expand Down Expand Up @@ -898,11 +894,8 @@ func TestCommandsAdministrationServerStatusMetrics(t *testing.T) {

actualFields := actualDoc.Keys()

sort.Strings(tc.expectedFields)
sort.Strings(actualFields)

assert.Equal(t, tc.expectedFields, actualFields)

var actualNotZeros []string
for key, value := range actualDoc.Map() {
assert.IsType(t, int64(0), value)
Expand All @@ -912,7 +905,7 @@ func TestCommandsAdministrationServerStatusMetrics(t *testing.T) {
}
}

for _, expectedName := range tc.expectedNoZero {
for _, expectedName := range tc.expectedNonZero {
assert.Contains(t, actualNotZeros, expectedName)
}
})
Expand Down
8 changes: 1 addition & 7 deletions integration/setup/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"net"
"net/url"
"sort"
"sync"
"testing"

Expand All @@ -31,11 +30,9 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"golang.org/x/exp/maps"

"github.com/FerretDB/FerretDB/internal/clientconn"
"github.com/FerretDB/FerretDB/internal/clientconn/connmetrics"
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/registry"
"github.com/FerretDB/FerretDB/internal/util/debug"
"github.com/FerretDB/FerretDB/internal/util/logging"
Expand Down Expand Up @@ -105,10 +102,7 @@ func setupListener(tb testing.TB, ctx context.Context, logger *zap.Logger) int {
u, err := url.Parse(*postgreSQLURLF)
require.NoError(tb, err)

cmdsList := maps.Keys(common.Commands)
sort.Strings(cmdsList)

metrics := connmetrics.NewListenerMetrics(cmdsList)
metrics := connmetrics.NewListenerMetrics()

h, err := registry.NewHandler(*handlerF, &registry.NewHandlerOpts{
Ctx: ctx,
Expand Down
55 changes: 33 additions & 22 deletions internal/clientconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"sync/atomic"
"time"

"github.com/AlekSi/pointer"
"github.com/pmezard/go-difflib/difflib"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -290,30 +289,31 @@ func (c *conn) run(ctx context.Context) (err error) {
//
// The possible resBody returns:
// - normal response - to be returned to the client, closeConn is false;
// - protocol error (*common.Error, possibly wrapped) - to be returned to the client, closeConn is false;
// - protocol error - to be returned to the client, closeConn is false;
// - any other error - to be returned to the client as InternalError before terminating connection, closeConn is true.
//
// Handlers to which it routes, should not panic on bad input, but may do so in "impossible" cases.
// They also should not use recover(). That allows us to use fuzzing.
func (c *conn) route(ctx context.Context, reqHeader *wire.MsgHeader, reqBody wire.MsgBody) (resHeader *wire.MsgHeader, resBody wire.MsgBody, closeConn bool) { //nolint:lll // argument list is too long
requests := c.m.Requests.MustCurryWith(prometheus.Labels{"opcode": reqHeader.OpCode.String()})
var command string
var result *string
defer func() {
if result == nil {
result = pointer.ToString("panic")
}

c.m.Responses.WithLabelValues(resHeader.OpCode.String(), command, *result).Inc()
}()

connInfo := &conninfo.ConnInfo{
PeerAddr: c.netConn.RemoteAddr(),
AggregationStages: c.m.AggregationStages,
PeerAddr: c.netConn.RemoteAddr(),
}
ctx, cancel := context.WithCancel(conninfo.WithConnInfo(ctx, connInfo))
defer cancel()

var command, result, operator string
defer func() {
if result == "" {
result = "panic"
}

if operator == "" {
operator = "unknown"
}

c.m.Responses.WithLabelValues(resHeader.OpCode.String(), command, operator, result).Inc()
}()

resHeader = new(wire.MsgHeader)
var err error
switch reqHeader.OpCode {
Expand Down Expand Up @@ -354,20 +354,30 @@ func (c *conn) route(ctx context.Context, reqHeader *wire.MsgHeader, reqBody wir
err = lazyerrors.Errorf("unexpected OpCode %s", reqHeader.OpCode)
}

requests.WithLabelValues(command).Inc()
if command == "" {
command = "unknown"
}

c.m.Requests.WithLabelValues(reqHeader.OpCode.String(), command).Inc()

// set body for error
if err != nil {
switch resHeader.OpCode {
case wire.OpCodeMsg:
protoErr, recoverable := common.ProtocolError(err)
closeConn = !recoverable

var res wire.OpMsg
must.NoError(res.SetSections(wire.OpMsgSection{
Documents: []*types.Document{protoErr.Document()},
}))
resBody = &res
result = pointer.ToString(protoErr.Code().String())

result = protoErr.Code().String()

if info := protoErr.ErrInfo(); info != nil {
operator = info.Operator
}

case wire.OpCodeQuery:
fallthrough
Expand All @@ -388,7 +398,7 @@ func (c *conn) route(ctx context.Context, reqHeader *wire.MsgHeader, reqBody wir
case wire.OpCodeCompressed:
// do not panic to make fuzzing easier
closeConn = true
result = pointer.ToString("unhandled")
result = "unhandled"
c.l.Error(
"Handler error for unhandled response opcode",
zap.Error(err), zap.Stringer("opcode", resHeader.OpCode),
Expand All @@ -398,7 +408,7 @@ func (c *conn) route(ctx context.Context, reqHeader *wire.MsgHeader, reqBody wir
default:
// do not panic to make fuzzing easier
closeConn = true
result = pointer.ToString("unexpected")
result = "unexpected"
c.l.Error(
"Handler error for unexpected response opcode",
zap.Error(err), zap.Stringer("opcode", resHeader.OpCode),
Expand All @@ -411,17 +421,18 @@ func (c *conn) route(ctx context.Context, reqHeader *wire.MsgHeader, reqBody wir
// https://github.com/FerretDB/FerretDB/issues/273
b, err := resBody.MarshalBinary()
if err != nil {
result = nil
result = ""
panic(err)
}
resHeader.MessageLength = int32(wire.MsgHeaderLen + len(b))

resHeader.RequestID = atomic.AddInt32(&c.lastRequestID, 1)
resHeader.ResponseTo = reqHeader.RequestID

if result == nil {
result = pointer.ToString("ok")
if result == "" {
result = "ok"
}

return
}

Expand Down
5 changes: 1 addition & 4 deletions internal/clientconn/conninfo/conn_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package conninfo
import (
"context"
"net"

"github.com/prometheus/client_golang/prometheus"
)

// contextKey is a special type to represent context.WithValue keys a bit more safely.
Expand All @@ -31,8 +29,7 @@ var connInfoKey = contextKey{}

// ConnInfo represents connection info.
type ConnInfo struct {
PeerAddr net.Addr
AggregationStages *prometheus.CounterVec
PeerAddr net.Addr
}

// WithConnInfo returns a new context with the given ConnInfo.
Expand Down
Loading

1 comment on commit 6ffe677

@vercel
Copy link

@vercel vercel bot commented on 6ffe677 Oct 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

ferret-db – ./

ferret-db-ferretdb.vercel.app
ferret-db.vercel.app
ferret-db-git-main-ferretdb.vercel.app

Please sign in to comment.