Skip to content

Commit

Permalink
Implement new DualReadMonitor for UriProperties (#999)
Browse files Browse the repository at this point in the history
* Implement new DualReadMonitor for UriProperties

This new monitor should be about as expensive as the previous one, since the
previous one called the `.equals` method on `UriProperties`, which compares every
single URI. It now produces a similarity metric that represents the fraction of
matching hosts present in the ZK response and the observer response. The Cluster
and Service monitors remain unchanged.

* Update logging strategy

* add jmx method and tests, and put behind a config

* update changelog

* make it thread safe

* lock per cluster

* use cluster match record itself as lock and add test

* execute multi-thread test multiple times

* check if tasks completed before executor timeout

* modify test: ensure properties for the same lb are reported in order

* adding more multi-thread test cases

* clarify comments in test

* add a debug log and clean up test

* use one queue for each lb in test

* address comments

* adjust log msg

* Just lock the whole damn thing

---------

Co-authored-by: Bohan Yang <boyang@linkedin.com>
  • Loading branch information
PapaCharlie and bohhyang authored May 10, 2024
1 parent a646a52 commit 5128dd5
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 205 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.54.0] - 2024-05-08
- Dual read monitors cluster URI similarity

## [29.53.1] - 2024-04-24
- Remove emitting SD event for receiving URI data update

Expand Down Expand Up @@ -5686,7 +5689,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.53.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.54.0...master
[29.54.0]: https://github.com/linkedin/rest.li/compare/v29.53.1...v29.54.0
[29.53.1]: https://github.com/linkedin/rest.li/compare/v29.53.0...v29.53.1
[29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0
[29.52.1]: https://github.com/linkedin/rest.li/compare/v29.52.0...v29.52.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@

package com.linkedin.d2.balancer.dualread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;


public class DualReadLoadBalancerJmx implements DualReadLoadBalancerJmxMBean
{
private final AtomicInteger _servicePropertiesErrorCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesErrorCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesErrorCount = new AtomicInteger();

private final AtomicInteger _servicePropertiesEvictCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesEvictCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesEvictCount = new AtomicInteger();

private final AtomicInteger _servicePropertiesOutOfSyncCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesOutOfSyncCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesOutOfSyncCount = new AtomicInteger();

private final AtomicReference<Double> _uriPropertiesSimilarity = new AtomicReference<>(0d);

private final Map<String, UriPropertiesDualReadMonitor.ClusterMatchRecord> _clusters = new HashMap<>();


@Override
Expand All @@ -46,10 +51,11 @@ public int getClusterPropertiesErrorCount()
return _clusterPropertiesErrorCount.get();
}

@Deprecated
@Override
public int getUriPropertiesErrorCount()
{
return _uriPropertiesErrorCount.get();
return 0;
}

@Override
Expand All @@ -64,25 +70,41 @@ public int getClusterPropertiesEvictCount()
return _clusterPropertiesEvictCount.get();
}

@Deprecated
@Override
public int getUriPropertiesEvictCount()
{
return _uriPropertiesEvictCount.get();
return 0;
}

