Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reloading ES client's password from file #4342

Merged
merged 15 commits into from
Sep 9, 2023
16 changes: 14 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ type Configuration struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
PasswordFilePath string `mapstructure:"password_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
@@ -310,6 +311,17 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp
Timeout: c.Timeout,
}
options = append(options, elastic.SetHttpClient(httpClient))

if c.Password != "" && c.PasswordFilePath != "" {
return nil, fmt.Errorf("both Password and PasswordFilePath are set")
}
if c.PasswordFilePath != "" {
passwordFromFile, err := loadTokenFromFile(c.PasswordFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load password from file: %w", err)
}
c.Password = passwordFromFile
}
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))

if c.SendGetBodyAs != "" {
@@ -396,7 +408,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
if c.AllowTokenFromContext {
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
}
tokenFromFile, err := loadToken(c.TokenFilePath)
tokenFromFile, err := loadTokenFromFile(c.TokenFilePath)
if err != nil {
return nil, err
}
@@ -412,7 +424,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
return transport, nil
}

func loadToken(path string) (string, error) {
func loadTokenFromFile(path string) (string, error) {
b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return "", err
10 changes: 5 additions & 5 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
client func() es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
@@ -48,7 +48,7 @@ type DependencyStore struct {

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
@@ -84,15 +84,15 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// CreateTemplates creates index templates.
func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error {
_, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
return err
}
return nil
}

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
s.client.Index().Index(indexName).Type(dependencyType).
s.client().Index().Index(indexName).Type(dependencyType).
BodyJson(&dbmodel.TimeDependencies{
Timestamp: ts,
Dependencies: dbmodel.FromDomainDependencies(dependencies),
@@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
searchResult, err := s.client().Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
5 changes: 3 additions & 2 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/dependencystore"
@@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
@@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
115 changes: 94 additions & 21 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
@@ -16,9 +16,14 @@
package es

import (
"errors"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync/atomic"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
@@ -27,6 +32,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
@@ -57,9 +63,12 @@ type Factory struct {
newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
primaryClient es.Client
archiveConfig *config.Configuration
archiveClient es.Client

primaryClient atomic.Pointer[es.Client]
archiveClient atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher
}

// NewFactory creates a new Factory.
@@ -87,62 +96,87 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
func (f *Factory) InitFromOptions(o Options) {
f.Options = &o
f.primaryConfig = f.Options.GetPrimary()
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
f.archiveConfig = cfg
}
f.archiveConfig = f.Options.Get(archiveNamespace)
}

// Initialize implements storage.Factory
// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient = primaryClient
f.primaryClient.Store(&primaryClient)

if f.primaryConfig.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)
}

if f.archiveConfig.Enabled {
f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory)
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
f.archiveClient.Store(&archiveClient)

if f.archiveConfig.PasswordFilePath != "" {
archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err)
}
f.watchers = append(f.watchers, archiveWatcher)
}
}

return nil
}

func (f *Factory) getPrimaryClient() es.Client {
return *(f.primaryClient.Load())
}

func (f *Factory) getArchiveClient() es.Client {
return *f.archiveClient.Load()
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger)
return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
}

func createSpanReader(
client es.Client,
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
@@ -153,7 +187,7 @@ func createSpanReader(
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Client: clientFn,
MaxDocCount: cfg.MaxDocCount,
MaxSpanAge: cfg.MaxSpanAge,
IndexPrefix: cfg.IndexPrefix,
@@ -172,7 +206,7 @@ func createSpanReader(
}

func createSpanWriter(
client es.Client,
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
@@ -202,7 +236,7 @@ func createSpanWriter(
return nil, err
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Client: clientFn,
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
@@ -226,12 +260,12 @@ func createSpanWriter(
}

func createDependencyReader(
client es.Client,
clientFn func() es.Client,
cfg *config.Configuration,
logger *zap.Logger,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Client: clientFn,
Logger: logger,
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
@@ -245,8 +279,47 @@ var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
for _, w := range f.watchers {
errs = append(errs, w.Close())
}
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
cfg.TLS.Close()
errs = append(errs, cfg.TLS.Close())
}
errs = append(errs, f.Options.GetPrimary().TLS.Close())
return errors.Join(errs...)
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
newPassword, err := loadTokenFromFile(cfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err))
return
}
f.logger.Sugar().Infof("loaded new password of length %d from file", len(newPassword))
newCfg := *cfg // copy by value
newCfg.Password = newPassword
newCfg.PasswordFilePath = "" // avoid error that both are set
primaryClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err))
} else {
client.Store(&primaryClient)
}
}

func loadTokenFromFile(path string) (string, error) {
b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return "", err
}
return f.Options.GetPrimary().TLS.Close()
return strings.TrimRight(string(b), "\r\n"), nil
}
Loading
Oops, something went wrong.