From 2f96a4109d57675e4b06fe9a4e1b49d1d085fd23 Mon Sep 17 00:00:00 2001 From: Jeroen Rinzema Date: Thu, 14 May 2020 17:55:04 +0200 Subject: [PATCH] prometheus metrics consumption --- cmd/maestro/config/README.md | 4 + cmd/maestro/config/arguments.go | 5 + cmd/maestro/config/config.go | 12 ++ cmd/maestro/validate/validate.go | 7 +- examples/graphql/config.hcl | 4 + examples/graphql/flow.hcl | 15 ++ go.mod | 2 +- main.go | 5 +- middleware.go | 147 +++++++++++++++ options.go | 32 ++-- pkg/constructor/constructor.go | 15 +- pkg/constructor/options.go | 33 +++- pkg/definitions/hcl/hcl.go | 4 + pkg/definitions/hcl/intermediate.go | 28 +-- pkg/definitions/hcl/options.go | 1 + pkg/flow/flow.go | 47 +++-- pkg/flow/flow_test.go | 42 ++--- pkg/flow/node.go | 66 ++++--- pkg/flow/node_test.go | 76 ++++---- pkg/flow/tracker.go | 4 +- pkg/flow/tracker_test.go | 2 +- pkg/metrics/prometheus/prometheus.go | 258 ++++++++++++++++++++++++++ pkg/transport/grpc/caller_test.go | 2 +- pkg/transport/grpc/grpc_test.go | 2 +- pkg/transport/grpc/listener_test.go | 2 +- pkg/transport/grpc/reflection_test.go | 8 +- pkg/transport/http/listener_test.go | 6 +- 27 files changed, 677 insertions(+), 152 deletions(-) create mode 100644 middleware.go create mode 100644 pkg/metrics/prometheus/prometheus.go diff --git a/cmd/maestro/config/README.md b/cmd/maestro/config/README.md index 408eb686..20a394e3 100644 --- a/cmd/maestro/config/README.md +++ b/cmd/maestro/config/README.md @@ -22,6 +22,10 @@ http { address = "$HTTP_ADDRESS" } +prometheus { + address = ":5050" +} + services { select "proto.users.*" { host = "api.jexia.com" diff --git a/cmd/maestro/config/arguments.go b/cmd/maestro/config/arguments.go index d919f62c..ce5efc4d 100644 --- a/cmd/maestro/config/arguments.go +++ b/cmd/maestro/config/arguments.go @@ -10,6 +10,7 @@ import ( "github.com/jexia/maestro/pkg/definitions/protoc" "github.com/jexia/maestro/pkg/instance" "github.com/jexia/maestro/pkg/logger" + "github.com/jexia/maestro/pkg/metrics/prometheus" "github.com/jexia/maestro/pkg/specs" "github.com/jexia/maestro/pkg/transport/graphql" "github.com/jexia/maestro/pkg/transport/grpc" @@ -62,6 +63,10 @@ func ConstructArguments(params *Maestro) ([]constructor.Option, error) { arguments = append(arguments, maestro.WithListener(grpc.NewListener(params.GRPC.Address, specs.Options{}))) } + if params.Prometheus.Address != "" { + arguments = append(arguments, maestro.WithMiddleware(prometheus.New(params.Prometheus.Address))) + } + arguments = append([]constructor.Option{maestro.WithLogLevel(logger.Global, params.LogLevel)}, arguments...) return arguments, nil diff --git a/cmd/maestro/config/config.go b/cmd/maestro/config/config.go index dedecfde..d6d01756 100644 --- a/cmd/maestro/config/config.go +++ b/cmd/maestro/config/config.go @@ -42,6 +42,12 @@ func Parse(options *hcl.Options, target *Maestro) { Address: options.GRPC.Address, } } + + if options.Prometheus != nil && target.Prometheus.Address == "" { + target.Prometheus = Prometheus{ + Address: options.Prometheus.Address, + } + } } // Maestro configurations @@ -50,10 +56,16 @@ type Maestro struct { HTTP HTTP GraphQL GraphQL GRPC GRPC + Prometheus Prometheus Protobuffers []string Files []string } +// Prometheus configurations +type Prometheus struct { + Address string +} + // HTTP configurations type HTTP struct { Address string diff --git a/cmd/maestro/validate/validate.go b/cmd/maestro/validate/validate.go index fb8c079d..29ebfd2a 100644 --- a/cmd/maestro/validate/validate.go +++ b/cmd/maestro/validate/validate.go @@ -32,7 +32,12 @@ func run(cmd *cobra.Command, args []string) error { } ctx := instance.NewContext() - _, err = constructor.Specs(ctx, functions.Collection{}, maestro.NewOptions(ctx, arguments...)) + options, err := maestro.NewOptions(ctx, arguments...) + if err != nil { + return err + } + + _, err = constructor.Specs(ctx, functions.Collection{}, options) if err != nil { return err } diff --git a/examples/graphql/config.hcl b/examples/graphql/config.hcl index ae79348f..67463286 100644 --- a/examples/graphql/config.hcl +++ b/examples/graphql/config.hcl @@ -7,6 +7,10 @@ graphql { address = ":8080" } +prometheus { + address = ":5050" +} + services { select "proto.*" { host = "https://jsonplaceholder.typicode.com/" diff --git a/examples/graphql/flow.hcl b/examples/graphql/flow.hcl index 95e6a7fe..d5d6ad7a 100644 --- a/examples/graphql/flow.hcl +++ b/examples/graphql/flow.hcl @@ -25,6 +25,12 @@ endpoint "todo" "graphql" { base = "query" } +endpoint "sample" "graphql" { + path = "sample" + name = "Sample" + base = "query" +} + flow "todo" { input "proto.Query" {} @@ -41,3 +47,12 @@ flow "todo" { completed = "{{ query:completed }}" } } + +flow "sample" { + input "proto.Query" {} + + output "proto.Item" { + title = "sample" + completed = true + } +} diff --git a/go.mod b/go.mod index 2fc27b50..e07a38fd 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/onsi/ginkgo v1.10.1 // indirect github.com/onsi/gomega v1.7.0 // indirect - github.com/prometheus/client_golang v1.2.1 // indirect + github.com/prometheus/client_golang v1.2.1 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.6 github.com/spf13/pflag v1.0.5 // indirect diff --git a/main.go b/main.go index f47f8b4f..0209a539 100644 --- a/main.go +++ b/main.go @@ -67,7 +67,10 @@ func (client *Client) Close() { // New constructs a new Maestro instance func New(opts ...constructor.Option) (*Client, error) { ctx := instance.NewContext() - options := NewOptions(ctx, opts...) + options, err := NewOptions(ctx, opts...) + if err != nil { + return nil, err + } mem := functions.Collection{} collection, err := constructor.Specs(ctx, mem, options) diff --git a/middleware.go b/middleware.go new file mode 100644 index 00000000..bc027f04 --- /dev/null +++ b/middleware.go @@ -0,0 +1,147 @@ +package maestro + +import ( + "context" + + "github.com/jexia/maestro/pkg/constructor" + "github.com/jexia/maestro/pkg/flow" + "github.com/jexia/maestro/pkg/instance" + "github.com/jexia/maestro/pkg/refs" +) + +// WithMiddleware initialises the given middleware and defines all options +func WithMiddleware(middleware constructor.Middleware) constructor.Option { + return func(options *constructor.Options) { + options.Middleware = append(options.Middleware, middleware) + } +} + +// AfterConstructor the passed function gets called once all options have been applied +func AfterConstructor(wrapper constructor.AfterConstructorHandler) constructor.Option { + return func(options *constructor.Options) { + if options.AfterConstructor == nil { + options.AfterConstructor = wrapper(func(instance.Context, *constructor.Collection) error { return nil }) + return + } + + options.AfterConstructor = wrapper(options.AfterConstructor) + } +} + +// BeforeManagerDo the passed function gets called before a request gets handled by a flow manager +func BeforeManagerDo(wrapper flow.BeforeManagerHandler) constructor.Option { + return func(options *constructor.Options) { + if options.BeforeManagerDo == nil { + options.BeforeManagerDo = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + return ctx, nil + }) + + return + } + + options.BeforeManagerDo = wrapper(options.BeforeManagerDo) + } +} + +// BeforeManagerRollback the passed function gets called before a rollback request gets handled by a flow manager +func BeforeManagerRollback(wrapper flow.BeforeManagerHandler) constructor.Option { + return func(options *constructor.Options) { + if options.BeforeManagerRollback == nil { + options.BeforeManagerRollback = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + return ctx, nil + }) + + return + } + + options.BeforeManagerRollback = wrapper(options.BeforeManagerRollback) + } +} + +// AfterManagerDo the passed function gets after a flow has been handled by the flow manager +func AfterManagerDo(wrapper flow.AfterManagerHandler) constructor.Option { + return func(options *constructor.Options) { + if options.AfterManagerDo == nil { + options.AfterManagerDo = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + return ctx, nil + }) + + return + } + + options.AfterManagerDo = wrapper(options.AfterManagerDo) + } +} + +// AfterManagerRollback the passed function gets after a flow rollback has been handled by the flow manager +func AfterManagerRollback(wrapper flow.AfterManagerHandler) constructor.Option { + return func(options *constructor.Options) { + if options.AfterManagerRollback == nil { + options.AfterManagerRollback = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + return ctx, nil + }) + + return + } + + options.AfterManagerRollback = wrapper(options.AfterManagerRollback) + } +} + +// BeforeNodeDo the passed function gets called before a node is executed +func BeforeNodeDo(wrapper flow.BeforeNodeHandler) constructor.Option { + return func(options *constructor.Options) { + if options.BeforeNodeDo == nil { + options.BeforeNodeDo = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) { + return ctx, nil + }) + + return + } + + options.BeforeNodeDo = wrapper(options.BeforeNodeDo) + } +} + +// BeforeNodeRollback the passed function gets called before a node rollback is executed +func BeforeNodeRollback(wrapper flow.BeforeNodeHandler) constructor.Option { + return func(options *constructor.Options) { + if options.BeforeNodeRollback == nil { + options.BeforeNodeRollback = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) { + return ctx, nil + }) + + return + } + + options.BeforeNodeRollback = wrapper(options.BeforeNodeRollback) + } +} + +// AfterNodeDo the passed function gets called after a node is executed +func AfterNodeDo(wrapper flow.AfterNodeHandler) constructor.Option { + return func(options *constructor.Options) { + if options.AfterNodeDo == nil { + options.AfterNodeDo = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) { + return ctx, nil + }) + return + } + + options.AfterNodeDo = wrapper(options.AfterNodeDo) + } +} + +// AfterNodeRollback the passed function gets called after a node rollback is executed +func AfterNodeRollback(wrapper flow.AfterNodeHandler) constructor.Option { + return func(options *constructor.Options) { + if options.AfterNodeRollback == nil { + options.AfterNodeRollback = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) { + return ctx, nil + }) + return + } + + options.AfterNodeRollback = wrapper(options.AfterNodeRollback) + } +} diff --git a/options.go b/options.go index f2c61909..e851a6b6 100644 --- a/options.go +++ b/options.go @@ -10,33 +10,31 @@ import ( "github.com/jexia/maestro/pkg/transport" ) -// BeforeOptionsFn gets called before initialising the options -type BeforeOptionsFn func(instance.Context, *constructor.Options) error - -// AfterOptionsFn gets called after all options have been initialised -type AfterOptionsFn func(instance.Context, *constructor.Options) error - // NewOptions constructs a constructor.Options object from the given constructor.Option constructors -func NewOptions(ctx instance.Context, options ...constructor.Option) constructor.Options { +func NewOptions(ctx instance.Context, options ...constructor.Option) (constructor.Options, error) { result := constructor.NewOptions(ctx) for _, option := range options { option(&result) } - return result -} - -// AfterConstructor the passed function gets called once all options have been applied -func AfterConstructor(wrapper constructor.AfterConstructorHandler) constructor.Option { - return func(options *constructor.Options) { - if options.AfterConstructor == nil { - options.AfterConstructor = wrapper(func(instance.Context, *constructor.Collection) error { return nil }) - return + for _, middleware := range result.Middleware { + options, err := middleware(ctx) + if err != nil { + return result, err } - options.AfterConstructor = wrapper(options.AfterConstructor) + for _, option := range options { + option(&result) + } } + + return result, nil +} + +// NewCollection constructs a new options collection +func NewCollection(options ...constructor.Option) []constructor.Option { + return options } // WithFlows appends the given flows resolver to the available flow resolvers diff --git a/pkg/constructor/constructor.go b/pkg/constructor/constructor.go index f0ecc97e..78ba9f8b 100644 --- a/pkg/constructor/constructor.go +++ b/pkg/constructor/constructor.go @@ -90,7 +90,12 @@ func FlowManager(ctx instance.Context, mem functions.Collection, services *specs return nil, err } - nodes[index] = flow.NewNode(ctx, node, caller, rollback) + nodes[index] = flow.NewNode(ctx, node, caller, rollback, &flow.NodeMiddleware{ + BeforeDo: options.BeforeNodeDo, + AfterDo: options.AfterNodeDo, + BeforeRollback: options.BeforeNodeRollback, + AfterRollback: options.AfterNodeRollback, + }) } forward, err := Forward(services, flows, manager.GetForward(), options) @@ -98,7 +103,13 @@ func FlowManager(ctx instance.Context, mem functions.Collection, services *specs return nil, err } - result.Flow = flow.NewManager(ctx, manager.GetName(), nodes) + result.Flow = flow.NewManager(ctx, manager.GetName(), nodes, &flow.ManagerMiddleware{ + BeforeDo: options.BeforeManagerDo, + AfterDo: options.AfterManagerDo, + BeforeRollback: options.BeforeManagerRollback, + AfterRollback: options.AfterManagerRollback, + }) + result.Forward = forward results[index] = result diff --git a/pkg/constructor/options.go b/pkg/constructor/options.go index d0b7985c..b17f3c71 100644 --- a/pkg/constructor/options.go +++ b/pkg/constructor/options.go @@ -3,6 +3,7 @@ package constructor import ( "github.com/jexia/maestro/pkg/codec" "github.com/jexia/maestro/pkg/definitions" + "github.com/jexia/maestro/pkg/flow" "github.com/jexia/maestro/pkg/functions" "github.com/jexia/maestro/pkg/instance" "github.com/jexia/maestro/pkg/transport" @@ -24,18 +25,30 @@ func NewOptions(ctx instance.Context) Options { // Options represents all the available options type Options struct { - Ctx instance.Context - Codec codec.Constructors - Callers transport.Callers - Listeners transport.Listeners - Flows []definitions.FlowsResolver - Endpoints []definitions.EndpointsResolver - Services []definitions.ServicesResolver - Schemas []definitions.SchemaResolver - AfterConstructor AfterConstructor - Functions functions.Custom + Ctx instance.Context + Codec codec.Constructors + Callers transport.Callers + Listeners transport.Listeners + Flows []definitions.FlowsResolver + Endpoints []definitions.EndpointsResolver + Services []definitions.ServicesResolver + Schemas []definitions.SchemaResolver + Middleware []Middleware + AfterConstructor AfterConstructor + BeforeManagerDo flow.BeforeManager + BeforeManagerRollback flow.BeforeManager + AfterManagerDo flow.AfterManager + AfterManagerRollback flow.AfterManager + BeforeNodeDo flow.BeforeNode + BeforeNodeRollback flow.BeforeNode + AfterNodeDo flow.AfterNode + AfterNodeRollback flow.AfterNode + Functions functions.Custom } +// Middleware is called once the options have been initialised +type Middleware func(instance.Context) ([]Option, error) + // AfterConstructor is called after the specifications is constructored type AfterConstructor func(instance.Context, *Collection) error diff --git a/pkg/definitions/hcl/hcl.go b/pkg/definitions/hcl/hcl.go index c5bbdb5e..dc0554d7 100644 --- a/pkg/definitions/hcl/hcl.go +++ b/pkg/definitions/hcl/hcl.go @@ -118,6 +118,10 @@ func GetOptions(ctx instance.Context, path string) (*Options, error) { if definition.GraphQL != nil { options.GraphQL = definition.GraphQL } + + if definition.Prometheus != nil { + options.Prometheus = definition.Prometheus + } } return options, nil diff --git a/pkg/definitions/hcl/intermediate.go b/pkg/definitions/hcl/intermediate.go index 6f5fee77..d398515f 100644 --- a/pkg/definitions/hcl/intermediate.go +++ b/pkg/definitions/hcl/intermediate.go @@ -6,17 +6,18 @@ import ( // Manifest intermediate specs type Manifest struct { - LogLevel string `hcl:"log_level,optional"` - GraphQL *GraphQL `hcl:"graphql,block"` - HTTP *HTTP `hcl:"http,block"` - GRPC *GRPC `hcl:"grpc,block"` - Protobuffers []string `hcl:"protobuffers,optional"` - Include []string `hcl:"include,optional"` - Flows []Flow `hcl:"flow,block"` - Proxy []Proxy `hcl:"proxy,block"` - Endpoints []Endpoint `hcl:"endpoint,block"` - Services []Service `hcl:"service,block"` - ServiceSelector []Services `hcl:"services,block"` + LogLevel string `hcl:"log_level,optional"` + GraphQL *GraphQL `hcl:"graphql,block"` + HTTP *HTTP `hcl:"http,block"` + GRPC *GRPC `hcl:"grpc,block"` + Prometheus *Prometheus `hcl:"prometheus,block"` + Protobuffers []string `hcl:"protobuffers,optional"` + Include []string `hcl:"include,optional"` + Flows []Flow `hcl:"flow,block"` + Proxy []Proxy `hcl:"proxy,block"` + Endpoints []Endpoint `hcl:"endpoint,block"` + Services []Service `hcl:"service,block"` + ServiceSelector []Services `hcl:"services,block"` } // GraphQL represents the GraphQL option definitions @@ -34,6 +35,11 @@ type GRPC struct { Address string `hcl:"address"` } +// Prometheus represent the prometheus option definitions +type Prometheus struct { + Address string `hcl:"address"` +} + // Before intermediate specification type Before struct { Resources []Resources `hcl:"resources,block"` diff --git a/pkg/definitions/hcl/options.go b/pkg/definitions/hcl/options.go index a6c7ecff..8991d9b7 100644 --- a/pkg/definitions/hcl/options.go +++ b/pkg/definitions/hcl/options.go @@ -7,4 +7,5 @@ type Options struct { GraphQL *GraphQL HTTP *HTTP GRPC *GRPC + Prometheus *Prometheus } diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 5c8f6776..e4f2621d 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -20,14 +20,22 @@ type Call interface { // NewManager constructs a new manager for the given flow. // Branches are constructed for the constructed nodes to optimalise performance. // Various variables such as the amount of nodes, references and loose ends are collected to optimalise allocations during runtime. -func NewManager(ctx instance.Context, name string, nodes []*Node) *Manager { +func NewManager(ctx instance.Context, name string, nodes []*Node, middleware *ManagerMiddleware) *Manager { ConstructBranches(nodes) + if middleware == nil { + middleware = &ManagerMiddleware{} + } + manager := &Manager{ - ctx: ctx, - Name: name, - Starting: FetchStarting(nodes), - Nodes: len(nodes), + BeforeDo: middleware.BeforeDo, + BeforeRollback: middleware.BeforeRollback, + ctx: ctx, + Name: name, + Starting: FetchStarting(nodes), + Nodes: len(nodes), + AfterDo: middleware.AfterDo, + AfterRollback: middleware.AfterRollback, } ends := make(map[string]*Node, len(nodes)) @@ -42,14 +50,22 @@ func NewManager(ctx instance.Context, name string, nodes []*Node) *Manager { return manager } +// ManagerMiddleware holds the available middleware options for a flow manager +type ManagerMiddleware struct { + BeforeDo BeforeManager + AfterDo AfterManager + BeforeRollback BeforeManager + AfterRollback AfterManager +} + // BeforeManager is called before a manager get's calles -type BeforeManager func(ctx context.Context, manager *Manager, store refs.Store) error +type BeforeManager func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) // BeforeManagerHandler wraps the before call function to allow middleware to be chained type BeforeManagerHandler func(BeforeManager) BeforeManager // AfterManager is called after a manager is called -type AfterManager func(ctx context.Context, manager *Manager, store refs.Store) error +type AfterManager func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) // AfterManagerHandler wraps the after call function to allow middleware to be chained type AfterManagerHandler func(AfterManager) AfterManager @@ -76,9 +92,9 @@ func (manager *Manager) GetName() string { // Do calls all the nodes inside the manager if a error is returned is a rollback of all the already executed steps triggered. // Nodes are executed concurrently to one another. -func (manager *Manager) Do(ctx context.Context, refs refs.Store) error { +func (manager *Manager) Do(ctx context.Context, refs refs.Store) (err error) { if manager.BeforeDo != nil { - err := manager.BeforeDo(ctx, manager, refs) + ctx, err = manager.BeforeDo(ctx, manager, refs) if err != nil { return err } @@ -90,7 +106,7 @@ func (manager *Manager) Do(ctx context.Context, refs refs.Store) error { manager.ctx.Logger(logger.Flow).WithField("flow", manager.Name).Debug("Executing flow") processes := NewProcesses(len(manager.Starting)) - tracker := NewTracker(manager.Nodes) + tracker := NewTracker(manager.Name, manager.Nodes) for _, node := range manager.Starting { go node.Do(ctx, tracker, processes, refs) @@ -114,7 +130,7 @@ func (manager *Manager) Do(ctx context.Context, refs refs.Store) error { manager.ctx.Logger(logger.Flow).WithField("flow", manager.Name).Debug("Flow completed") if manager.AfterDo != nil { - err := manager.AfterDo(ctx, manager, refs) + ctx, err = manager.AfterDo(ctx, manager, refs) if err != nil { return err } @@ -133,17 +149,18 @@ func (manager *Manager) NewStore() refs.Store { func (manager *Manager) Revert(executed *Tracker, refs refs.Store) { defer manager.wg.Done() + var err error ctx := context.Background() if manager.BeforeRollback != nil { - err := manager.BeforeRollback(ctx, manager, refs) + ctx, err = manager.BeforeRollback(ctx, manager, refs) if err != nil { manager.ctx.Logger(logger.Flow).Error("Revert failed before rollback returned a error: ", err) return } } - tracker := NewTracker(manager.Nodes) + tracker := NewTracker(manager.Name, manager.Nodes) ends := make(map[string]*Node, manager.Ends) // Include all nodes to the revert tracker that have not been called @@ -158,13 +175,13 @@ func (manager *Manager) Revert(executed *Tracker, refs refs.Store) { processes := NewProcesses(len(ends)) for _, end := range ends { - go end.Revert(ctx, tracker, processes, refs) + go end.Rollback(ctx, tracker, processes, refs) } processes.Wait() if manager.AfterRollback != nil { - err := manager.AfterRollback(ctx, manager, refs) + ctx, err = manager.AfterRollback(ctx, manager, refs) if err != nil { manager.ctx.Logger(logger.Flow).Error("Revert failed after rollback returned a error: ", err) return diff --git a/pkg/flow/flow_test.go b/pkg/flow/flow_test.go index c0e56fb7..2c88b28e 100644 --- a/pkg/flow/flow_test.go +++ b/pkg/flow/flow_test.go @@ -84,7 +84,7 @@ func TestNewManager(t *testing.T) { for name, nodes := range tests { t.Run(name, func(t *testing.T) { ctx := instance.NewContext() - manager := NewManager(ctx, name, nodes) + manager := NewManager(ctx, name, nodes, nil) if manager == nil { t.Fatal("unexpected result, expected a manager to be returned") } @@ -138,9 +138,9 @@ func TestBeforeDoFlow(t *testing.T) { call := &caller{} _, manager := NewMockFlowManager(call, nil) - manager.BeforeDo = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.BeforeDo = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } err := manager.Do(context.Background(), nil) @@ -159,9 +159,9 @@ func TestBeforeDoFlowErr(t *testing.T) { call := &caller{} _, manager := NewMockFlowManager(call, nil) - manager.BeforeDo = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.BeforeDo = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } err := manager.Do(context.Background(), nil) @@ -180,9 +180,9 @@ func TestAfterDoFlowErr(t *testing.T) { call := &caller{} _, manager := NewMockFlowManager(call, nil) - manager.AfterDo = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.AfterDo = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } err := manager.Do(context.Background(), nil) @@ -200,9 +200,9 @@ func TestAfterDoFlow(t *testing.T) { call := &caller{} _, manager := NewMockFlowManager(call, nil) - manager.AfterDo = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.AfterDo = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } err := manager.Do(context.Background(), nil) @@ -220,13 +220,13 @@ func TestBeforeRollbackFlow(t *testing.T) { call := &caller{} nodes, manager := NewMockFlowManager(call, nil) - manager.BeforeRollback = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.BeforeRollback = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } manager.wg.Add(1) - manager.Revert(NewTracker(len(nodes)), nil) + manager.Revert(NewTracker("", len(nodes)), nil) if counter != 1 { t.Fatalf("unexpected counter %d, expected before rollback function to be called", counter) @@ -239,13 +239,13 @@ func TestBeforeRollbackFlowErr(t *testing.T) { call := &caller{} nodes, manager := NewMockFlowManager(call, nil) - manager.BeforeRollback = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.BeforeRollback = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } manager.wg.Add(1) - manager.Revert(NewTracker(len(nodes)), nil) + manager.Revert(NewTracker("", len(nodes)), nil) if counter != 1 { t.Fatalf("unexpected counter %d, expected before rollback function to be called", counter) @@ -257,13 +257,13 @@ func TestAfterRollbackFlow(t *testing.T) { call := &caller{} nodes, manager := NewMockFlowManager(call, nil) - manager.AfterRollback = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.AfterRollback = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } manager.wg.Add(1) - manager.Revert(NewTracker(len(nodes)), nil) + manager.Revert(NewTracker("", len(nodes)), nil) if counter != 1 { t.Fatalf("unexpected counter %d, expected after rollback function to be called", counter) @@ -276,13 +276,13 @@ func TestAfterRollbackFlowErr(t *testing.T) { call := &caller{} nodes, manager := NewMockFlowManager(call, nil) - manager.AfterRollback = func(ctx context.Context, manager *Manager, store refs.Store) error { + manager.AfterRollback = func(ctx context.Context, manager *Manager, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } manager.wg.Add(1) - manager.Revert(NewTracker(len(nodes)), nil) + manager.Revert(NewTracker("", len(nodes)), nil) if counter != 1 { t.Fatalf("unexpected counter %d, expected before rollback function to be called", counter) diff --git a/pkg/flow/node.go b/pkg/flow/node.go index d6891f84..aeafb92a 100644 --- a/pkg/flow/node.go +++ b/pkg/flow/node.go @@ -13,9 +13,13 @@ import ( // NewNode constructs a new node for the given call. // The service called inside the call endpoint is retrieved from the services collection. // The call, codec and rollback are defined inside the node and used while processing requests. -func NewNode(ctx instance.Context, node *specs.Node, call, rollback Call) *Node { +func NewNode(ctx instance.Context, node *specs.Node, call, rollback Call, middleware *NodeMiddleware) *Node { references := refs.References{} + if middleware == nil { + middleware = &NodeMiddleware{} + } + if node.Call != nil { references.MergeLeft(refs.ParameterReferences(node.Call.Request)) } @@ -35,15 +39,19 @@ func NewNode(ctx instance.Context, node *specs.Node, call, rollback Call) *Node logger := ctx.Logger(logger.Flow) return &Node{ - ctx: ctx, - logger: logger, - Name: node.Name, - Previous: []*Node{}, - Call: call, - Rollback: rollback, - DependsOn: node.DependsOn, - References: references, - Next: []*Node{}, + BeforeDo: middleware.BeforeDo, + BeforeRevert: middleware.BeforeRollback, + ctx: ctx, + logger: logger, + Name: node.Name, + Previous: []*Node{}, + Call: call, + Revert: rollback, + DependsOn: node.DependsOn, + References: references, + Next: []*Node{}, + AfterDo: middleware.AfterDo, + AfterRevert: middleware.AfterRollback, } } @@ -61,14 +69,22 @@ func (nodes Nodes) Has(name string) bool { return false } +// NodeMiddleware holds all the available +type NodeMiddleware struct { + BeforeDo BeforeNode + AfterDo AfterNode + BeforeRollback BeforeNode + AfterRollback AfterNode +} + // BeforeNode is called before a node is executed -type BeforeNode func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error +type BeforeNode func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) // BeforeNodeHandler wraps the before node function to allow middleware to be chained type BeforeNodeHandler func(BeforeNode) BeforeNode // AfterNode is called after a node is executed -type AfterNode func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error +type AfterNode func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) // AfterNodeHandler wraps the after node function to allow middleware to be chained type AfterNodeHandler func(AfterNode) AfterNode @@ -82,7 +98,7 @@ type Node struct { Name string Previous Nodes Call Call - Rollback Call + Revert Call DependsOn map[string]*specs.Node References map[string]*specs.PropertyReference Next Nodes @@ -104,8 +120,10 @@ func (node *Node) Do(ctx context.Context, tracker *Tracker, processes *Processes return } + var err error + if node.BeforeDo != nil { - err := node.BeforeDo(ctx, node, tracker, processes, refs) + ctx, err = node.BeforeDo(ctx, node, tracker, processes, refs) if err != nil { node.logger.Error("Node before middleware failed: ", err) processes.Fatal(err) @@ -114,7 +132,7 @@ func (node *Node) Do(ctx context.Context, tracker *Tracker, processes *Processes } if node.Call != nil { - err := node.Call.Do(ctx, refs) + err = node.Call.Do(ctx, refs) if err != nil { node.logger.Error("Call failed: ", node.Name) processes.Fatal(err) @@ -137,7 +155,7 @@ func (node *Node) Do(ctx context.Context, tracker *Tracker, processes *Processes } if node.AfterDo != nil { - err := node.AfterDo(ctx, node, tracker, processes, refs) + ctx, err = node.AfterDo(ctx, node, tracker, processes, refs) if err != nil { node.logger.Error("Node after middleware failed: ", err) processes.Fatal(err) @@ -146,9 +164,9 @@ func (node *Node) Do(ctx context.Context, tracker *Tracker, processes *Processes } } -// Revert executes the given node rollback an calls the previous nodes. +// Rollback executes the given node rollback an calls the previous nodes. // If one of the nodes fails is the error marked but execution is not aborted. -func (node *Node) Revert(ctx context.Context, tracker *Tracker, processes *Processes, refs refs.Store) { +func (node *Node) Rollback(ctx context.Context, tracker *Tracker, processes *Processes, refs refs.Store) { defer processes.Done() node.logger.Debug("Executing node revert ", node.Name) @@ -160,8 +178,10 @@ func (node *Node) Revert(ctx context.Context, tracker *Tracker, processes *Proce return } + var err error + if node.BeforeRevert != nil { - err := node.BeforeRevert(ctx, node, tracker, processes, refs) + ctx, err = node.BeforeRevert(ctx, node, tracker, processes, refs) if err != nil { node.logger.Error("Node before middleware failed: ", err) processes.Fatal(err) @@ -173,12 +193,12 @@ func (node *Node) Revert(ctx context.Context, tracker *Tracker, processes *Proce processes.Add(len(node.Previous)) for _, node := range node.Previous { tracker.Mark(node) - go node.Revert(ctx, tracker, processes, refs) + go node.Rollback(ctx, tracker, processes, refs) } }() - if node.Rollback != nil { - err := node.Rollback.Do(ctx, refs) + if node.Revert != nil { + err = node.Revert.Do(ctx, refs) if err != nil { processes.Fatal(err) return @@ -189,7 +209,7 @@ func (node *Node) Revert(ctx context.Context, tracker *Tracker, processes *Proce tracker.Mark(node) if node.AfterRevert != nil { - err := node.AfterRevert(ctx, node, tracker, processes, refs) + ctx, err = node.AfterRevert(ctx, node, tracker, processes, refs) if err != nil { node.logger.Error("Node after middleware failed: ", err) processes.Fatal(err) diff --git a/pkg/flow/node_test.go b/pkg/flow/node_test.go index 430ba29b..7e12d305 100644 --- a/pkg/flow/node_test.go +++ b/pkg/flow/node_test.go @@ -24,7 +24,7 @@ func NewMockNode(name string, caller Call, rollback Call) *Node { logger: logger, Name: name, Call: caller, - Rollback: rollback, + Revert: rollback, DependsOn: map[string]*specs.Node{}, References: map[string]*specs.PropertyReference{}, } @@ -88,7 +88,7 @@ func BenchmarkSingleNodeCallingJSONCodecParallel(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { ctx := context.Background() - tracker := NewTracker(1) + tracker := NewTracker("", 1) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -154,7 +154,7 @@ func BenchmarkSingleNodeCallingJSONCodecSerial(b *testing.B) { for i := 0; i < b.N; i++ { ctx := context.Background() - tracker := NewTracker(1) + tracker := NewTracker("", 1) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -172,7 +172,7 @@ func BenchmarkSingleNodeCallingParallel(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { ctx := context.Background() - tracker := NewTracker(1) + tracker := NewTracker("", 1) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -190,7 +190,7 @@ func BenchmarkSingleNodeCallingSerial(b *testing.B) { for i := 0; i < b.N; i++ { ctx := context.Background() - tracker := NewTracker(1) + tracker := NewTracker("", 1) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -217,7 +217,7 @@ func BenchmarkBranchedNodeCallingParallel(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { ctx := context.Background() - tracker := NewTracker(len(nodes)) + tracker := NewTracker("", len(nodes)) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -244,7 +244,7 @@ func BenchmarkBranchedNodeCallingSerial(b *testing.B) { for i := 0; i < b.N; i++ { ctx := context.Background() - tracker := NewTracker(len(nodes)) + tracker := NewTracker("", len(nodes)) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -370,7 +370,7 @@ func TestConstructingNode(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { ctx := instance.NewContext() - result := NewNode(ctx, test.Node, test.Call, test.Rollback) + result := NewNode(ctx, test.Node, test.Call, test.Rollback, nil) if len(result.References) != test.Expected { t.Fatalf("unexpected amount of references %d, expected %d", len(result.References), test.Expected) @@ -388,7 +388,7 @@ func TestConstructingNodeReferences(t *testing.T) { Name: "mock", } - result := NewNode(ctx, node, call, rollback) + result := NewNode(ctx, node, call, rollback, nil) if result == nil { t.Fatal("nil node returned") } @@ -423,7 +423,7 @@ func TestNodeCalling(t *testing.T) { nodes[1].Next = []*Node{nodes[2]} nodes[2].Previous = []*Node{nodes[1]} - tracker := NewTracker(len(nodes)) + tracker := NewTracker("", len(nodes)) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -461,7 +461,7 @@ func TestSlowNodeAbortingOnErr(t *testing.T) { nodes[3].Previous = []*Node{nodes[1], nodes[2]} - tracker := NewTracker(len(nodes)) + tracker := NewTracker("", len(nodes)) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -498,11 +498,11 @@ func TestNodeRevert(t *testing.T) { nodes[1].Next = []*Node{nodes[2]} nodes[2].Previous = []*Node{nodes[1]} - tracker := NewTracker(len(nodes)) + tracker := NewTracker("", len(nodes)) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) - nodes[len(nodes)-1].Revert(context.Background(), tracker, processes, refs) + nodes[len(nodes)-1].Rollback(context.Background(), tracker, processes, refs) processes.Wait() if processes.Err() != nil { @@ -533,7 +533,7 @@ func TestNodeBranchesCalling(t *testing.T) { nodes[3].Previous = []*Node{nodes[1], nodes[2]} - tracker := NewTracker(len(nodes)) + tracker := NewTracker("", len(nodes)) processes := NewProcesses(1) refs := refs.NewReferenceStore(0) @@ -554,13 +554,13 @@ func TestBeforeDoNode(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.BeforeDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.BeforeDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } processes := NewProcesses(1) - node.Do(context.Background(), NewTracker(1), processes, nil) + node.Do(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != nil { t.Error(processes.Err()) } @@ -576,13 +576,13 @@ func TestBeforeDoNodeErr(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.BeforeDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.BeforeDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } processes := NewProcesses(1) - node.Do(context.Background(), NewTracker(1), processes, nil) + node.Do(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != expected { t.Errorf("unexpected err '%s', expected '%s' to be thrown", processes.Err(), expected) } @@ -597,13 +597,13 @@ func TestAfterDoNode(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.AfterDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.AfterDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } processes := NewProcesses(1) - node.Do(context.Background(), NewTracker(1), processes, nil) + node.Do(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != nil { t.Error(processes.Err()) } @@ -619,13 +619,13 @@ func TestAfterDoNodeErr(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.AfterDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.AfterDo = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } processes := NewProcesses(1) - node.Do(context.Background(), NewTracker(1), processes, nil) + node.Do(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != expected { t.Errorf("unexpected err '%s', expected '%s' to be thrown", processes.Err(), expected) } @@ -640,13 +640,13 @@ func TestBeforeRevertNode(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.BeforeRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.BeforeRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } processes := NewProcesses(1) - node.Revert(context.Background(), NewTracker(1), processes, nil) + node.Rollback(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != nil { t.Error(processes.Err()) } @@ -662,13 +662,13 @@ func TestBeforeRevertNodeErr(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.BeforeRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.BeforeRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } processes := NewProcesses(1) - node.Revert(context.Background(), NewTracker(1), processes, nil) + node.Rollback(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != expected { t.Errorf("unexpected err '%s', expected '%s' to be thrown", processes.Err(), expected) } @@ -683,13 +683,13 @@ func TestAfterRevertNode(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.AfterRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.AfterRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return nil + return ctx, nil } processes := NewProcesses(1) - node.Revert(context.Background(), NewTracker(1), processes, nil) + node.Rollback(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != nil { t.Error(processes.Err()) } @@ -705,13 +705,13 @@ func TestAfterRevertNodeErr(t *testing.T) { call := &caller{} node := NewMockNode("mock", call, nil) - node.AfterRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) error { + node.AfterRevert = func(ctx context.Context, node *Node, tracker *Tracker, processes *Processes, store refs.Store) (context.Context, error) { counter++ - return expected + return ctx, expected } processes := NewProcesses(1) - node.Revert(context.Background(), NewTracker(1), processes, nil) + node.Rollback(context.Background(), NewTracker("", 1), processes, nil) if processes.Err() != expected { t.Errorf("unexpected err '%s', expected '%s' to be thrown", processes.Err(), expected) } diff --git a/pkg/flow/tracker.go b/pkg/flow/tracker.go index 6a4e1e31..252f696a 100644 --- a/pkg/flow/tracker.go +++ b/pkg/flow/tracker.go @@ -5,8 +5,9 @@ import ( ) // NewTracker constructs a new tracker -func NewTracker(nodes int) *Tracker { +func NewTracker(flow string, nodes int) *Tracker { return &Tracker{ + Flow: flow, Nodes: make(map[string]int, nodes), Locks: make(map[*Node]*sync.Mutex, nodes), } @@ -14,6 +15,7 @@ func NewTracker(nodes int) *Tracker { // Tracker represents a structure responsible of tracking nodes type Tracker struct { + Flow string mutex sync.Mutex Nodes map[string]int Locks map[*Node]*sync.Mutex diff --git a/pkg/flow/tracker_test.go b/pkg/flow/tracker_test.go index 9c688412..890a2a89 100644 --- a/pkg/flow/tracker_test.go +++ b/pkg/flow/tracker_test.go @@ -3,7 +3,7 @@ package flow import "testing" func TestTrackerMark(t *testing.T) { - tracker := NewTracker(1) + tracker := NewTracker("", 1) node := NewMockNode("first", nil, nil) if tracker.Met(node) { diff --git a/pkg/metrics/prometheus/prometheus.go b/pkg/metrics/prometheus/prometheus.go new file mode 100644 index 00000000..cb4a09b9 --- /dev/null +++ b/pkg/metrics/prometheus/prometheus.go @@ -0,0 +1,258 @@ +package prometheus + +import ( + "context" + "net/http" + "time" + + "github.com/jexia/maestro" + "github.com/jexia/maestro/pkg/constructor" + "github.com/jexia/maestro/pkg/flow" + "github.com/jexia/maestro/pkg/instance" + "github.com/jexia/maestro/pkg/logger" + "github.com/jexia/maestro/pkg/refs" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// New constructs a new prometheus middleware instance +func New(addr string) constructor.Middleware { + return func(ctx instance.Context) ([]constructor.Option, error) { + ctx.Logger(logger.Core).WithField("addr", addr).Info("Setting up prometheus") + + collector, err := NewCollector() + if err != nil { + return nil, err + } + + server := http.Server{ + Addr: addr, + Handler: promhttp.HandlerFor(collector.Registry(), promhttp.HandlerOpts{}), + } + + go server.ListenAndServe() + + handles := maestro.NewCollection( + maestro.BeforeManagerDo(collector.BeforeDo), + maestro.AfterManagerDo(collector.AfterDo), + maestro.BeforeNodeDo(collector.BeforeNode), + maestro.BeforeNodeRollback(collector.BeforeNode), + maestro.AfterNodeDo(collector.AfterNode), + maestro.AfterNodeRollback(collector.AfterNode), + maestro.BeforeManagerRollback(collector.BeforeRollback), + maestro.AfterManagerRollback(collector.AfterRollback), + ) + + promhttp.HandlerFor( + prometheus.DefaultGatherer, + promhttp.HandlerOpts{}, + ) + + return handles, nil + } +} + +// CtxKey context key type +type CtxKey string + +var ( + // StartTimeCtx context time value + StartTimeCtx = CtxKey("start-time") +) + +// NewCollector constructs a new prometheus collector +func NewCollector() (Collector, error) { + registry := prometheus.NewRegistry() + collector := &collector{ + registry: registry, + flowsDo: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "flow_total", + Help: "Total amount of times a flow has been called", + }, []string{"flow"}), + flowDo: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "flow_duration_seconds", + Help: "Avarage flow execution duration in seconds", + }, []string{"flow"}), + flowsRollback: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "flow_rollback_total", + Help: "Total amount of times a flow has been rolled back", + }, []string{"flow"}), + flowRollback: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "flow_rollback_duration_seconds", + Help: "Avarage rollback execution duration in seconds", + }, []string{"flow"}), + nodes: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "node_total", + Help: "Total amount of times a node has been called", + }, []string{"flow", "node"}), + node: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "node_duration_seconds", + Help: "Avarage node execution duration in seconds", + }, []string{"flow", "node"}), + } + + err := registry.Register(collector.flowsDo) + if err != nil { + return nil, err + } + + err = registry.Register(collector.flowDo) + if err != nil { + return nil, err + } + + err = registry.Register(collector.flowsRollback) + if err != nil { + return nil, err + } + + err = registry.Register(collector.flowRollback) + if err != nil { + return nil, err + } + + err = registry.Register(collector.nodes) + if err != nil { + return nil, err + } + + err = registry.Register(collector.node) + if err != nil { + return nil, err + } + + return collector, nil +} + +// Collector represents a middleware collector +type Collector interface { + Registry() *prometheus.Registry + BeforeDo(next flow.BeforeManager) flow.BeforeManager + AfterDo(next flow.AfterManager) flow.AfterManager + BeforeNode(next flow.BeforeNode) flow.BeforeNode + AfterNode(next flow.AfterNode) flow.AfterNode + BeforeRollback(next flow.BeforeManager) flow.BeforeManager + AfterRollback(next flow.AfterManager) flow.AfterManager +} + +// Collector collects data from middleware calls and exposes them for prometheus to consume +type collector struct { + registry *prometheus.Registry + flowsDo *prometheus.CounterVec + flowDo *prometheus.SummaryVec + flowsRollback *prometheus.CounterVec + flowRollback *prometheus.SummaryVec + nodes *prometheus.CounterVec + node *prometheus.SummaryVec +} + +func (collector *collector) Registry() *prometheus.Registry { + return collector.registry +} + +// BeforeDo gets called before a flow gets executed +func (collector *collector) BeforeDo(next flow.BeforeManager) flow.BeforeManager { + return func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + req := collector.flowsDo.With(prometheus.Labels{ + "flow": manager.Name, + }) + + req.Inc() + + now := time.Now() + ctx = context.WithValue(ctx, StartTimeCtx, now) + + return next(ctx, manager, store) + } +} + +// AfterDo gets called after a flow is executed +func (collector *collector) AfterDo(next flow.AfterManager) flow.AfterManager { + return func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + value := ctx.Value(StartTimeCtx) + if value != nil { + duration := collector.flowDo.With(prometheus.Labels{ + "flow": manager.Name, + }) + + start := value.(time.Time) + diff := time.Now().Sub(start) + + duration.Observe(diff.Seconds()) + } + + return next(ctx, manager, store) + } +} + +// BeforeRollback gets called before a flow rollback gets executed +func (collector *collector) BeforeRollback(next flow.BeforeManager) flow.BeforeManager { + return func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + req := collector.flowsRollback.With(prometheus.Labels{ + "flow": manager.Name, + }) + + req.Inc() + + now := time.Now() + ctx = context.WithValue(ctx, StartTimeCtx, now) + + return next(ctx, manager, store) + } +} + +// AfterRollback gets called before a flow rollback is executed +func (collector *collector) AfterRollback(next flow.AfterManager) flow.AfterManager { + return func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) { + value := ctx.Value(StartTimeCtx) + if value != nil { + duration := collector.flowRollback.With(prometheus.Labels{ + "flow": manager.Name, + }) + + start := value.(time.Time) + diff := time.Now().Sub(start) + + duration.Observe(diff.Seconds()) + } + + return next(ctx, manager, store) + } +} + +// BeforeDo gets called before a node gets executed +func (collector *collector) BeforeNode(next flow.BeforeNode) flow.BeforeNode { + return func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) { + req := collector.nodes.With(prometheus.Labels{ + "flow": tracker.Flow, + "node": node.Name, + }) + + req.Inc() + + now := time.Now() + ctx = context.WithValue(ctx, StartTimeCtx, now) + + return next(ctx, node, tracker, processes, store) + } +} + +// AfterDo gets called after a node is executed +func (collector *collector) AfterNode(next flow.AfterNode) flow.AfterNode { + return func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) { + value := ctx.Value(StartTimeCtx) + if value != nil { + duration := collector.node.With(prometheus.Labels{ + "flow": tracker.Flow, + "node": node.Name, + }) + + start := value.(time.Time) + diff := time.Now().Sub(start) + + duration.Observe(diff.Seconds()) + } + + return next(ctx, node, tracker, processes, store) + } +} diff --git a/pkg/transport/grpc/caller_test.go b/pkg/transport/grpc/caller_test.go index 977dd18a..56f36cf9 100644 --- a/pkg/transport/grpc/caller_test.go +++ b/pkg/transport/grpc/caller_test.go @@ -28,7 +28,7 @@ func TestCaller(t *testing.T) { }) nodes := flow.Nodes{ - flow.NewNode(ctx, node, call, nil), + flow.NewNode(ctx, node, call, nil, nil), } listener, port := NewMockListener(t, nodes) diff --git a/pkg/transport/grpc/grpc_test.go b/pkg/transport/grpc/grpc_test.go index e13b378a..7fcbc404 100644 --- a/pkg/transport/grpc/grpc_test.go +++ b/pkg/transport/grpc/grpc_test.go @@ -43,7 +43,7 @@ func NewMockListener(t *testing.T, nodes flow.Nodes) (transport.Listener, int) { endpoints := []*transport.Endpoint{ { Request: NewSimpleMockSpecs(), - Flow: flow.NewManager(ctx, "test", nodes), + Flow: flow.NewManager(ctx, "test", nodes, nil), Options: specs.Options{ ServiceOption: "mock", MethodOption: "simple", diff --git a/pkg/transport/grpc/listener_test.go b/pkg/transport/grpc/listener_test.go index 8bf908bd..ce73c226 100644 --- a/pkg/transport/grpc/listener_test.go +++ b/pkg/transport/grpc/listener_test.go @@ -55,7 +55,7 @@ func TestListener(t *testing.T) { }) nodes := flow.Nodes{ - flow.NewNode(ctx, node, call, nil), + flow.NewNode(ctx, node, call, nil, nil), } listener, port := NewMockListener(t, nodes) diff --git a/pkg/transport/grpc/reflection_test.go b/pkg/transport/grpc/reflection_test.go index f9b23f88..a3fe45cc 100644 --- a/pkg/transport/grpc/reflection_test.go +++ b/pkg/transport/grpc/reflection_test.go @@ -66,14 +66,14 @@ func TestListServices(t *testing.T) { PackageOption: "com.mock", ServiceOption: "first", }, - Flow: flow.NewManager(ctx, "Get", []*flow.Node{}), + Flow: flow.NewManager(ctx, "Get", []*flow.Node{}, nil), }, { Options: specs.Options{ PackageOption: "com.mock", ServiceOption: "second", }, - Flow: flow.NewManager(ctx, "Get", []*flow.Node{}), + Flow: flow.NewManager(ctx, "Get", []*flow.Node{}, nil), }, } @@ -133,14 +133,14 @@ func TestFileContainingSymbol(t *testing.T) { PackageOption: "com.mock", ServiceOption: "first", }, - Flow: flow.NewManager(ctx, "Get", []*flow.Node{}), + Flow: flow.NewManager(ctx, "Get", []*flow.Node{}, nil), }, { Options: specs.Options{ PackageOption: "com.mock", ServiceOption: "second", }, - Flow: flow.NewManager(ctx, "Get", []*flow.Node{}), + Flow: flow.NewManager(ctx, "Get", []*flow.Node{}, nil), }, } diff --git a/pkg/transport/http/listener_test.go b/pkg/transport/http/listener_test.go index 4ed9f879..b44736b3 100644 --- a/pkg/transport/http/listener_test.go +++ b/pkg/transport/http/listener_test.go @@ -32,7 +32,7 @@ func NewMockListener(t *testing.T, nodes flow.Nodes) (transport.Listener, int) { endpoints := []*transport.Endpoint{ { Request: NewSimpleMockSpecs(), - Flow: flow.NewManager(ctx, "test", nodes), + Flow: flow.NewManager(ctx, "test", nodes, nil), Options: specs.Options{ EndpointOption: "/", MethodOption: http.MethodPost, @@ -59,7 +59,7 @@ func TestListener(t *testing.T) { }) nodes := flow.Nodes{ - flow.NewNode(ctx, node, call, nil), + flow.NewNode(ctx, node, call, nil, nil), } listener, port := NewMockListener(t, nodes) @@ -148,7 +148,7 @@ func TestPathReferences(t *testing.T) { ctx := instance.NewContext() endpoints := []*transport.Endpoint{ { - Flow: flow.NewManager(ctx, "test", nodes), + Flow: flow.NewManager(ctx, "test", nodes, nil), Options: specs.Options{ "endpoint": "/:message", "method": "GET",