Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into max_log_size
Browse files Browse the repository at this point in the history
  • Loading branch information
Snyssfx committed Oct 14, 2021
2 parents 1d8aba5 + 2bed43d commit 7974ccf
Show file tree
Hide file tree
Showing 44 changed files with 1,190 additions and 167 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
FROM ubuntu:20.04

RUN apt-get update
RUN apt-get install systemd -y

WORKDIR /file.d

COPY ./file.d .

RUN apt-get update
RUN apt-get install systemd -y

CMD ./file.d
26 changes: 21 additions & 5 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
"github.com/bitly/go-simplejson"
"github.com/ghodss/yaml"
"github.com/ozonru/file.d/logger"
"github.com/pkg/errors"
)

type Config struct {
Vault VaultConfig
Pipelines map[string]*PipelineConfig
Vault VaultConfig
PanicTimeout time.Duration
Pipelines map[string]*PipelineConfig
}

type (
Expand Down Expand Up @@ -97,7 +97,7 @@ func applyEnvs(json *simplejson.Json) error {
for _, env := range os.Environ() {
kv := strings.SplitN(env, "=", 2)
if len(kv) != 2 {
return errors.Errorf("can't parse env %s", env)
return fmt.Errorf("can't parse env %s", env)
}

k, v := kv[0], kv[1]
Expand Down Expand Up @@ -139,6 +139,20 @@ func parseConfig(json *simplejson.Json) *Config {
config.Pipelines[i] = &PipelineConfig{Raw: raw}
}

panicTimeoutStr, err := json.Get("panic_timeout").String()
if err != nil {
logger.Warnf("can't get panic_timeout: %s", err.Error())
}
if panicTimeoutStr == "" {
panicTimeoutStr = "1m"
}

panicTimeout, err := time.ParseDuration(panicTimeoutStr)
if err != nil {
logger.Panicf("can't parse panic_timeout: %s", err.Error())
}
config.PanicTimeout = panicTimeout

return config
}

Expand Down Expand Up @@ -189,11 +203,13 @@ func tryGetSecret(vault secreter, field *simplejson.Json) (string, bool) {
noSpaces := strings.ReplaceAll(args, " ", "")
pathAndKey := strings.Split(noSpaces, ",")

logger.Infof("get secrets for %q and %q", pathAndKey[0], pathAndKey[1])
secret, err := vault.GetSecret(pathAndKey[0], pathAndKey[1])
if err != nil {
logger.Fatalf("can't GetSecret: %s", err.Error())
}

logger.Infof("success getting secret %q and %q", pathAndKey[0], pathAndKey[1])
return secret, true
}

Expand Down Expand Up @@ -294,7 +310,7 @@ func ParseField(v reflect.Value, vField reflect.Value, tField reflect.StructFiel
case reflect.Int:
val, err := strconv.Atoi(tag)
if err != nil {
return errors.Wrapf(err, "default value for field %s should be int, got=%s", tField.Name, tag)
return fmt.Errorf("default value for field %s should be int, got=%s: %w", tField.Name, tag, err)
}
vField.SetInt(int64(val))
case reflect.Slice:
Expand Down
9 changes: 5 additions & 4 deletions cfg/vault.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package cfg

import (
"fmt"

"github.com/hashicorp/vault/api"
"github.com/pkg/errors"
)

type secreter interface {
Expand All @@ -18,7 +19,7 @@ func newVault(addr, token string) (*vault, error) {
conf.Address = addr
c, err := api.NewClient(conf)
if err != nil {
return nil, errors.Wrap(err, "can't create client")
return nil, fmt.Errorf("can't create client: %w", err)
}

c.SetToken(token)
Expand All @@ -30,12 +31,12 @@ func (v *vault) GetSecret(path, key string) (string, error) {
c := v.c
secret, err := c.Logical().Read(path)
if err != nil {
return "", errors.Wrap(err, "can't get secret")
return "", fmt.Errorf("can't get secret: %w", err)
}

str, ok := secret.Data[key].(string)
if !ok {
return "", errors.Wrap(err, "can't get 'value' of the secret")
return "", fmt.Errorf("can't get 'key' of the secret: %q", key)
}

return str, nil
Expand Down
8 changes: 6 additions & 2 deletions cmd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ozonru/file.d/cfg"
"github.com/ozonru/file.d/fd"
"github.com/ozonru/file.d/logger"
"github.com/ozonru/file.d/longpanic"
"github.com/ozonru/file.d/pipeline"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -69,14 +70,17 @@ func main() {
_, _ = maxprocs.Set(maxprocs.Logger(logger.Debugf))

go listenSignals()
go start()
longpanic.Go(start)

<-exit
logger.Infof("see you soon...")
}

func start() {
fileD = fd.New(cfg.NewConfigFromFile(*config), *http)
cfg := cfg.NewConfigFromFile(*config)
longpanic.SetTimeout(cfg.PanicTimeout)

fileD = fd.New(cfg, *http)
fileD.Start()
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/file.d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const testTime = 10 * time.Minute
// It's something like fuzz testing. file.d shouldn't crash/panic or hang for infinite time.
// E.g. keep this test running while you are sleeping :)
func TestEndToEnd(t *testing.T) {
if testing.Short() {
t.Skip("skipping testing in short mode")
}

configFilename := "./../testdata/config/e2e.yaml"
iterationInterval := time.Second * 10
writerCount := 8
Expand Down
29 changes: 25 additions & 4 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,31 @@ Here is a bit simplified architecture of the **file.d** solution.

What's going on here:

- **Input plugin** pulls data from external systems and pushes next to the pipeline controller. Full list of input plugins available is [here](../plugin/input).
- The **pipeline controller** is in charge of converting data to event and subsequent routing.
- **Input plugin** pulls data from external systems and pushes it next to the pipeline controller. Full list of input plugins available is [here](../plugin/input).
- The **pipeline controller** creates **streams** of the data and is in charge of converting data to event and subsequent routing.
- The **event pool** provides fast event instancing.
- Events are processed by one or more **action plugins**; they act on the events which meet particular criteria.
- Events are processed by one or more **processors**. Every processor holds all **action plugins** from the configuration.
- Every moment the processor gets a stream of data, process 1 or more events and returns the stream to a **streamer** that is a pool of streams.
- Action plugins act on the events which meet particular criteria.
- Finally, the event goes to the **output plugins** and is dispatched to the external system.

You can extend `file.d` by adding your own input, action, and output plugins.
You can extend `file.d` by adding your own input, action, and output plugins.

## Plugin endpoints
Every plugin can expose their own API via `PluginStaticInfo.Endpoints`.
Plugin endpoints can be accessed via URL
`/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`.
Input plugin has the index of zero, output plugin has the last index.

#### `/info` and `/sample`
Actions also have the standard endpoints `/info` and `/sample`.
If the action has `metric_name`, it will be collected and can be viewed via the `/info` endpoint.
The `/sample` handler stores and shows an event before and after processing, so you can debug the action better.

#### `longpanic` and `/reset`
Every goroutine can (and should) use `longpanic.Go` and `longpanic.WithRecover` functions.
`longpanic.Go` is a goroutine wrapper that panics only after a timeout that you can set in pipeline settings.
`longpanic.WithRecover` is essentially the same but it runs a function synchronously.
It helps to debug the app, because you can see the state of failed file.d via API.
Also you can restart the failed plugin via API, i.e. with the `/reset` endpoint of `file` input plugin.
In case of nobody call API, it will panic with the given error message.
6 changes: 4 additions & 2 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bitly/go-simplejson"
"github.com/ozonru/file.d/cfg"
"github.com/ozonru/file.d/logger"
"github.com/ozonru/file.d/longpanic"
"github.com/ozonru/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (f *FileD) addPipeline(name string, config *cfg.PipelineConfig) {

logger.Infof("creating pipeline %q: capacity=%d, stream field=%s, decoder=%s", name, settings.Capacity, settings.StreamField, settings.Decoder)

p := pipeline.New(name, settings, f.registry, mux)
p := pipeline.New(name, settings, f.registry)
err := f.setupInput(p, config, values)
if err != nil {
logger.Fatalf("can't create pipeline %q: %s", name, err.Error())
Expand All @@ -89,6 +90,7 @@ func (f *FileD) addPipeline(name string, config *cfg.PipelineConfig) {
logger.Fatalf("can't create pipeline %q: %s", name, err.Error())
}

p.SetupHTTPHandlers(mux)
f.Pipelines = append(f.Pipelines, p)
}

Expand Down Expand Up @@ -256,7 +258,7 @@ func (f *FileD) startHTTP() {
mux.Handle("/metrics", promhttp.Handler())

f.server = &http.Server{Addr: f.httpAddr, Handler: mux}
go f.listenHTTP()
longpanic.Go(f.listenHTTP)
}

func (f *FileD) listenHTTP() {
Expand Down
3 changes: 1 addition & 2 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ozonru/file.d/cfg"
"github.com/ozonru/file.d/logger"
"github.com/ozonru/file.d/pipeline"
"github.com/pkg/errors"
)

func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
Expand Down Expand Up @@ -114,7 +113,7 @@ func extractConditions(condJSON *simplejson.Json) (pipeline.MatchConditions, err
if len(value) > 0 && value[0] == '/' {
r, err := cfg.CompileRegex(value)
if err != nil {
return nil, errors.Wrapf(err, "can't compile regexp %s: %s", value, err)
return nil, fmt.Errorf("can't compile regexp %s: %w", value, err)
}
condition.Regexp = r
} else {
Expand Down
65 changes: 63 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/ozonru/file.d

go 1.17

require (
github.com/Shopify/sarama v1.29.1
github.com/alecthomas/kingpin v2.2.6+incompatible
Expand All @@ -14,7 +16,6 @@ require (
github.com/hashicorp/vault/api v1.1.1
github.com/imdario/mergo v0.3.7 // indirect
github.com/minio/minio-go v6.0.14+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.4.0
github.com/rjeczalik/notify v0.9.2
github.com/satori/go.uuid v1.2.0
Expand All @@ -28,11 +29,71 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190704094625-facf06a8f4b8
k8s.io/client-go v11.0.0+incompatible
k8s.io/klog v0.3.3 // indirect
k8s.io/utils v0.0.0-20190829053155-3a4a5477acf8 // indirect
)

go 1.15
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/gofuzz v1.0.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
github.com/hashicorp/go-retryablehttp v0.6.6 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/vault/sdk v0.2.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/klauspost/compress v1.12.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.3.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.9.1 // indirect
github.com/prometheus/procfs v0.0.8 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/spf13/pflag v1.0.3 // indirect
go.uber.org/multierr v1.3.0 // indirect
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
honnef.co/go/tools v0.0.1-2019.2.3 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1a
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down
Loading

0 comments on commit 7974ccf

Please sign in to comment.