Skip to content

Commit

Permalink
Introduce onResult2 in NameResolver Listener2 that returns Status (#1…
Browse files Browse the repository at this point in the history
…1313)

Introducing NameResolver listener method "Status Listener2::onResult2(ResolutionResult)" that returns Status of the acceptance of the name resolution by the load balancer, and the Name Resolver will call this method for both success and error cases.
  • Loading branch information
kannanjgithub authored Jul 26, 2024
1 parent 786523d commit 9ba2f9d
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 168 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ public final void onAddresses(
*/
@Override
public abstract void onError(Status error);

/**
* Handles updates on resolved addresses and attributes.
*
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
* @since 1.66
*/
public Status onResult2(ResolutionResult resolutionResult) {
throw new UnsupportedOperationException("Not implemented.");
}
}

/**
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/io/grpc/internal/DnsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ public void run() {
resolutionResultBuilder.setAttributes(result.attributes);
}
}
savedListener.onResult(resolutionResultBuilder.build());
syncContext.execute(() -> {
savedListener.onResult2(resolutionResultBuilder.build());
});
} catch (IOException e) {
savedListener.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
Expand Down
255 changes: 128 additions & 127 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1673,146 +1673,147 @@ final class NameResolverListener extends NameResolver.Listener2 {
public void onResult(final ResolutionResult resolutionResult) {
final class NamesResolved implements Runnable {

@SuppressWarnings("ReferenceEquality")
@Override
public void run() {
if (ManagedChannelImpl.this.nameResolver != resolver) {
return;
}

List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Resolved address: {0}, config={1}",
servers,
resolutionResult.getAttributes());

if (lastResolutionState != ResolutionState.SUCCESS) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
lastResolutionState = ResolutionState.SUCCESS;
}

ConfigOrError configOrError = resolutionResult.getServiceConfig();
Status status = onResult2(resolutionResult);
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
ManagedChannelServiceConfig validServiceConfig =
configOrError != null && configOrError.getConfig() != null
? (ManagedChannelServiceConfig) configOrError.getConfig()
: null;
Status serviceConfigError = configOrError != null ? configOrError.getError() : null;

ManagedChannelServiceConfig effectiveServiceConfig;
if (!lookUpServiceConfig) {
if (validServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config from name resolver discarded by channel settings");
}
effectiveServiceConfig =
defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
if (resolvedConfigSelector != null) {
resolutionResultListener.resolutionAttempted(status);
}
}

syncContext.execute(new NamesResolved());
}

@SuppressWarnings("ReferenceEquality")
@Override
public Status onResult2(final ResolutionResult resolutionResult) {
syncContext.throwIfNotInThisSynchronizationContext();
if (ManagedChannelImpl.this.nameResolver != resolver) {
return Status.OK;
}

List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Resolved address: {0}, config={1}",
servers,
resolutionResult.getAttributes());

if (lastResolutionState != ResolutionState.SUCCESS) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
lastResolutionState = ResolutionState.SUCCESS;
}

ConfigOrError configOrError = resolutionResult.getServiceConfig();
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
ManagedChannelServiceConfig validServiceConfig =
configOrError != null && configOrError.getConfig() != null
? (ManagedChannelServiceConfig) configOrError.getConfig()
: null;
Status serviceConfigError = configOrError != null ? configOrError.getError() : null;

ManagedChannelServiceConfig effectiveServiceConfig;
if (!lookUpServiceConfig) {
if (validServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config from name resolver discarded by channel settings");
}
effectiveServiceConfig =
defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
if (resolvedConfigSelector != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Config selector from name resolver discarded by channel settings");
}
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
} else {
// Try to use config if returned from name resolver
// Otherwise, try to use the default config if available
if (validServiceConfig != null) {
effectiveServiceConfig = validServiceConfig;
if (resolvedConfigSelector != null) {
realChannel.updateConfigSelector(resolvedConfigSelector);
if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Config selector from name resolver discarded by channel settings");
ChannelLogLevel.DEBUG,
"Method configs in service config will be discarded due to presence of"
+ "config-selector");
}
} else {
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
}
} else if (defaultServiceConfig != null) {
effectiveServiceConfig = defaultServiceConfig;
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
} else if (serviceConfigError != null) {
if (!serviceConfigUpdated) {
// First DNS lookup has invalid service config, and cannot fall back to default
channelLogger.log(
ChannelLogLevel.INFO,
"Fallback to error due to invalid first service config without default config");
// This error could be an "inappropriate" control plane error that should not bleed
// through to client code using gRPC. We let them flow through here to the LB as
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
return configOrError.getError();
} else {
// Try to use config if returned from name resolver
// Otherwise, try to use the default config if available
if (validServiceConfig != null) {
effectiveServiceConfig = validServiceConfig;
if (resolvedConfigSelector != null) {
realChannel.updateConfigSelector(resolvedConfigSelector);
if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
channelLogger.log(
ChannelLogLevel.DEBUG,
"Method configs in service config will be discarded due to presence of"
+ "config-selector");
}
} else {
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
}
} else if (defaultServiceConfig != null) {
effectiveServiceConfig = defaultServiceConfig;
realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
} else if (serviceConfigError != null) {
if (!serviceConfigUpdated) {
// First DNS lookup has invalid service config, and cannot fall back to default
channelLogger.log(
ChannelLogLevel.INFO,
"Fallback to error due to invalid first service config without default config");
// This error could be an "inappropriate" control plane error that should not bleed
// through to client code using gRPC. We let them flow through here to the LB as
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(configOrError.getError());
}
return;
} else {
effectiveServiceConfig = lastServiceConfig;
}
} else {
effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
realChannel.updateConfigSelector(null);
}
if (!effectiveServiceConfig.equals(lastServiceConfig)) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config changed{0}",
effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
lastServiceConfig = effectiveServiceConfig;
transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
}

try {
// TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
// and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
// lbNeedAddress is not deterministic
serviceConfigUpdated = true;
} catch (RuntimeException re) {
logger.log(
Level.WARNING,
"[" + getLogId() + "] Unexpected exception from parsing service config",
re);
}
effectiveServiceConfig = lastServiceConfig;
}
} else {
effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
realChannel.updateConfigSelector(null);
}
if (!effectiveServiceConfig.equals(lastServiceConfig)) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config changed{0}",
effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
lastServiceConfig = effectiveServiceConfig;
transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
}

Attributes effectiveAttrs = resolutionResult.getAttributes();
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
Attributes.Builder attrBuilder =
effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
Map<String, ?> healthCheckingConfig =
effectiveServiceConfig.getHealthCheckingConfig();
if (healthCheckingConfig != null) {
attrBuilder
.set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
.build();
}
Attributes attributes = attrBuilder.build();

Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());
// If a listener is provided, let it know if the addresses were accepted.
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
}
}
try {
// TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
// and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
// lbNeedAddress is not deterministic
serviceConfigUpdated = true;
} catch (RuntimeException re) {
logger.log(
Level.WARNING,
"[" + getLogId() + "] Unexpected exception from parsing service config",
re);
}
}

syncContext.execute(new NamesResolved());
Attributes effectiveAttrs = resolutionResult.getAttributes();
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
Attributes.Builder attrBuilder =
effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
Map<String, ?> healthCheckingConfig =
effectiveServiceConfig.getHealthCheckingConfig();
if (healthCheckingConfig != null) {
attrBuilder
.set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
.build();
}
Attributes attributes = attrBuilder.build();

return helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());
}
return Status.OK;
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/io/grpc/internal/RetryingNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,24 @@ public void onResult(ResolutionResult resolutionResult) {
"RetryingNameResolver can only be used once to wrap a NameResolver");
}

// To have retry behavior for name resolvers that haven't migrated to onResult2.
delegateListener.onResult(resolutionResult.toBuilder().setAttributes(
resolutionResult.getAttributes().toBuilder()
.set(RESOLUTION_RESULT_LISTENER_KEY, new ResolutionResultListener()).build())
.build());
}

@Override
public Status onResult2(ResolutionResult resolutionResult) {
Status status = delegateListener.onResult2(resolutionResult);
if (status.isOk()) {
retryScheduler.reset();
} else {
retryScheduler.schedule(new DelayedNameResolverRefresh());
}
return status;
}

@Override
public void onError(Status error) {
delegateListener.onError(error);
Expand Down
Loading

0 comments on commit 9ba2f9d

Please sign in to comment.