Skip to content

Commit

Permalink
separating remote download and publication stats
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
Himshikha Gupta committed Nov 19, 2024
1 parent d4d70d8 commit 72d5889
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}
} catch (Exception e) {
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();
remoteClusterStateService.fullIncomingPublicationFailed();
} else {
remoteClusterStateService.diffDownloadFailed();
remoteClusterStateService.diffIncomingPublicationFailed();
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,7 +1460,6 @@ ClusterState readClusterStateInParallel(
newRoutingTable = routingTableDiff.apply(previousState.getRoutingTable());
}
clusterStateBuilder.routingTable(newRoutingTable);

return clusterStateBuilder.build();
}

Expand All @@ -1470,173 +1469,191 @@ public ClusterState getClusterStateForManifest(
String localNodeId,
boolean includeEphemeral
) throws IOException {
ClusterState stateFromCache = remoteClusterStateCache.getState(clusterName, manifest);
if (stateFromCache != null) {
return stateFromCache;
}
try {
ClusterState stateFromCache = remoteClusterStateCache.getState(clusterName, manifest);
if (stateFromCache != null) {
return stateFromCache;
}

final ClusterState clusterState;
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
clusterState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
manifest.getCustomMetadataMap(),
manifest.getCoordinationMetadata() != null,
manifest.getSettingsMetadata() != null,
includeEphemeral && manifest.getTransientSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(),
false,
includeEphemeral
);
final ClusterState clusterState;
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
clusterState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
manifest.getCustomMetadataMap(),
manifest.getCoordinationMetadata() != null,
manifest.getSettingsMetadata() != null,
includeEphemeral && manifest.getTransientSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(),
false,
includeEphemeral
);

if (includeEphemeral
&& !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
&& manifest.getClusterStateChecksum() != null) {
validateClusterStateFromChecksum(manifest, clusterState, clusterName, localNodeId, true);
if (includeEphemeral
&& !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
&& manifest.getClusterStateChecksum() != null) {
validateClusterStateFromChecksum(manifest, clusterState, clusterName, localNodeId, true);
}
} else {
ClusterState state = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
// for manifest codec V1, we don't have the following objects to read, so not passing anything
emptyMap(),
false,
false,
false,
false,
false,
false,
emptyList(),
false,
emptyMap(),
false,
false
);
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
mb.indices(state.metadata().indices());
clusterState = ClusterState.builder(state).metadata(mb).build();
}
} else {
ClusterState state = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
// for manifest codec V1, we don't have the following objects to read, so not passing anything
emptyMap(),
false,
false,
false,
false,
false,
false,
emptyList(),
false,
emptyMap(),
false,
false
);
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
mb.indices(state.metadata().indices());
clusterState = ClusterState.builder(state).metadata(mb).build();
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateFullDownloadSucceeded();
remoteStateStats.stateFullDownloadTook(durationMillis);
if (includeEphemeral) {
// cache only if the entire cluster-state is present
remoteClusterStateCache.putState(clusterState);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateFullDownloadSucceeded();
remoteStateStats.stateFullDownloadTook(durationMillis);
if (includeEphemeral) {
// cache only if the entire cluster-state is present
remoteClusterStateCache.putState(clusterState);
}
return clusterState;
} catch (Exception e) {
logger.error("Failure in downloading full cluster state. ", e);
remoteStateStats.stateFullDownloadFailed();
throw e;
}
return clusterState;
}

public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId) {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
ClusterStateDiffManifest diff = manifest.getDiffManifest();
boolean includeEphemeral = true;

List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
.stream()
.filter(idx2 -> idx2.getIndexName().equals(idx))
.findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());

