Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reloading ES client's password from file #4342

Merged
merged 15 commits into from
Sep 9, 2023
Prev Previous commit
Next Next commit
simplify, add tests
Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Sep 8, 2023
commit ee85e72cc3ab2b20894da425398bf92f6ce54484
35 changes: 13 additions & 22 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,36 +291,27 @@ func (f *Factory) Close() error {
}

func (f *Factory) onPrimaryPasswordChange() {
newPrimaryPassword, err := loadTokenFromFile(f.primaryConfig.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err))
return
}
newPrimaryCfg := *f.primaryConfig // copy by value
newPrimaryCfg.Password = newPrimaryPassword
newPrimaryCfg.PasswordFilePath = "" // avoid error that both are set
primaryClient, err := f.newClientFn(&newPrimaryCfg, f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err))
} else {
f.primaryClient.Store(&primaryClient)
}
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
}

func (f *Factory) onArchivePasswordChange() {
newPassword, err := loadTokenFromFile(f.archiveConfig.PasswordFilePath)
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
newPassword, err := loadTokenFromFile(cfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err))
f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err))
return
}
newArchiveCfg := *f.archiveConfig // copy by value
newArchiveCfg.Password = newPassword
newArchiveCfg.PasswordFilePath = "" // avoid error that both are set
archiveClient, err := f.newClientFn(&newArchiveCfg, f.logger, f.metricsFactory)
newCfg := *cfg // copy by value
newCfg.Password = newPassword
newCfg.PasswordFilePath = "" // avoid error that both are set
primaryClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err))
f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err))
} else {
f.archiveClient.Store(&archiveClient)
client.Store(&primaryClient)
}
}

Expand Down
91 changes: 79 additions & 12 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,21 @@ import (
escfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var _ storage.Factory = new(Factory)

var mockEsServerResponse = []byte(`
{
"Version": {
"Number": "6"
}
}
`)

type mockClientBuilder struct {
err error
createTemplateError error
Expand Down Expand Up @@ -252,6 +262,25 @@ func TestInitFromOptions(t *testing.T) {
}

func TestPasswordFromFile(t *testing.T) {
t.Run("primary client", func(t *testing.T) {
f := NewFactory()
testPasswordFromFile(t, f, f.getPrimaryClient, f.CreateSpanWriter)
})

t.Run("archive client", func(t *testing.T) {
f2 := NewFactory()
testPasswordFromFile(t, f2, f2.getArchiveClient, f2.CreateArchiveSpanWriter)
})

t.Run("load token error", func(t *testing.T) {
file := filepath.Join(t.TempDir(), "does not exist")
token, err := loadTokenFromFile(file)
assert.Error(t, err)
assert.Equal(t, "", token)
})
}

func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, getWriter func() (spanstore.Writer, error)) {
var authReceived atomic.Pointer[string]
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("request to fake ES server: %v", r)
Expand All @@ -262,42 +291,41 @@ func TestPasswordFromFile(t *testing.T) {
assert.NoError(t, err, "header: %s", h)
auth := string(authBytes)
authReceived.Store(&auth)
w.Write([]byte(`
{
"Version": {
"Number": "6"
}
}
`))
w.Write(mockEsServerResponse)
}))
defer server.Close()

pwdFile := filepath.Join(t.TempDir(), "pwd")
require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600))

f := NewFactory()
f.primaryConfig = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
PasswordFilePath: pwdFile,
}
f.archiveConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{
Enabled: true,
Servers: []string{server.URL},
LogLevel: "debug",
PasswordFilePath: pwdFile,
}
require.NoError(t, f.Initialize(metrics.NullFactory, zaptest.NewLogger(t)))
defer f.Close()

writer, err := f.CreateSpanWriter()
writer, err := getWriter()
require.NoError(t, err)
span := &model.Span{
Process: &model.Process{ServiceName: "foo"},
}
require.NoError(t, writer.WriteSpan(context.Background(), span))
require.Equal(t, ":first password", *authReceived.Load())

client1 := f.getPrimaryClient()
// replace password in the file
client1 := getClient()
require.NoError(t, os.WriteFile(pwdFile, []byte("second password"), 0o600))
assert.Eventually(t,
func() bool {
client2 := f.getPrimaryClient()
client2 := getClient()
return client1 != client2
},
5*time.Second, time.Millisecond,
Expand All @@ -306,3 +334,42 @@ func TestPasswordFromFile(t *testing.T) {
require.NoError(t, writer.WriteSpan(context.Background(), span))
require.Equal(t, ":second password", *authReceived.Load())
}

func TestPasswordFromFileErrors(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(mockEsServerResponse)
}))
defer server.Close()

pwdFile := filepath.Join(t.TempDir(), "pwd")
require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600))

f := NewFactory()
f.primaryConfig = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
PasswordFilePath: pwdFile,
}
f.archiveConfig = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
PasswordFilePath: pwdFile,
}

logger, buf := testutils.NewEchoLogger(t)
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
defer f.Close()

f.primaryConfig.Servers = []string{}
f.onPrimaryPasswordChange()
assert.Contains(t, buf.String(), "no servers specified")

f.archiveConfig.Servers = []string{}
buf.Reset()
f.onArchivePasswordChange()
assert.Contains(t, buf.String(), "no servers specified")

require.NoError(t, os.Remove(pwdFile))
f.onPrimaryPasswordChange()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why but I have to explicitly call f.onPrimaryPasswordChange() here to make the password change. Don't know if it's a problem of the watcher in the implementation or a problem in my test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to either fsync or close the file after writing, otherwise the write may be still in the OS buffer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, because watcher is running in a separate goroutine, you cannot immediately validate if the client was updated, usually we use assert.Eventually (grep for examples in this repo)

f.onArchivePasswordChange()
}