Skip to content

Commit

Permalink
Unified snapshots configuration for datanode (#21031)
Browse files Browse the repository at this point in the history
* Preflight check for usable space and cache size

* add warning when search cache size occupies more than 80% of free disk space

* Added changelog

* unified searchable snapshots configuration for datanode

* removed node cache from OpensearchConfiguration (is managed by searchable snaphots config)

* code cleanup

* better node search cache documentation

* updated changelog

---------

Co-authored-by: Matthias Oesterheld <33032967+moesterheld@users.noreply.github.com>
  • Loading branch information
todvora and moesterheld authored Dec 3, 2024
1 parent 5cdcb0e commit fe5102b
Show file tree
Hide file tree
Showing 14 changed files with 571 additions and 52 deletions.
4 changes: 4 additions & 0 deletions changelog/unreleased/pr-21031.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type = "a"
message = "Unified snapshot configuration, role and cache management in datanode"

pulls = ["21031"]
11 changes: 9 additions & 2 deletions data-node/src/main/java/org/graylog/datanode/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,14 @@ public class Configuration {
@Parameter(value = "metrics_policy")
private String metricsPolicy = "gl-datanode-metrics-ism";

@Documentation(value = "Cache size for searchable snaphots")
/**
* @see <a href="https://opensearch.org/docs/latest/tuning-your-cluster/availability-and-recovery/snapshots/searchable_snapshot/#configuring-a-node-to-use-searchable-snapshots}">Searchable snapshots</a>
*/
@Documentation(value = """
Cache size for searchable snapshots. This space will be automatically reserved
if you have either S3 or shared filesystem repositories enabled and configured.
See s3_client_* configuration options and path_repo.
""")
@Parameter(value = "node_search_cache_size")
private String searchCacheSize = "10gb";

Expand All @@ -271,7 +278,7 @@ public class Configuration {

@Documentation("The list of the opensearch node’s roles.")
@Parameter(value = "node_roles", converter = StringListConverter.class)
private List<String> nodeRoles = List.of("cluster_manager", "data", "ingest", "remote_cluster_client", "search");
private List<String> nodeRoles = List.of("cluster_manager", "data", "ingest", "remote_cluster_client");

@Documentation(visible = false)
@Parameter(value = "async_eventbus_processors")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.OpensearchProcessImpl;
import org.graylog.datanode.opensearch.OpensearchProcessService;
import org.graylog.datanode.opensearch.configuration.OpensearchUsableSpace;
import org.graylog.datanode.opensearch.configuration.OpensearchUsableSpaceProvider;
import org.graylog.datanode.opensearch.configuration.beans.OpensearchConfigurationBean;
import org.graylog.datanode.opensearch.configuration.beans.impl.SearchableSnapshotsConfigurationBean;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachineProvider;
import org.graylog.datanode.opensearch.statemachine.tracer.ClusterNodeStateTracer;
Expand All @@ -42,6 +46,13 @@ protected void configure() {

bind(OpensearchProcess.class).to(OpensearchProcessImpl.class).asEagerSingleton();
bind(OpensearchStateMachine.class).toProvider(OpensearchStateMachineProvider.class).asEagerSingleton();

bind(OpensearchUsableSpace.class).toProvider(OpensearchUsableSpaceProvider.class).asEagerSingleton();

//opensearch configuration beans
Multibinder<OpensearchConfigurationBean> opensearchConfigurationBeanMultibinder = Multibinder.newSetBinder(binder(), OpensearchConfigurationBean.class);
opensearchConfigurationBeanMultibinder.addBinding().to(SearchableSnapshotsConfigurationBean.class).asEagerSingleton();

// this service both starts and provides the opensearch process
serviceBinder.addBinding().to(OpensearchConfigurationService.class).asEagerSingleton();
serviceBinder.addBinding().to(OpensearchProcessService.class).asEagerSingleton();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.graylog.datanode.Configuration;
import org.graylog.datanode.configuration.variants.InSecureConfiguration;
Expand All @@ -31,20 +30,22 @@
import org.graylog.datanode.configuration.variants.UploadedCertFilesSecureConfiguration;
import org.graylog.datanode.opensearch.OpensearchConfigurationChangeEvent;
import org.graylog.datanode.opensearch.configuration.OpensearchConfiguration;
import org.graylog.datanode.opensearch.configuration.beans.OpensearchConfigurationBean;
import org.graylog.datanode.opensearch.configuration.beans.OpensearchConfigurationPart;
import org.graylog.security.certutil.ca.exceptions.KeyStoreStorageException;
import org.graylog2.cluster.Node;
import org.graylog2.cluster.nodes.DataNodeDto;
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.security.JwtSecret;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -57,7 +58,7 @@ public class OpensearchConfigurationService extends AbstractIdleService {
private final DatanodeConfiguration datanodeConfiguration;
private final JwtSecret signingKey;
private final NodeService<DataNodeDto> nodeService;
private final S3RepositoryConfiguration s3RepositoryConfiguration;
private final Set<OpensearchConfigurationBean> opensearchConfigurationBeans;

/**
* This configuration won't survive datanode restart. But it can be repeatedly provided to the managed opensearch
Expand All @@ -75,7 +76,7 @@ public OpensearchConfigurationService(final Configuration localConfiguration,
final InSecureConfiguration inSecureConfiguration,
final NodeService<DataNodeDto> nodeService,
JwtSecret jwtSecret,
final S3RepositoryConfiguration s3RepositoryConfiguration,
final Set<OpensearchConfigurationBean> opensearchConfigurationBeans,
final EventBus eventBus) {
this.localConfiguration = localConfiguration;
this.datanodeConfiguration = datanodeConfiguration;
Expand All @@ -84,7 +85,7 @@ public OpensearchConfigurationService(final Configuration localConfiguration,
this.inSecureConfiguration = inSecureConfiguration;
this.signingKey = jwtSecret;
this.nodeService = nodeService;
this.s3RepositoryConfiguration = s3RepositoryConfiguration;
this.opensearchConfigurationBeans = opensearchConfigurationBeans;
this.eventBus = eventBus;
eventBus.register(this);
}
Expand Down Expand Up @@ -159,6 +160,10 @@ private OpensearchConfiguration get() {
opensearchProperties.putAll(securityConfiguration.getProperties());
}

final Set<OpensearchConfigurationPart> configurationParts = opensearchConfigurationBeans.stream()
.map(OpensearchConfigurationBean::buildConfigurationPart)
.collect(Collectors.toSet());

return new OpensearchConfiguration(
datanodeConfiguration.opensearchDistributionProvider().get(),
datanodeConfiguration.datanodeDirectories(),
Expand All @@ -171,8 +176,7 @@ private OpensearchConfiguration get() {
localConfiguration.getNodeRoles(),
localConfiguration.getOpensearchDiscoverySeedHosts(),
securityConfiguration,
s3RepositoryConfiguration,
localConfiguration.getNodeSearchCacheSize(),
configurationParts,
opensearchProperties.build()
);
} catch (GeneralSecurityException | KeyStoreStorageException | IOException e) {
Expand All @@ -189,11 +193,6 @@ private ImmutableMap<String, Object> commonOpensearchConfig(final Configuration

config.put("network.bind_host", localConfiguration.getBindAddress());

// https://opensearch.org/docs/latest/tuning-your-cluster/availability-and-recovery/snapshots/snapshot-restore/#shared-file-system
if (localConfiguration.getPathRepo() != null && !localConfiguration.getPathRepo().isEmpty()) {
config.put("path.repo", localConfiguration.getPathRepo());
}

config.put("network.publish_host", localConfiguration.getHostname());

if (localConfiguration.getOpensearchDebug() != null && !localConfiguration.getOpensearchDebug().isBlank()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

import com.github.joschi.jadconfig.Parameter;
import com.github.joschi.jadconfig.converters.BooleanConverter;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.graylog.datanode.configuration.variants.KeystoreContributor;
import org.graylog2.configuration.Documentation;

import java.util.Arrays;
import java.util.Map;

public class S3RepositoryConfiguration implements KeystoreContributor {
public class S3RepositoryConfiguration {

@Documentation("S3 repository access key for searchable snapshots")
@Parameter(value = "s3_client_default_access_key")
Expand Down Expand Up @@ -101,15 +99,4 @@ private boolean noneBlank(String... properties) {
private boolean allBlank(String... properties) {
return Arrays.stream(properties).allMatch(StringUtils::isBlank);
}

@Override
public Map<String, String> getKeystoreItems() {
final ImmutableMap.Builder<String, String> config = ImmutableMap.builder();
if (isRepositoryEnabled()) {
config.put("s3.client.default.access_key", getS3ClientDefaultAccessKey());
config.put("s3.client.default.secret_key", getS3ClientDefaultSecretKey());

}
return config.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

public class OpensearchSecurityConfiguration implements KeystoreContributor {
public class OpensearchSecurityConfiguration {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchSecurityConfiguration.class);

Expand Down Expand Up @@ -129,7 +129,6 @@ public Map<String, String> getProperties() throws GeneralSecurityException, IOEx
return config.build();
}

@Override
public Map<String, String> getKeystoreItems() {
final ImmutableMap.Builder<String, String> config = ImmutableMap.builder();
if (securityEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@
*/
package org.graylog.datanode.opensearch.configuration;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import jakarta.annotation.Nonnull;
import org.apache.commons.exec.OS;
import org.graylog.datanode.OpensearchDistribution;
import org.graylog.datanode.configuration.DatanodeDirectories;
import org.graylog.datanode.configuration.S3RepositoryConfiguration;
import org.graylog.datanode.configuration.variants.KeystoreContributor;
import org.graylog.datanode.configuration.variants.OpensearchSecurityConfiguration;
import org.graylog.datanode.opensearch.configuration.beans.OpensearchConfigurationPart;
import org.graylog.datanode.process.Environment;
import org.graylog.shaded.opensearch2.org.apache.http.HttpHost;

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -44,11 +48,10 @@ public record OpensearchConfiguration(
List<String> nodeRoles,
List<String> discoverySeedHosts,
OpensearchSecurityConfiguration opensearchSecurityConfiguration,
S3RepositoryConfiguration s3RepositoryConfiguration,
Set<OpensearchConfigurationPart> configurationParts,

String nodeSearchCacheSize,
Map<String, Object> additionalConfiguration
) implements KeystoreContributor {
) {
public Map<String, Object> asMap() {

Map<String, Object> config = new LinkedHashMap<>();
Expand All @@ -71,25 +74,35 @@ public Map<String, Object> asMap() {
}

config.put("node.name", nodeName);
config.put("node.roles", buildRolesList());

if (nodeRoles != null && !nodeRoles.isEmpty()) {
config.put("node.roles", toValuesList(nodeRoles));
}
if (discoverySeedHosts != null && !discoverySeedHosts.isEmpty()) {
config.put("discovery.seed_hosts", toValuesList(discoverySeedHosts));
}

config.put("discovery.seed_providers", "file");

config.put("node.search.cache.size", nodeSearchCacheSize);
if (s3RepositoryConfiguration.isRepositoryEnabled()) {
config.putAll(s3RepositoryConfiguration.toOpensearchProperties());
}
configurationParts.stream()
.map(OpensearchConfigurationPart::properties)
.forEach(config::putAll);

config.putAll(additionalConfiguration);
return config;
}

@Nonnull
private String buildRolesList() {
final ImmutableList.Builder<String> roles = ImmutableList.builder();
if (nodeRoles != null) {
roles.addAll(nodeRoles);
}
configurationParts.stream()
.map(OpensearchConfigurationPart::nodeRoles)
.forEach(roles::addAll);

return toValuesList(roles.build());
}

private String toValuesList(List<String> values) {
return String.join(",", values);
}
Expand Down Expand Up @@ -128,10 +141,15 @@ public boolean securityConfigured() {
}


@Override
public Map<String, String> getKeystoreItems() {
Stream<KeystoreContributor> keystoreContributorStream = Stream.of(opensearchSecurityConfiguration, s3RepositoryConfiguration);
return keystoreContributorStream.flatMap(config -> config.getKeystoreItems().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.putAll(opensearchSecurityConfiguration.getKeystoreItems());

configurationParts.stream()
.map(OpensearchConfigurationPart::keystoreItems)
.forEach(builder::putAll);

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.configuration.variants;
package org.graylog.datanode.opensearch.configuration;

import java.util.Map;
import java.nio.file.Path;

public interface KeystoreContributor {
/**
* @return collection of key-value pairs that should be added to the opensearch keystore (holding secrets)
*/
Map<String, String> getKeystoreItems();
public record OpensearchUsableSpace(Path dataDir, long usableSpace) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.opensearch.configuration;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import org.graylog.datanode.configuration.DatanodeConfiguration;

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

@Singleton
public class OpensearchUsableSpaceProvider implements Provider<OpensearchUsableSpace> {

private final Path dataTargetDir;

@Inject
public OpensearchUsableSpaceProvider(DatanodeConfiguration datanodeConfiguration) {
dataTargetDir = datanodeConfiguration.datanodeDirectories().getDataTargetDir();
}

@Override
public OpensearchUsableSpace get() {
return new OpensearchUsableSpace(dataTargetDir, getUsableSpace(dataTargetDir));
}

private static long getUsableSpace(Path opensearchDataLocation) {
final FileStore fileStore;
try {
fileStore = Files.getFileStore(opensearchDataLocation);
return fileStore.getUsableSpace();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.opensearch.configuration.beans;

public interface OpensearchConfigurationBean {
OpensearchConfigurationPart buildConfigurationPart();
}
Loading

0 comments on commit fe5102b

Please sign in to comment.