Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new DualReadMonitor for UriProperties #999

Merged
merged 17 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.54.0] - 2024-05-08
- Dual read monitors cluster URIs similarity

## [29.53.1] - 2024-04-24
- Remove emitting SD event for receiving URI data update

Expand Down Expand Up @@ -5686,7 +5689,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.53.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.54.0...master
[29.54.0]: https://github.com/linkedin/rest.li/compare/v29.53.1...v29.54.0
[29.53.1]: https://github.com/linkedin/rest.li/compare/v29.53.0...v29.53.1
[29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0
[29.52.1]: https://github.com/linkedin/rest.li/compare/v29.52.0...v29.52.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@

package com.linkedin.d2.balancer.dualread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;


public class DualReadLoadBalancerJmx implements DualReadLoadBalancerJmxMBean
{
private final AtomicInteger _servicePropertiesErrorCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesErrorCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesErrorCount = new AtomicInteger();

private final AtomicInteger _servicePropertiesEvictCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesEvictCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesEvictCount = new AtomicInteger();

private final AtomicInteger _servicePropertiesOutOfSyncCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesOutOfSyncCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesOutOfSyncCount = new AtomicInteger();

private final AtomicReference<Double> _uriPropertiesSimilarity = new AtomicReference<>(0d);

private final Map<String, UriPropertiesDualReadMonitor.ClusterMatchRecord> _clusters = new HashMap<>();
bohhyang marked this conversation as resolved.
Show resolved Hide resolved


@Override
Expand All @@ -46,10 +51,11 @@ public int getClusterPropertiesErrorCount()
return _clusterPropertiesErrorCount.get();
}

@Deprecated
@Override
public int getUriPropertiesErrorCount()
{
return _uriPropertiesErrorCount.get();
return 0;
}

@Override
Expand All @@ -64,25 +70,41 @@ public int getClusterPropertiesEvictCount()
return _clusterPropertiesEvictCount.get();
}

@Deprecated
@Override
public int getUriPropertiesEvictCount()
{
return _uriPropertiesEvictCount.get();
return 0;
}

