Skip to content

Commit

Permalink
prometheus metrics consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenrinzema committed May 14, 2020
1 parent d256f46 commit 2f96a41
Show file tree
Hide file tree
Showing 27 changed files with 677 additions and 152 deletions.
4 changes: 4 additions & 0 deletions cmd/maestro/config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ http {
address = "$HTTP_ADDRESS"
}
prometheus {
address = ":5050"
}
services {
select "proto.users.*" {
host = "api.jexia.com"
Expand Down
5 changes: 5 additions & 0 deletions cmd/maestro/config/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/jexia/maestro/pkg/definitions/protoc"
"github.com/jexia/maestro/pkg/instance"
"github.com/jexia/maestro/pkg/logger"
"github.com/jexia/maestro/pkg/metrics/prometheus"
"github.com/jexia/maestro/pkg/specs"
"github.com/jexia/maestro/pkg/transport/graphql"
"github.com/jexia/maestro/pkg/transport/grpc"
Expand Down Expand Up @@ -62,6 +63,10 @@ func ConstructArguments(params *Maestro) ([]constructor.Option, error) {
arguments = append(arguments, maestro.WithListener(grpc.NewListener(params.GRPC.Address, specs.Options{})))
}

if params.Prometheus.Address != "" {
arguments = append(arguments, maestro.WithMiddleware(prometheus.New(params.Prometheus.Address)))
}

arguments = append([]constructor.Option{maestro.WithLogLevel(logger.Global, params.LogLevel)}, arguments...)

return arguments, nil
Expand Down
12 changes: 12 additions & 0 deletions cmd/maestro/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func Parse(options *hcl.Options, target *Maestro) {
Address: options.GRPC.Address,
}
}

if options.Prometheus != nil && target.Prometheus.Address == "" {
target.Prometheus = Prometheus{
Address: options.Prometheus.Address,
}
}
}

// Maestro configurations
Expand All @@ -50,10 +56,16 @@ type Maestro struct {
HTTP HTTP
GraphQL GraphQL
GRPC GRPC
Prometheus Prometheus
Protobuffers []string
Files []string
}

// Prometheus configurations
type Prometheus struct {
Address string
}

// HTTP configurations
type HTTP struct {
Address string
Expand Down
7 changes: 6 additions & 1 deletion cmd/maestro/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func run(cmd *cobra.Command, args []string) error {
}

ctx := instance.NewContext()
_, err = constructor.Specs(ctx, functions.Collection{}, maestro.NewOptions(ctx, arguments...))
options, err := maestro.NewOptions(ctx, arguments...)
if err != nil {
return err
}

_, err = constructor.Specs(ctx, functions.Collection{}, options)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions examples/graphql/config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ graphql {
address = ":8080"
}

prometheus {
address = ":5050"
}

services {
select "proto.*" {
host = "https://jsonplaceholder.typicode.com/"
Expand Down
15 changes: 15 additions & 0 deletions examples/graphql/flow.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ endpoint "todo" "graphql" {
base = "query"
}

endpoint "sample" "graphql" {
path = "sample"
name = "Sample"
base = "query"
}

flow "todo" {
input "proto.Query" {}

Expand All @@ -41,3 +47,12 @@ flow "todo" {
completed = "{{ query:completed }}"
}
}

flow "sample" {
input "proto.Query" {}

output "proto.Item" {
title = "sample"
completed = true
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/onsi/ginkgo v1.10.1 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/prometheus/client_golang v1.2.1 // indirect
github.com/prometheus/client_golang v1.2.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.6
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func (client *Client) Close() {
// New constructs a new Maestro instance
func New(opts ...constructor.Option) (*Client, error) {
ctx := instance.NewContext()
options := NewOptions(ctx, opts...)
options, err := NewOptions(ctx, opts...)
if err != nil {
return nil, err
}

mem := functions.Collection{}
collection, err := constructor.Specs(ctx, mem, options)
Expand Down
147 changes: 147 additions & 0 deletions middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package maestro

import (
"context"

"github.com/jexia/maestro/pkg/constructor"
"github.com/jexia/maestro/pkg/flow"
"github.com/jexia/maestro/pkg/instance"
"github.com/jexia/maestro/pkg/refs"
)

// WithMiddleware initialises the given middleware and defines all options
func WithMiddleware(middleware constructor.Middleware) constructor.Option {
return func(options *constructor.Options) {
options.Middleware = append(options.Middleware, middleware)
}
}

// AfterConstructor the passed function gets called once all options have been applied
func AfterConstructor(wrapper constructor.AfterConstructorHandler) constructor.Option {
return func(options *constructor.Options) {
if options.AfterConstructor == nil {
options.AfterConstructor = wrapper(func(instance.Context, *constructor.Collection) error { return nil })
return
}

options.AfterConstructor = wrapper(options.AfterConstructor)
}
}

// BeforeManagerDo the passed function gets called before a request gets handled by a flow manager
func BeforeManagerDo(wrapper flow.BeforeManagerHandler) constructor.Option {
return func(options *constructor.Options) {
if options.BeforeManagerDo == nil {
options.BeforeManagerDo = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) {
return ctx, nil
})

return
}

options.BeforeManagerDo = wrapper(options.BeforeManagerDo)
}
}

// BeforeManagerRollback the passed function gets called before a rollback request gets handled by a flow manager
func BeforeManagerRollback(wrapper flow.BeforeManagerHandler) constructor.Option {
return func(options *constructor.Options) {
if options.BeforeManagerRollback == nil {
options.BeforeManagerRollback = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) {
return ctx, nil
})

return
}

options.BeforeManagerRollback = wrapper(options.BeforeManagerRollback)
}
}

// AfterManagerDo the passed function gets after a flow has been handled by the flow manager
func AfterManagerDo(wrapper flow.AfterManagerHandler) constructor.Option {
return func(options *constructor.Options) {
if options.AfterManagerDo == nil {
options.AfterManagerDo = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) {
return ctx, nil
})

return
}

options.AfterManagerDo = wrapper(options.AfterManagerDo)
}
}

// AfterManagerRollback the passed function gets after a flow rollback has been handled by the flow manager
func AfterManagerRollback(wrapper flow.AfterManagerHandler) constructor.Option {
return func(options *constructor.Options) {
if options.AfterManagerRollback == nil {
options.AfterManagerRollback = wrapper(func(ctx context.Context, manager *flow.Manager, store refs.Store) (context.Context, error) {
return ctx, nil
})

return
}

options.AfterManagerRollback = wrapper(options.AfterManagerRollback)
}
}

// BeforeNodeDo the passed function gets called before a node is executed
func BeforeNodeDo(wrapper flow.BeforeNodeHandler) constructor.Option {
return func(options *constructor.Options) {
if options.BeforeNodeDo == nil {
options.BeforeNodeDo = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) {
return ctx, nil
})

return
}

options.BeforeNodeDo = wrapper(options.BeforeNodeDo)
}
}