@Override
public int getServicePropertiesOutOfSyncCount() {
public int getServicePropertiesOutOfSyncCount()
{
return _servicePropertiesOutOfSyncCount.get();
}

@Override
public int getClusterPropertiesOutOfSyncCount() {
public int getClusterPropertiesOutOfSyncCount()
{
return _clusterPropertiesOutOfSyncCount.get();
}

// Deprecated getter: unconditionally reports 0 and reads no counter.
// NOTE(review): presumably superseded by getUriPropertiesSimilarity() — confirm before removing.
@Deprecated
@Override
public int getUriPropertiesOutOfSyncCount()
{
return 0;
}

// Returns the similarity value currently held in _uriPropertiesSimilarity
// (the field is initialised to 0).
@Override
public double getUriPropertiesSimilarity()
{
return _uriPropertiesSimilarity.get();
}

@Override
public int getUriPropertiesOutOfSyncCount() {
return _uriPropertiesOutOfSyncCount.get();
public @Nullable UriPropertiesDualReadMonitor.ClusterMatchRecord getClusterMatchRecord(String clusterName) {
return _clusters.get(clusterName);
}

public void incrementServicePropertiesErrorCount()
Expand All @@ -95,11 +117,6 @@ public void incrementClusterPropertiesErrorCount()
_clusterPropertiesErrorCount.incrementAndGet();
}

public void incrementUriPropertiesErrorCount()
{
_uriPropertiesErrorCount.incrementAndGet();
}

public void incrementServicePropertiesEvictCount()
{
_servicePropertiesEvictCount.incrementAndGet();
Expand All @@ -110,11 +127,6 @@ public void incrementClusterPropertiesEvictCount()
_clusterPropertiesEvictCount.incrementAndGet();
}

public void incrementUriPropertiesEvictCount()
{
_uriPropertiesEvictCount.incrementAndGet();
}

public void incrementServicePropertiesOutOfSyncCount()
{
_servicePropertiesOutOfSyncCount.incrementAndGet();
Expand All @@ -125,11 +137,6 @@ public void incrementClusterPropertiesOutOfSyncCount()
_clusterPropertiesOutOfSyncCount.incrementAndGet();
}

public void incrementUriPropertiesOutOfSyncCount()
{
_uriPropertiesOutOfSyncCount.incrementAndGet();
}

public void decrementServicePropertiesOutOfSyncCount()
{
_servicePropertiesOutOfSyncCount.decrementAndGet();
Expand All @@ -140,8 +147,14 @@ public void decrementClusterPropertiesOutOfSyncCount()
_clusterPropertiesOutOfSyncCount.decrementAndGet();
}

public void decrementUriPropertiesOutOfSyncCount()
// Replaces the stored URI-properties similarity with the given value; this is the value
// that getUriPropertiesSimilarity() returns afterwards.
public void setUriPropertiesSimilarity(double similarity)
{
_uriPropertiesSimilarity.set(similarity);
}

public void setClusterMatchRecord(String clusterName,
UriPropertiesDualReadMonitor.ClusterMatchRecord clusterMatchRecord)
{
_uriPropertiesOutOfSyncCount.decrementAndGet();
_clusters.put(clusterName, clusterMatchRecord);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ public interface DualReadLoadBalancerJmxMBean

int getClusterPropertiesErrorCount();

@Deprecated
int getUriPropertiesErrorCount();

// Evict count is incremented when the cache grows to the max size and entries get evicted.
int getServicePropertiesEvictCount();

int getClusterPropertiesEvictCount();

@Deprecated
int getUriPropertiesEvictCount();

// Entries become out of sync when:
Expand All @@ -41,5 +43,13 @@ public interface DualReadLoadBalancerJmxMBean

int getClusterPropertiesOutOfSyncCount();

@Deprecated
int getUriPropertiesOutOfSyncCount();

// Similarity is calculated as the ratio of the number of URIs that are common between the two LBs to the total
// number of URIs in both LBs.
double getUriPropertiesSimilarity();

// Returns the ClusterMatchRecord for the given clusterName.
UriPropertiesDualReadMonitor.ClusterMatchRecord getClusterMatchRecord(String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.google.common.cache.RemovalCause;
import com.linkedin.d2.balancer.properties.ClusterProperties;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.util.RateLimitedLogger;
import com.linkedin.util.clock.Clock;
import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -42,26 +40,27 @@
*
* When new service discovery data is reported, it will check if the cache of the other data source
* has data for the same property name. If it does, it will check whether the two pieces of data are equal.
*
* Note that there are only two implementations of this class, one for {@link ServiceProperties} and one for
* {@link ClusterProperties}, and not one for {@link com.linkedin.d2.balancer.properties.UriProperties}. This is because
* the URI properties need to be compared holistically at the cluster level.
*/
public abstract class DualReadLoadBalancerMonitor<T>
{
private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancerMonitor.class);
public final static String DEFAULT_DATE_FORMAT = "YYYY/MM/dd HH:mm:ss.SSS";
public final static String VERSION_FROM_FS = "-1";
private static final long ERROR_REPORT_PERIOD = 600 * 1000; // Limit error report logging to every 10 minutes
private static final int MAX_CACHE_SIZE = 10000;
private final Cache<String, CacheEntry<T>> _oldLbPropertyCache;
private final Cache<String, CacheEntry<T>> _newLbPropertyCache;
private final RateLimitedLogger _rateLimitedLogger;
private final Clock _clock;
private final DateTimeFormatter _format;


public DualReadLoadBalancerMonitor(Clock clock)
private DualReadLoadBalancerMonitor(Clock clock)
{
_oldLbPropertyCache = buildCache();
_newLbPropertyCache = buildCache();
_rateLimitedLogger = new RateLimitedLogger(LOG, ERROR_REPORT_PERIOD, clock);
_clock = clock;
_format = DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT);
}
Expand All @@ -72,7 +71,6 @@ public void reportData(String propertyName, T property, String propertyVersion,
Cache<String, CacheEntry<T>> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache;
CacheEntry<T> existingEntry = cacheToAdd.getIfPresent(propertyName);
String propertyClassName = property.getClass().getSimpleName();
boolean isUriProp = property instanceof UriProperties;

if (existingEntry != null && existingEntry._data.equals(property))
{
Expand All @@ -92,19 +90,10 @@ public void reportData(String propertyName, T property, String propertyVersion,
// different.
if (!isReadFromFS(existingEntry._version, propertyVersion))
{
String msg = String.format("Received same data of different versions in %s LB for %s: %s."
+ " Old version: %s, New version: %s, Data: %s",
fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version,
propertyVersion, property);

if (isUriProp)
{
LOG.debug(msg);
}
else
{
warnByPropType(isUriProp, msg);
}
LOG.warn("Received same data of different versions in {} LB for {} {}"
+ " Old version: {} New version: {} Data: {}",
fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version,
propertyVersion, property);
}
// still need to put in the cache, don't skip
}
Expand Down Expand Up @@ -136,23 +125,14 @@ else if (!isDataEqual && isVersionEqual)
{ // data is not the same but version is the same, a mismatch!
incrementPropertiesErrorCount();
incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later
warnByPropType(isUriProp,
String.format("Received mismatched %s for %s. %s", propertyClassName, propertyName, entriesLogMsg));
LOG.warn("Received mismatched {} for {}. {}", propertyClassName, propertyName, entriesLogMsg);
cacheToCompare.invalidate(propertyName);
}
else {
if (isDataEqual)
{
String msg = String.format("Received same data of %s for %s but with different versions: %s",
LOG.warn("Received same data of {} for {} but with different versions: {}",
propertyClassName, propertyName, entriesLogMsg);
if (isUriProp)
{
LOG.debug(msg);
}
else
{
warnByPropType(isUriProp, msg);
}
}
cacheToAdd.put(propertyName, newEntry);
incrementEntryOutOfSyncCount();
Expand Down Expand Up @@ -209,18 +189,6 @@ private boolean isReadFromFS(String v1, String v2)
return v1.startsWith(VERSION_FROM_FS) || v2.startsWith(VERSION_FROM_FS);
}

private void warnByPropType(boolean isUriProp, String msg)
{
if (isUriProp)
{
_rateLimitedLogger.warn(msg);
}
else
{
LOG.warn(msg);
}
}

@VisibleForTesting
String getEntriesMessage(boolean fromNewLb, CacheEntry<T> oldE, CacheEntry<T> newE)
{
Expand Down Expand Up @@ -319,37 +287,4 @@ void onEvict()
_dualReadLoadBalancerJmx.incrementServicePropertiesEvictCount();
}
}

public static final class UriPropertiesDualReadMonitor extends DualReadLoadBalancerMonitor<UriProperties>
{
private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

public UriPropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBalancerJmx, Clock clock)
{
super(clock);
_dualReadLoadBalancerJmx = dualReadLoadBalancerJmx;
}

@Override
void incrementEntryOutOfSyncCount() {
_dualReadLoadBalancerJmx.incrementUriPropertiesOutOfSyncCount();
}

@Override
void decrementEntryOutOfSyncCount() {
_dualReadLoadBalancerJmx.decrementUriPropertiesOutOfSyncCount();
}

@Override
void incrementPropertiesErrorCount()
{
_dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount();
}

@Override
void onEvict()
{
_dualReadLoadBalancerJmx.incrementUriPropertiesEvictCount();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,24 @@ public class DualReadStateManager

private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
private final DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor _servicePropertiesDualReadMonitor;
private final DualReadLoadBalancerMonitor.ClusterPropertiesDualReadMonitor _clusterPropertiesDualReadMonitor;
private final UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
private final boolean _monitorUriProperties;


@Deprecated
public DualReadStateManager(DualReadModeProvider dualReadModeProvider, ScheduledExecutorService executorService)
{
this(dualReadModeProvider, executorService, false);
}

public DualReadStateManager(DualReadModeProvider dualReadModeProvider, ScheduledExecutorService executorService,
boolean monitorUriProperties)
{
_dualReadLoadBalancerJmx = new DualReadLoadBalancerJmx();
Clock clock = SystemClock.instance();
_uriPropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor(
_dualReadLoadBalancerJmx, clock);
_monitorUriProperties = monitorUriProperties;
_uriPropertiesDualReadMonitor = new UriPropertiesDualReadMonitor(_dualReadLoadBalancerJmx);
_servicePropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor(
_dualReadLoadBalancerJmx, clock);
_clusterPropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.ClusterPropertiesDualReadMonitor(
Expand Down Expand Up @@ -172,10 +179,10 @@ private void reportClusterPropertiesData(String propertyName, ClusterProperties

private void reportUriPropertiesData(String propertyName, UriProperties property, boolean fromNewLb)
{
if (_clusterDualReadModes.getOrDefault(propertyName, _dualReadMode) == DualReadModeProvider.DualReadMode.DUAL_READ)
if (_monitorUriProperties &&
_clusterDualReadModes.getOrDefault(propertyName, _dualReadMode) == DualReadModeProvider.DualReadMode.DUAL_READ)
{
String version = property.getVersion() + "|" + property.Uris().size();
_uriPropertiesDualReadMonitor.reportData(propertyName, property, version, fromNewLb);
_uriPropertiesDualReadMonitor.reportData(propertyName, property, fromNewLb);
}
}

Expand Down
Loading

0 comments on commit 5128dd5

Please sign in to comment.