Map<String, UploadedMetadataAttribute> updatedCustomMetadata = new HashMap<>();
if (diff.getCustomMetadataUpdated() != null) {
for (String customType : diff.getCustomMetadataUpdated()) {
updatedCustomMetadata.put(customType, manifest.getCustomMetadataMap().get(customType));
try {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
ClusterStateDiffManifest diff = manifest.getDiffManifest();
boolean includeEphemeral = true;

List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
.stream()
.filter(idx2 -> idx2.getIndexName().equals(idx))
.findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());

Map<String, UploadedMetadataAttribute> updatedCustomMetadata = new HashMap<>();
if (diff.getCustomMetadataUpdated() != null) {
for (String customType : diff.getCustomMetadataUpdated()) {
updatedCustomMetadata.put(customType, manifest.getCustomMetadataMap().get(customType));
}
}
}
Map<String, UploadedMetadataAttribute> updatedClusterStateCustom = new HashMap<>();
if (diff.getClusterStateCustomUpdated() != null) {
for (String customType : diff.getClusterStateCustomUpdated()) {
updatedClusterStateCustom.put(customType, manifest.getClusterStateCustomMap().get(customType));
Map<String, UploadedMetadataAttribute> updatedClusterStateCustom = new HashMap<>();
if (diff.getClusterStateCustomUpdated() != null) {
for (String customType : diff.getClusterStateCustomUpdated()) {
updatedClusterStateCustom.put(customType, manifest.getClusterStateCustomMap().get(customType));
}
}

List<UploadedIndexMetadata> updatedIndexRouting = new ArrayList<>();
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
updatedIndexRouting.addAll(
remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(
diff.getIndicesRoutingUpdated(),
manifest.getIndicesRouting()
)
);
}
}

List<UploadedIndexMetadata> updatedIndexRouting = new ArrayList<>();
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
updatedIndexRouting.addAll(
remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(), manifest.getIndicesRouting())
ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
manifest.getClusterUUID(),
localNodeId,
updatedIndices,
updatedCustomMetadata,
diff.isCoordinationMetadataUpdated(),
diff.isSettingsMetadataUpdated(),
diff.isTransientSettingsMetadataUpdated(),
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated(),
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom,
manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral
);
}
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
// remove the deleted indices from the metadata
for (String index : diff.getIndicesDeleted()) {
metadataBuilder.remove(index);
}
// remove the deleted metadata customs from the metadata
if (diff.getCustomMetadataDeleted() != null) {
for (String customType : diff.getCustomMetadataDeleted()) {
metadataBuilder.removeCustom(customType);
}
}

ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
manifest.getClusterUUID(),
localNodeId,
updatedIndices,
updatedCustomMetadata,
diff.isCoordinationMetadataUpdated(),
diff.isSettingsMetadataUpdated(),
diff.isTransientSettingsMetadataUpdated(),
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated(),
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom,
manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
// remove the deleted indices from the metadata
for (String index : diff.getIndicesDeleted()) {
metadataBuilder.remove(index);
}
// remove the deleted metadata customs from the metadata
if (diff.getCustomMetadataDeleted() != null) {
for (String customType : diff.getCustomMetadataDeleted()) {
metadataBuilder.removeCustom(customType);
// remove the deleted cluster state customs from the metadata
if (diff.getClusterStateCustomDeleted() != null) {
for (String customType : diff.getClusterStateCustomDeleted()) {
clusterStateBuilder.removeCustom(customType);
}
}
}

// remove the deleted cluster state customs from the metadata
if (diff.getClusterStateCustomDeleted() != null) {
for (String customType : diff.getClusterStateCustomDeleted()) {
clusterStateBuilder.removeCustom(customType);
HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(
updatedClusterState.getRoutingTable().getIndicesRouting()
);
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
for (String indexName : diff.getIndicesRoutingDeleted()) {
indexRoutingTables.remove(indexName);
}
}
}

HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting());
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
for (String indexName : diff.getIndicesRoutingDeleted()) {
indexRoutingTables.remove(indexName);
ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();
if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
&& manifest.getClusterStateChecksum() != null) {
validateClusterStateFromChecksum(manifest, clusterState, previousState.getClusterName().value(), localNodeId, false);
}
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDiffDownloadSucceeded();
remoteStateStats.stateDiffDownloadTook(durationMillis);

ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();
if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) {
validateClusterStateFromChecksum(manifest, clusterState, previousState.getClusterName().value(), localNodeId, false);
assert includeEphemeral == true;
// newState includes all the fields of cluster-state (includeEphemeral=true always)
remoteClusterStateCache.putState(clusterState);
return clusterState;
} catch (Exception e) {
logger.error("Failure in downloading diff cluster state. ", e);
remoteStateStats.stateDiffDownloadFailed();
throw e;
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDiffDownloadSucceeded();
remoteStateStats.stateDiffDownloadTook(durationMillis);

assert includeEphemeral == true;
// newState includes all the fields of cluster-state (includeEphemeral=true always)
remoteClusterStateCache.putState(clusterState);
return clusterState;
}

void validateClusterStateFromChecksum(
Expand Down Expand Up @@ -2036,6 +2053,14 @@ public void diffDownloadFailed() {
remoteStateStats.stateDiffDownloadFailed();
}

/**
 * Reports a failure of an incoming publication that carried a full cluster state.
 *
 * <p>Pure delegate: forwards to {@code remoteStateStats.stateFullIncomingPublicationFailed()}
 * and does nothing else.
 */
public void fullIncomingPublicationFailed() {
    this.remoteStateStats.stateFullIncomingPublicationFailed();
}

/**
 * Reports a failure of an incoming publication that carried a cluster state diff.
 *
 * <p>Pure delegate: forwards to {@code remoteStateStats.stateDiffIncomingPublicationFailed()}
 * and does nothing else.
 */
public void diffIncomingPublicationFailed() {
    this.remoteStateStats.stateDiffIncomingPublicationFailed();
}

/**
 * Exposes the cache instance held by this service.
 *
 * <p>Package-private accessor; returns the {@code remoteClusterStateCache} field as-is.
 *
 * @return the {@link RemoteClusterStateCache} referenced by this service
 */
RemoteClusterStateCache getRemoteClusterStateCache() {
    return this.remoteClusterStateCache;
}
Expand Down
Loading

0 comments on commit 72d5889

Please sign in to comment.