// BeforeNodeRollback the passed function gets called before a node rollback is executed
func BeforeNodeRollback(wrapper flow.BeforeNodeHandler) constructor.Option {
return func(options *constructor.Options) {
if options.BeforeNodeRollback == nil {
options.BeforeNodeRollback = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) {
return ctx, nil
})

return
}

options.BeforeNodeRollback = wrapper(options.BeforeNodeRollback)
}
}

// AfterNodeDo the passed function gets called after a node is executed
func AfterNodeDo(wrapper flow.AfterNodeHandler) constructor.Option {
return func(options *constructor.Options) {
if options.AfterNodeDo == nil {
options.AfterNodeDo = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) {
return ctx, nil
})
return
}

options.AfterNodeDo = wrapper(options.AfterNodeDo)
}
}

// AfterNodeRollback the passed function gets called after a node rollback is executed
func AfterNodeRollback(wrapper flow.AfterNodeHandler) constructor.Option {
return func(options *constructor.Options) {
if options.AfterNodeRollback == nil {
options.AfterNodeRollback = wrapper(func(ctx context.Context, node *flow.Node, tracker *flow.Tracker, processes *flow.Processes, store refs.Store) (context.Context, error) {
return ctx, nil
})
return
}

options.AfterNodeRollback = wrapper(options.AfterNodeRollback)
}
}
32 changes: 15 additions & 17 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,31 @@ import (
"github.com/jexia/maestro/pkg/transport"
)

// BeforeOptionsFn gets called before initialising the options
type BeforeOptionsFn func(instance.Context, *constructor.Options) error

// AfterOptionsFn gets called after all options have been initialised
type AfterOptionsFn func(instance.Context, *constructor.Options) error

// NewOptions constructs a constructor.Options object from the given constructor.Option constructors
func NewOptions(ctx instance.Context, options ...constructor.Option) constructor.Options {
func NewOptions(ctx instance.Context, options ...constructor.Option) (constructor.Options, error) {
result := constructor.NewOptions(ctx)

for _, option := range options {
option(&result)
}

return result
}

// AfterConstructor the passed function gets called once all options have been applied
func AfterConstructor(wrapper constructor.AfterConstructorHandler) constructor.Option {
return func(options *constructor.Options) {
if options.AfterConstructor == nil {
options.AfterConstructor = wrapper(func(instance.Context, *constructor.Collection) error { return nil })
return
for _, middleware := range result.Middleware {
options, err := middleware(ctx)
if err != nil {
return result, err
}

options.AfterConstructor = wrapper(options.AfterConstructor)
for _, option := range options {
option(&result)
}
}

return result, nil
}

// NewCollection constructs a new options collection
func NewCollection(options ...constructor.Option) []constructor.Option {
return options
}

// WithFlows appends the given flows resolver to the available flow resolvers
Expand Down
15 changes: 13 additions & 2 deletions pkg/constructor/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,26 @@ func FlowManager(ctx instance.Context, mem functions.Collection, services *specs
return nil, err
}

nodes[index] = flow.NewNode(ctx, node, caller, rollback)
nodes[index] = flow.NewNode(ctx, node, caller, rollback, &flow.NodeMiddleware{
BeforeDo: options.BeforeNodeDo,
AfterDo: options.AfterNodeDo,
BeforeRollback: options.BeforeNodeRollback,
AfterRollback: options.AfterNodeRollback,
})
}

forward, err := Forward(services, flows, manager.GetForward(), options)
if err != nil {
return nil, err
}

result.Flow = flow.NewManager(ctx, manager.GetName(), nodes)
result.Flow = flow.NewManager(ctx, manager.GetName(), nodes, &flow.ManagerMiddleware{
BeforeDo: options.BeforeManagerDo,
AfterDo: options.AfterManagerDo,
BeforeRollback: options.BeforeManagerRollback,
AfterRollback: options.AfterManagerRollback,
})

result.Forward = forward

results[index] = result
Expand Down
Loading

0 comments on commit 2f96a41

Please sign in to comment.