Skip to content

Commit

Permalink
Use schema hash in producer cache key
Browse files Browse the repository at this point in the history
  • Loading branch information
onobc authored and sobychacko committed Jul 28, 2022
1 parent d85fbba commit c57f5dd
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.LogFactory;
Expand All @@ -32,11 +30,14 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.protocol.schema.SchemaHash;

import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import com.github.benmanes.caffeine.cache.Cache;
Expand All @@ -63,7 +64,7 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor

private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));

private final Cache<SchemaTopic<T>, Producer<T>> producerCache;
private final Cache<ProducerCacheKey<T>, Producer<T>> producerCache;

/**
* Construct a caching producer factory with the specified values for the cache configuration.
Expand All @@ -82,7 +83,7 @@ public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Objec
.maximumSize(cacheMaximumSize)
.initialCapacity(cacheInitialCapacity)
.scheduler(Scheduler.systemScheduler())
.evictionListener((RemovalListener<SchemaTopic<T>, Producer<T>>) (schemaTopic, producer, cause) -> {
.evictionListener((RemovalListener<ProducerCacheKey<T>, Producer<T>>) (producerCacheKey, producer, cause) -> {
this.logger.debug(() -> String.format("Producer %s evicted from cache due to %s",
ProducerUtils.formatProducer(producer), cause));
closeProducer(producer);
Expand All @@ -92,10 +93,10 @@ public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Objec
@Override
public Producer<T> createProducer(String topic, Schema<T> schema, MessageRouter messageRouter) {
final String topicName = ProducerUtils.resolveTopicName(topic, this);
SchemaTopic<T> schemaTopic = new SchemaTopic<>(schema, topicName, messageRouter);
return this.producerCache.get(schemaTopic, (st) -> {
ProducerCacheKey<T> producerCacheKey = new ProducerCacheKey<>(schema, topicName, messageRouter);
return this.producerCache.get(producerCacheKey, (st) -> {
try {
return this.doCreateProducer(st.topicName, st.schema, messageRouter);
return this.doCreateProducer(st.topic, st.schema, st.router);
}
catch (PulsarClientException ex) {
throw new RuntimeException(ex);
Expand All @@ -116,7 +117,7 @@ private Producer<T> wrapProducerWithCloseCallback(Producer<T> producer, Consumer
factory.addAdvice(new MethodInterceptor() {
@Nullable
@Override
public Object invoke(@Nonnull MethodInvocation invocation) throws Throwable {
public Object invoke(@NonNull MethodInvocation invocation) throws Throwable {
if (invocation.getMethod().getName().equals("close")) {
closeCallback.accept((Producer<T>) invocation.getThis());
return null;
Expand All @@ -133,8 +134,8 @@ public Object invoke(@Nonnull MethodInvocation invocation) throws Throwable {

@Override
public void destroy() {
this.producerCache.asMap().forEach((schemaTopic, producer) -> {
this.producerCache.invalidate(schemaTopic);
this.producerCache.asMap().forEach((producerCacheKey, producer) -> {
this.producerCache.invalidate(producerCacheKey);
closeProducer(producer);
});
}
Expand All @@ -151,18 +152,50 @@ private void closeProducer(Producer<T> producer) {
}

/**
* Holder for a schema, topic and optional message router used as unique identifier of a producer in cache key.
*
* @param schema schema of the message
* @param topicName topic to send the message to
* @param messageRouter router to use to send the topic
* Uniquely identifies a producer that was handed out by the factory.
*
* @param <T> type of the schema
*/
record SchemaTopic<T>(Schema<T> schema, String topicName, MessageRouter messageRouter) {
public SchemaTopic {
static class ProducerCacheKey<T> {

private final Schema<T> schema;
private final SchemaHash schemaHash;
private final String topic;
private final MessageRouter router;

/**
* Constructs an instance.
*
* @param schema the schema the producer is configured to use
* @param topic the topic the producer is configured to send to
* @param router the custom message router the producer is configured to use
*/
ProducerCacheKey(Schema<T> schema, String topic, @Nullable MessageRouter router) {
Assert.notNull(schema, () -> "'schema' must be non-null");
Assert.notNull(topicName, () -> "'topicName' must be non-null");
Assert.notNull(topic, () -> "'topic' must be non-null");
this.schema = schema;
this.schemaHash = SchemaHash.of(this.schema);
this.topic = topic;
this.router = router;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProducerCacheKey<?> that = (ProducerCacheKey<?>) o;
return this.topic.equals(that.topic) &&
this.schemaHash.equals(that.schemaHash) &&
Objects.equals(this.router, that.router);
}

@Override
public int hashCode() {
return this.topic.hashCode() + this.schemaHash.hashCode() + Objects.hashCode(this.router);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand All @@ -27,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -37,11 +40,15 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.pulsar.core.CachingPulsarProducerFactory.SchemaTopic;
import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerCacheKey;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;

Expand Down Expand Up @@ -69,14 +76,14 @@ void cleanupFromTests() {
@Test
void createProducerMultipleCalls() throws PulsarClientException {
PulsarProducerFactory<String> producerFactory = producerFactory(pulsarClient, Collections.emptyMap());
SchemaTopic<String> cacheKey = new SchemaTopic<>(schema, "topic1", null);
ProducerCacheKey<String> cacheKey = new ProducerCacheKey<>(schema, "topic1", null);

Producer<String> producer1 = producerFactory.createProducer("topic1", schema);
Producer<String> producer2 = producerFactory.createProducer("topic1", schema);
Producer<String> producer3 = producerFactory.createProducer("topic1", schema);
Producer<String> producer2 = producerFactory.createProducer("topic1", new StringSchema());
Producer<String> producer3 = producerFactory.createProducer("topic1", new StringSchema());
assertThat(producer1).isSameAs(producer2).isSameAs(producer3);

Cache<SchemaTopic<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory, Collections.singletonList(cacheKey));
Cache<ProducerCacheKey<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory, Collections.singletonList(cacheKey));
Producer<String> cachedProducerProxy = producerCache.asMap().get(cacheKey);
assertThat(cachedProducerProxy).isSameAs(producer1);
}
Expand All @@ -101,45 +108,48 @@ void createProducerWithMatrixOfCacheKeys() throws PulsarClientException {
String topic2 = "topic2";
Schema<String> schema1 = new StringSchema();
Schema<String> schema2 = new StringSchema();
MessageRouter router1 = Mockito.mock(MessageRouter.class);
MessageRouter router2 = Mockito.mock(MessageRouter.class);
MessageRouter router1 = mock(MessageRouter.class);
MessageRouter router2 = mock(MessageRouter.class);

PulsarProducerFactory<String> producerFactory = producerFactory(pulsarClient, Collections.emptyMap());

producerFactory.createProducer(topic1, schema1);
producerFactory.createProducer(topic1, schema1, router1);
producerFactory.createProducer(topic1, schema1, router2);
producerFactory.createProducer(topic1, schema2);
producerFactory.createProducer(topic1, schema2, router1);
producerFactory.createProducer(topic1, schema2, router2);
producerFactory.createProducer(topic2, schema1);
producerFactory.createProducer(topic2, schema1, router1);
producerFactory.createProducer(topic2, schema1, router2);

List<SchemaTopic<String>> expectedCacheKeys = new ArrayList<>();
expectedCacheKeys.add(new SchemaTopic<>(schema1, topic1, null));
expectedCacheKeys.add(new SchemaTopic<>(schema1, topic1, router1));
expectedCacheKeys.add(new SchemaTopic<>(schema1, topic1, router2));
expectedCacheKeys.add(new SchemaTopic<>(schema1, topic2, null));
expectedCacheKeys.add(new SchemaTopic<>(schema1, topic2, router1));
expectedCacheKeys.add(new SchemaTopic<>(schema1, topic2, router2));
expectedCacheKeys.add(new SchemaTopic<>(schema2, topic1, null));
expectedCacheKeys.add(new SchemaTopic<>(schema2, topic1, router1));
expectedCacheKeys.add(new SchemaTopic<>(schema2, topic1, router2));
// ask for the same 9 unique combos 3x - should end up w/ only 9 entries in cache
for (int i = 0; i < 3; i++) {
producerFactory.createProducer(topic1, schema1);
producerFactory.createProducer(topic1, schema1, router1);
producerFactory.createProducer(topic1, schema1, router2);
producerFactory.createProducer(topic1, schema2);
producerFactory.createProducer(topic1, schema2, router1);
producerFactory.createProducer(topic1, schema2, router2);
producerFactory.createProducer(topic2, schema1);
producerFactory.createProducer(topic2, schema1, router1);
producerFactory.createProducer(topic2, schema1, router2);
}

List<ProducerCacheKey<String>> expectedCacheKeys = new ArrayList<>();
expectedCacheKeys.add(new ProducerCacheKey<>(schema1, topic1, null));
expectedCacheKeys.add(new ProducerCacheKey<>(schema1, topic1, router1));
expectedCacheKeys.add(new ProducerCacheKey<>(schema1, topic1, router2));
expectedCacheKeys.add(new ProducerCacheKey<>(schema1, topic2, null));
expectedCacheKeys.add(new ProducerCacheKey<>(schema1, topic2, router1));
expectedCacheKeys.add(new ProducerCacheKey<>(schema1, topic2, router2));
expectedCacheKeys.add(new ProducerCacheKey<>(schema2, topic1, null));
expectedCacheKeys.add(new ProducerCacheKey<>(schema2, topic1, router1));
expectedCacheKeys.add(new ProducerCacheKey<>(schema2, topic1, router2));

getAssertedProducerCache(producerFactory, expectedCacheKeys);
}

@Test
void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientException {
CachingPulsarProducerFactory<String> producerFactory = producerFactory(pulsarClient, Collections.emptyMap());
SchemaTopic<String> cacheKey1 = new SchemaTopic<>(schema, "topic1", null);
SchemaTopic<String> cacheKey2 = new SchemaTopic<>(schema, "topic2", null);
ProducerCacheKey<String> cacheKey1 = new ProducerCacheKey<>(schema, "topic1", null);
ProducerCacheKey<String> cacheKey2 = new ProducerCacheKey<>(schema, "topic2", null);

Producer<String> actualProducer1 = actualProducerFrom(producerFactory.createProducer("topic1", schema));
Producer<String> actualProducer2 = actualProducerFrom(producerFactory.createProducer("topic2", schema));

Cache<SchemaTopic<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory,
Cache<ProducerCacheKey<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory,
Arrays.asList(cacheKey1, cacheKey2));
producerFactory.destroy();
Awaitility.await()
Expand All @@ -155,11 +165,11 @@ void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientExceptio
void producerEvictedFromCache() throws PulsarClientException {
CachingPulsarProducerFactory<String> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap(), Duration.ofSeconds(3L), 10L, 2);
SchemaTopic<String> cacheKey = new SchemaTopic<>(schema, "topic1", null);
ProducerCacheKey<String> cacheKey = new ProducerCacheKey<>(schema, "topic1", null);

Producer<String> actualProducer = actualProducerFrom(producerFactory.createProducer("topic1", schema));

Cache<SchemaTopic<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory,
Cache<ProducerCacheKey<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory,
Collections.singletonList(cacheKey));
Awaitility.await()
.pollDelay(Duration.ofSeconds(5L))
Expand Down Expand Up @@ -194,9 +204,9 @@ private Producer<String> actualProducerFrom(Producer<String> proxyProducer) {
}

@SuppressWarnings("unchecked")
private Cache<SchemaTopic<String>, Producer<String>> getAssertedProducerCache(PulsarProducerFactory<String> producerFactory,
List<SchemaTopic<String>> expectedCacheKeys) {
Cache<SchemaTopic<String>, Producer<String>> producerCache = (Cache<SchemaTopic<String>, Producer<String>>)
private Cache<ProducerCacheKey<String>, Producer<String>> getAssertedProducerCache(PulsarProducerFactory<String> producerFactory,
List<ProducerCacheKey<String>> expectedCacheKeys) {
Cache<ProducerCacheKey<String>, Producer<String>> producerCache = (Cache<ProducerCacheKey<String>, Producer<String>>)
ReflectionTestUtils.getField(producerFactory, "producerCache");
assertThat(producerCache).isNotNull();
if (ObjectUtils.isEmpty(expectedCacheKeys)) {
Expand All @@ -215,4 +225,62 @@ protected CachingPulsarProducerFactory<String> producerFactory(PulsarClient puls
producerFactories.add(producerFactory);
return producerFactory;
}

@Nested
class ProducerCacheKeyTests {

@Test
void nullSchemaIsNotAllowed() {
assertThatThrownBy(() -> new ProducerCacheKey<>(null, "topic1", null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'schema' must be non-null");
}

@Test
void nullTopicIsNotAllowed() {
assertThatThrownBy(() -> new ProducerCacheKey<>(schema, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'topic' must be non-null");
}

@ParameterizedTest(name = "equals({0}) should be {2}")
@MethodSource("equalsAndHashCodeTestProvider")
void equalsAndHashCodeTest(Object key1, Object key2, boolean shouldBeEquals) {
assertThat(key1.equals(key2)).isEqualTo(shouldBeEquals);
if (shouldBeEquals) {
assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
}
}

static Stream<Arguments> equalsAndHashCodeTestProvider() {
MessageRouter router1 = mock(MessageRouter.class);
ProducerCacheKey<String> key1 = new ProducerCacheKey<>(Schema.STRING, "topic1", router1);
return Stream.of(
arguments(Named.of("differentClass", key1), "someStrangeObject", false),
arguments(Named.of("null", key1), null, false),
arguments(Named.of("sameInstance", key1), key1, true),
arguments(Named.of("sameSchemaSameTopicSameNullRouter",
new ProducerCacheKey<>(Schema.STRING, "topic1", null)),
new ProducerCacheKey<>(Schema.STRING, "topic1", null), true),
arguments(Named.of("sameSchemaSameTopicSameNonNullRouter",
new ProducerCacheKey<>(Schema.STRING, "topic1", router1)),
new ProducerCacheKey<>(Schema.STRING, "topic1", router1), true),
arguments(Named.of("differentSchemaInstanceSameSchemaType",
new ProducerCacheKey<>(new StringSchema(), "topic1", router1)),
new ProducerCacheKey<>(new StringSchema(), "topic1", router1), true),
arguments(Named.of("differentSchemaType",
new ProducerCacheKey<>(Schema.STRING, "topic1", router1)),
new ProducerCacheKey<>(Schema.INT64, "topic1", router1), false),
arguments(Named.of("differentTopic",
new ProducerCacheKey<>(Schema.STRING, "topic1", router1)),
new ProducerCacheKey<>(Schema.STRING, "topic2", router1), false),
arguments(Named.of("differentNonNullRouter",
new ProducerCacheKey<>(Schema.STRING, "topic1", router1)),
new ProducerCacheKey<>(Schema.STRING, "topic1", mock(MessageRouter.class)), false),
arguments(Named.of("differentNullRouter",
new ProducerCacheKey<>(Schema.STRING, "topic1", router1)),
new ProducerCacheKey<>(Schema.STRING, "topic1", null), false)
);
}
}
}

0 comments on commit c57f5dd

Please sign in to comment.