Skip to content

Commit

Permalink
Build cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sobychacko committed Jun 23, 2022
1 parent 9e673e7 commit 118fdd7
Show file tree
Hide file tree
Showing 19 changed files with 302 additions and 30 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ subprojects { subproject ->
eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature'

jacoco {
toolVersion = '0.8.6'
toolVersion = '0.8.7'
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package experiments.failover.consumer;

import java.io.Serial;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;
Expand Down Expand Up @@ -53,6 +55,8 @@ public void listen(String foo) {
}

static class FooRouter implements MessageRouter {
@Serial
private static final long serialVersionUID = -1L;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
Expand All @@ -61,6 +65,8 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
}

static class BarRouter implements MessageRouter {
@Serial
private static final long serialVersionUID = -1L;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
Expand All @@ -69,6 +75,8 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
}

static class BuzzRouter implements MessageRouter {
@Serial
private static final long serialVersionUID = -1L;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
* Processing of {@code @PulsarListener} annotations is performed by registering a
* {@link PulsarListenerAnnotationBeanPostProcessor}. This can be done manually or, more
* conveniently, through {@link EnablePulsar} annotation.
*
* <p>
* </p>
*
* @author Soby Chacko
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ else if (resolved != null) {
return result;
}

@SuppressWarnings("unchecked")
private void resolvePulsarProperties(MethodPulsarListenerEndpoint<?> endpoint, String[] propertyStrings) {
if (propertyStrings.length > 0) {
Properties properties = new Properties();
Expand All @@ -393,8 +394,7 @@ else if (value instanceof String[]) {
loadProperty(properties, prop, prop);
}
}
else if (value instanceof Collection) {
Collection<?> values = (Collection<?>) value;
else if (value instanceof Collection<?> values) {
if (values.size() > 0 && values.iterator().next() instanceof String) {
for (String prop : (Collection<String>) value) {
loadProperty(properties, prop, prop);
Expand Down Expand Up @@ -486,6 +486,7 @@ private String resolve(String value) {
return value;
}

@SuppressWarnings("unchecked")
private void resolveAsString(Object resolvedValue, List<String> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class AbstractPulsarListenerContainerFactory<C extends AbstractP
implements PulsarListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
ApplicationContextAware {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR protected

private final PulsarContainerProperties containerProperties = new PulsarContainerProperties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected HandlerAdapter configureListenerAdapter(PulsarMessagingMessageListener
return new HandlerAdapter(invocableHandlerMethod);
}

@SuppressWarnings({"unchecked", "rawtypes"})
protected PulsarMessagingMessageListenerAdapter<V> createMessageListenerInstance(
@Nullable MessageConverter messageConverter) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public SchemaTopic(Schema<T> schema, String topicName, MessageRouter messageRout
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@SuppressWarnings("unchecked")
SchemaTopic that = (SchemaTopic) o;
if (this.messageRouter == null && that.messageRouter == null) {
return schema.equals(that.schema) && topicName.equals(that.topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess

private volatile CountDownLatch startLatch = new CountDownLatch(1);

private final AbstractPulsarMessageListenerContainer thisOrParentContainer;
private final AbstractPulsarMessageListenerContainer<?> thisOrParentContainer;

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) {
super(pulsarConsumerFactory, pulsarContainerProperties);
Expand Down Expand Up @@ -180,7 +180,7 @@ private final class Listener implements SchedulingAwareRunnable {

private volatile Thread consumerThread;

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
Listener(MessageListener<?> messageListener) {
if (messageListener instanceof PulsarBatchMessageListener) {
this.batchMessageHandler = (PulsarBatchMessageListener<T>) messageListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ public PulsarBatchMessagingMessageListenerAdapter(Object bean, Method method) {
super(bean, method);
}

public void setBatchMessageConverter(PulsarBatchMessageConverter messageConverter) {
public void setBatchMessageConverter(PulsarBatchMessageConverter<V> messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' cannot be null");
this.batchMessageConverter = messageConverter;
PulsarRecordMessageConverter recordMessageConverter = messageConverter.getRecordMessageConverter();
PulsarRecordMessageConverter<V> recordMessageConverter = messageConverter.getRecordMessageConverter();
if (recordMessageConverter != null) {
setMessageConverter(recordMessageConverter);
}
}

protected final PulsarBatchMessageConverter getBatchMessageConverter() {
protected final PulsarBatchMessageConverter<V> getBatchMessageConverter() {
return this.batchMessageConverter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class PulsarMessagingMessageListenerAdapter<V> {

private boolean converterSet;

private PulsarRecordMessageConverter messageConverter = new PulsarMessagingMessageConverter<V>();
private PulsarRecordMessageConverter<V> messageConverter = new PulsarMessagingMessageConverter<V>();

private Type fallbackType = Object.class;

Expand All @@ -80,12 +80,12 @@ public PulsarMessagingMessageListenerAdapter(Object bean, Method method) {
this.inferredType = determineInferredType(method);
}

public void setMessageConverter(PulsarRecordMessageConverter messageConverter) {
public void setMessageConverter(PulsarRecordMessageConverter<V> messageConverter) {
this.messageConverter = messageConverter;
this.converterSet = true;
}

protected final PulsarRecordMessageConverter getMessageConverter() {
protected final PulsarRecordMessageConverter<V> getMessageConverter() {
return this.messageConverter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@
*/
public interface PulsarBatchMessageConverter<T> extends MessageConverter {

@NonNull
Message<?> toMessage(Messages<T> records, Consumer<T> consumer, Type payloadType);

T fromMessage(Messages<T> message, String defaultTopic);

@Nullable
default PulsarRecordMessageConverter<T> getRecordMessageConverter() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
*/
public class PulsarBatchMessagingMessageConverter<T> implements PulsarBatchMessageConverter<T> {

private final PulsarRecordMessageConverter recordConverter;
private final PulsarRecordMessageConverter<T> recordConverter;


public PulsarBatchMessagingMessageConverter() {
this(null);
}

public PulsarBatchMessagingMessageConverter(PulsarRecordMessageConverter recordConverter) {
public PulsarBatchMessagingMessageConverter(PulsarRecordMessageConverter<T> recordConverter) {
this.recordConverter = recordConverter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.pulsar.core;

import java.io.Serial;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -65,21 +66,16 @@ public void testFailOverConsumersOnPartitionedTopic() throws Exception {
final DefaultPulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, config);
CountDownLatch latch = new CountDownLatch(3);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener(new MessageListener() {
@Override
public void received(Consumer consumer, Message msg) {
latch.countDown();
}
});
pulsarContainerProperties.setMessageListener((MessageListener<?>) (consumer, msg) -> latch.countDown());
pulsarContainerProperties.setSubscriptionType(SubscriptionType.Failover);
pulsarContainerProperties.setSchema(Schema.STRING);
DefaultPulsarMessageListenerContainer<String> container = new DefaultPulsarMessageListenerContainer(
DefaultPulsarMessageListenerContainer<String> container = new DefaultPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
container.start();
DefaultPulsarMessageListenerContainer<String> container1 = new DefaultPulsarMessageListenerContainer(
DefaultPulsarMessageListenerContainer<String> container1 = new DefaultPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
container1.start();
DefaultPulsarMessageListenerContainer<String> container2 = new DefaultPulsarMessageListenerContainer(
DefaultPulsarMessageListenerContainer<String> container2 = new DefaultPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
container2.start();
Map<String, Object> prodConfig = new HashMap<>();
Expand All @@ -98,6 +94,9 @@ public void received(Consumer consumer, Message msg) {

static class FooRouter implements MessageRouter {

@Serial
private static final long serialVersionUID = -1L;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
Expand All @@ -106,6 +105,9 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {

static class BarRouter implements MessageRouter {

@Serial
private static final long serialVersionUID = -1L;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
Expand All @@ -114,6 +116,9 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {

static class BuzzRouter implements MessageRouter {

@Serial
private static final long serialVersionUID = -1L;

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarBrokerUrl)
.build();
Consumer consumer = client.newConsumer()
Consumer<byte[]> consumer = client.newConsumer()
.topic(TEST_TOPIC)
.subscriptionName("test-subs")
.subscribe();
Expand All @@ -117,8 +117,8 @@ private void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
.create()
) {
producer.send("test containers".getBytes());
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
Message<byte[]> message = future.get(5, TimeUnit.SECONDS);

assertThat(new String(message.getData()))
.isEqualTo("test containers");
Expand Down
22 changes: 22 additions & 0 deletions src/api/overview.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<html>
<body>
This document is the API specification for Spring for Apache Kafka project
<hr>
<div id="overviewBody">
<p>
For further API reference and developer documentation, see the
<a href="https://docs.spring.io/spring-kafka/reference" target="_top">Spring
for Apache Kafka reference documentation</a>.
That documentation contains more detailed, developer-targeted
descriptions, with conceptual overviews, definitions of terms,
workarounds, and working code examples.
</p>

<p>
If you are interested in commercial consultancy, and
support for Spring for Apache Kafka, please visit <a href="https://spring.io/" target="_top">
https://spring.io/</a>
</p>
</div>
</body>
</html>
17 changes: 17 additions & 0 deletions src/checkstyle/checkstyle-header.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
^\Q/*\E$
^\Q * Copyright \E20\d\d(\-20\d\d)?\Q the original author or authors.\E$
^\Q *\E$
^\Q * Licensed under the Apache License, Version 2.0 (the "License");\E$
^\Q * you may not use this file except in compliance with the License.\E$
^\Q * You may obtain a copy of the License at\E$
^\Q *\E$
^\Q * https://www.apache.org/licenses/LICENSE-2.0\E$
^\Q *\E$
^\Q * Unless required by applicable law or agreed to in writing, software\E$
^\Q * distributed under the License is distributed on an "AS IS" BASIS,\E$
^\Q * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\E$
^\Q * See the License for the specific language governing permissions and\E$
^\Q * limitations under the License.\E$
^\Q */\E$
^$
^.*$
13 changes: 13 additions & 0 deletions src/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
"https://checkstyle.org/dtds/suppressions_1_2.dtd">
<suppressions>
<suppress files="package-info\.java" checks=".*"/>
<suppress files="[\\/]test[\\/]" checks="RequireThis"/>
<suppress files="[\\/]test[\\/]" checks="Javadoc*"/>
<suppress files="KafkaMatchersTests" checks="RegexpSinglelineJava"/>
<suppress files="(DeserializationException|ConversionException)" checks="MutableException"/>
<suppress files="[\\/]kafka.jdocs[\\/]" checks="Regexp*"/>
<suppress files="[\\/]kafka.kdocs[\\/]" checks="Regexp*"/>
</suppressions>
Loading

0 comments on commit 118fdd7

Please sign in to comment.