@Override
public int getServicePropertiesOutOfSyncCount() {
public int getServicePropertiesOutOfSyncCount()
{
return _servicePropertiesOutOfSyncCount.get();
}

@Override
public int getClusterPropertiesOutOfSyncCount() {
public int getClusterPropertiesOutOfSyncCount()
{
return _clusterPropertiesOutOfSyncCount.get();
}

@Deprecated
@Override
public int getUriPropertiesOutOfSyncCount()
{
return 0;
}

@Override
public double getUriPropertiesSimilarity()
{
return _uriPropertiesSimilarity.get();
}

@Override
public int getUriPropertiesOutOfSyncCount() {
return _uriPropertiesOutOfSyncCount.get();
public @Nullable UriPropertiesDualReadMonitor.ClusterMatchRecord getClusterMatchRecord(String clusterName) {
return _clusters.get(clusterName);
}

public void incrementServicePropertiesErrorCount()
Expand All @@ -95,11 +117,6 @@ public void incrementClusterPropertiesErrorCount()
_clusterPropertiesErrorCount.incrementAndGet();
}

public void incrementUriPropertiesErrorCount()
{
_uriPropertiesErrorCount.incrementAndGet();
}

public void incrementServicePropertiesEvictCount()
{
_servicePropertiesEvictCount.incrementAndGet();
Expand All @@ -110,11 +127,6 @@ public void incrementClusterPropertiesEvictCount()
_clusterPropertiesEvictCount.incrementAndGet();
}

public void incrementUriPropertiesEvictCount()
{
_uriPropertiesEvictCount.incrementAndGet();
}

public void incrementServicePropertiesOutOfSyncCount()
{
_servicePropertiesOutOfSyncCount.incrementAndGet();
Expand All @@ -125,11 +137,6 @@ public void incrementClusterPropertiesOutOfSyncCount()
_clusterPropertiesOutOfSyncCount.incrementAndGet();
}

public void incrementUriPropertiesOutOfSyncCount()
{
_uriPropertiesOutOfSyncCount.incrementAndGet();
}

public void decrementServicePropertiesOutOfSyncCount()
{
_servicePropertiesOutOfSyncCount.decrementAndGet();
Expand All @@ -140,8 +147,14 @@ public void decrementClusterPropertiesOutOfSyncCount()
_clusterPropertiesOutOfSyncCount.decrementAndGet();
}

public void decrementUriPropertiesOutOfSyncCount()
public void setUriPropertiesSimilarity(double similarity)
{
_uriPropertiesSimilarity.set(similarity);
}

public void setClusterMatchRecord(String clusterName,
UriPropertiesDualReadMonitor.ClusterMatchRecord clusterMatchRecord)
{
_uriPropertiesOutOfSyncCount.decrementAndGet();
_clusters.put(clusterName, clusterMatchRecord);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ public interface DualReadLoadBalancerJmxMBean

int getClusterPropertiesErrorCount();

@Deprecated
int getUriPropertiesErrorCount();

// Evict count is incremented when the cache grows to its max size and entries get evicted.
int getServicePropertiesEvictCount();

int getClusterPropertiesEvictCount();

@Deprecated
int getUriPropertiesEvictCount();

// Entries become out of sync when:
Expand All @@ -41,5 +43,13 @@ public interface DualReadLoadBalancerJmxMBean

int getClusterPropertiesOutOfSyncCount();

@Deprecated
int getUriPropertiesOutOfSyncCount();

// Similarity is calculated as the ratio of the number of URIs that are common between the two LBs to the total
// number of URIs in both LBs.
double getUriPropertiesSimilarity();

// Returns the ClusterMatchRecord for the given clusterName, or null if no record has been set for that cluster.
UriPropertiesDualReadMonitor.ClusterMatchRecord getClusterMatchRecord(String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.google.common.cache.RemovalCause;
import com.linkedin.d2.balancer.properties.ClusterProperties;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.util.RateLimitedLogger;
import com.linkedin.util.clock.Clock;
import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -42,26 +40,27 @@
*
* When new service discovery data is reported, it checks whether the cache of the other data source
* has data for the same property name. If it does, it compares the two entries for equality.
*
* Note that there are only two implementations of this class, one for {@link ServiceProperties} and one for
* {@link ClusterProperties}, and not one for {@link com.linkedin.d2.balancer.properties.UriProperties}. This is because
* the URI properties need to be compared holistically at the cluster level.
*/
public abstract class DualReadLoadBalancerMonitor<T>
{
private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancerMonitor.class);
public final static String DEFAULT_DATE_FORMAT = "YYYY/MM/dd HH:mm:ss.SSS";
public final static String VERSION_FROM_FS = "-1";
private static final long ERROR_REPORT_PERIOD = 600 * 1000; // Limit error report logging to every 10 minutes
private static final int MAX_CACHE_SIZE = 10000;
private final Cache<String, CacheEntry<T>> _oldLbPropertyCache;
private final Cache<String, CacheEntry<T>> _newLbPropertyCache;
private final RateLimitedLogger _rateLimitedLogger;
private final Clock _clock;
private final DateTimeFormatter _format;


public DualReadLoadBalancerMonitor(Clock clock)
private DualReadLoadBalancerMonitor(Clock clock)
{
_oldLbPropertyCache = buildCache();
_newLbPropertyCache = buildCache();
_rateLimitedLogger = new RateLimitedLogger(LOG, ERROR_REPORT_PERIOD, clock);
_clock = clock;
_format = DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT);
}
Expand All @@ -72,7 +71,6 @@ public void reportData(String propertyName, T property, String propertyVersion,
Cache<String, CacheEntry<T>> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache;
CacheEntry<T> existingEntry = cacheToAdd.getIfPresent(propertyName);
String propertyClassName = property.getClass().getSimpleName();
boolean isUriProp = property instanceof UriProperties;
PapaCharlie marked this conversation as resolved.
Show resolved Hide resolved

if (existingEntry != null && existingEntry._data.equals(property))
{
Expand All @@ -92,19 +90,10 @@ public void reportData(String propertyName, T property, String propertyVersion,
// different.
if (!isReadFromFS(existingEntry._version, propertyVersion))
{
String msg = String.format("Received same data of different versions in %s LB for %s: %s."
+ " Old version: %s, New version: %s, Data: %s",
fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version,
propertyVersion, property);

if (isUriProp)
{
LOG.debug(msg);
}
else
{
warnByPropType(isUriProp, msg);
}
LOG.warn("Received same data of different versions in {} LB for {} {}"
+ " Old version: {} New version: {} Data: {}",
fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version,
propertyVersion, property);
}
// still need to put in the cache, don't skip
}
Expand Down Expand Up @@ -136,23 +125,14 @@ else if (!isDataEqual && isVersionEqual)
{ // data is not the same but version is the same, a mismatch!
incrementPropertiesErrorCount();
incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later
warnByPropType(isUriProp,
String.format("Received mismatched %s for %s. %s", propertyClassName, propertyName, entriesLogMsg));
LOG.warn("Received mismatched {} for {}. {}", propertyClassName, propertyName, entriesLogMsg);
cacheToCompare.invalidate(propertyName);
}
else {
if (isDataEqual)
{
String msg = String.format("Received same data of %s for %s but with different versions: %s",
LOG.warn("Received same data of {} for {} but with different versions: {}",
propertyClassName, propertyName, entriesLogMsg);
if (isUriProp)
{
LOG.debug(msg);
}
else
{
warnByPropType(isUriProp, msg);
}
}
cacheToAdd.put(propertyName, newEntry);
incrementEntryOutOfSyncCount();
Expand Down Expand Up @@ -209,18 +189,6 @@ private boolean isReadFromFS(String v1, String v2)
return v1.startsWith(VERSION_FROM_FS) || v2.startsWith(VERSION_FROM_FS);
}

private void warnByPropType(boolean isUriProp, String msg)
{
if (isUriProp)
{
_rateLimitedLogger.warn(msg);
}
else
{
LOG.warn(msg);
}
}

@VisibleForTesting
String getEntriesMessage(boolean fromNewLb, CacheEntry<T> oldE, CacheEntry<T> newE)
{
Expand Down Expand Up @@ -319,37 +287,4 @@ void onEvict()
_dualReadLoadBalancerJmx.incrementServicePropertiesEvictCount();
}
}

public static final class UriPropertiesDualReadMonitor extends DualReadLoadBalancerMonitor<UriProperties>
{
private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

public UriPropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBalancerJmx, Clock clock)
{
super(clock);
_dualReadLoadBalancerJmx = dualReadLoadBalancerJmx;
}

@Override
void incrementEntryOutOfSyncCount() {
_dualReadLoadBalancerJmx.incrementUriPropertiesOutOfSyncCount();
}

@Override
void decrementEntryOutOfSyncCount() {
_dualReadLoadBalancerJmx.decrementUriPropertiesOutOfSyncCount();
}

@Override
void incrementPropertiesErrorCount()
{
_dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount();
}

@Override
void onEvict()
{
_dualReadLoadBalancerJmx.incrementUriPropertiesEvictCount();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,24 @@ public class DualReadStateManager

private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
private final DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor _servicePropertiesDualReadMonitor;
private final DualReadLoadBalancerMonitor.ClusterPropertiesDualReadMonitor _clusterPropertiesDualReadMonitor;
private final UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
private final boolean _monitorUriProperties;


@Deprecated
public DualReadStateManager(DualReadModeProvider dualReadModeProvider, ScheduledExecutorService executorService)
{
this(dualReadModeProvider, executorService, false);
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
}

public DualReadStateManager(DualReadModeProvider dualReadModeProvider, ScheduledExecutorService executorService,
boolean monitorUriProperties)
{
_dualReadLoadBalancerJmx = new DualReadLoadBalancerJmx();
Clock clock = SystemClock.instance();
_uriPropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor(
_dualReadLoadBalancerJmx, clock);
_monitorUriProperties = monitorUriProperties;
_uriPropertiesDualReadMonitor = new UriPropertiesDualReadMonitor(_dualReadLoadBalancerJmx);
_servicePropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor(
_dualReadLoadBalancerJmx, clock);
_clusterPropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.ClusterPropertiesDualReadMonitor(
Expand Down Expand Up @@ -172,10 +179,10 @@ private void reportClusterPropertiesData(String propertyName, ClusterProperties

private void reportUriPropertiesData(String propertyName, UriProperties property, boolean fromNewLb)
{
if (_clusterDualReadModes.getOrDefault(propertyName, _dualReadMode) == DualReadModeProvider.DualReadMode.DUAL_READ)
if (_monitorUriProperties &&
_clusterDualReadModes.getOrDefault(propertyName, _dualReadMode) == DualReadModeProvider.DualReadMode.DUAL_READ)
{
String version = property.getVersion() + "|" + property.Uris().size();
_uriPropertiesDualReadMonitor.reportData(propertyName, property, version, fromNewLb);
_uriPropertiesDualReadMonitor.reportData(propertyName, property, fromNewLb);
}
}

Expand Down
Loading
Loading