Skip to content

Commit

Permalink
Fix more issues from Sonar report
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Nov 3, 2022
1 parent 0b9bab3 commit 5f1c0c1
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected void populatePropagatedContext(@Nullable Observation state, Message<?>
@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
Observation.Scope scope = this.scopes.get();
if (scope != null && scope == this.observationRegistry.getCurrentObservationScope()) {
if (scope != null && scope.equals(this.observationRegistry.getCurrentObservationScope())) {
scope.close();
this.scopes.remove();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
*/
public class EventDrivenConsumer extends AbstractEndpoint implements IntegrationConsumer {

Expand Down Expand Up @@ -95,9 +96,15 @@ private void logComponentSubscriptionEvent(boolean add) {
String componentType = ((NamedComponent) this.handler).getComponentType();
componentType = StringUtils.hasText(componentType) ? componentType : "";
String componentName = getComponentName();
componentName = (StringUtils.hasText(componentName) && componentName.contains("#")) ? "" : ":" + componentName;
StringBuffer buffer = new StringBuffer();
buffer.append("{" + componentType + componentName + "} as a subscriber to the '" + channelName + "' channel");
componentName =
(StringUtils.hasText(componentName) && componentName.contains("#")) ? "" : ":" + componentName;
StringBuilder buffer = new StringBuilder();
buffer.append("{")
.append(componentType)
.append(componentName)
.append("} as a subscriber to the '")
.append(channelName)
.append("' channel");
if (add) {
buffer.insert(0, "Adding ");
}
Expand All @@ -107,4 +114,5 @@ private void logComponentSubscriptionEvent(boolean add) {
logger.info(buffer.toString());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
.map(this::trackMessageIfAny)
.doOnComplete(this::stop)
.doOnCancel(this::stop)
.doOnSubscribe((subscription) -> this.subscription = subscription);
.doOnSubscribe((subs) -> this.subscription = subs);

if (channelForSubscription instanceof ReactiveStreamsSubscribableChannel) {
((ReactiveStreamsSubscribableChannel) channelForSubscription).subscribeTo(messageFlux);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,29 +309,30 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
replyChannel = getOutputChannel();
}

ReactiveAdapter reactiveAdapter = null;

if (this.async &&
(reply instanceof org.springframework.util.concurrent.ListenableFuture<?>
|| reply instanceof CompletableFuture<?>
|| (reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply)) != null)) {

if (replyChannel instanceof ReactiveStreamsSubscribableChannel reactiveStreamsSubscribableChannel) {
Publisher<?> reactiveReply = toPublisherReply(reply, reactiveAdapter);
reactiveStreamsSubscribableChannel
.subscribeTo(
Flux.from(reactiveReply)
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
.map(result -> createOutputMessage(result, requestHeaders)));
}
else {
CompletableFuture<?> futureReply = toFutureReply(reply, reactiveAdapter);
futureReply.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
if (this.async) {
ReactiveAdapter reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply);
if (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>
|| reply instanceof CompletableFuture<?>
|| reactiveAdapter != null) {

if (replyChannel instanceof ReactiveStreamsSubscribableChannel reactiveStreamsSubscribableChannel) {
Publisher<?> reactiveReply = toPublisherReply(reply, reactiveAdapter);
reactiveStreamsSubscribableChannel
.subscribeTo(
Flux.from(reactiveReply)
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
.map(result -> createOutputMessage(result, requestHeaders)));
}
else {
CompletableFuture<?> futureReply = toFutureReply(reply, reactiveAdapter);
futureReply.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
}

return;
}
}
else {
sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
}

sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
}

private static Publisher<?> toPublisherReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testDelayWithDefaultSchedulerCustomDelayHeader() {
long start = System.currentTimeMillis();
inputA.send(builder.build());
assertThat(outputA.receive(10000)).isNotNull();
assertThat(System.currentTimeMillis() - start).isCloseTo(2000, withinPercentage(5));
assertThat(System.currentTimeMillis() - start).isCloseTo(2000, withinPercentage(10));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.dsl.flowservices;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.withinPercentage;

import java.time.Instant;
import java.util.Collection;
Expand All @@ -25,7 +26,6 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;

import org.springframework.aop.framework.Advised;
Expand Down Expand Up @@ -145,7 +145,7 @@ public void noDoubleStartForEndpoints() {
.isEqualTo("B");

assertThat(receive2.getHeaders().getTimestamp() - receive1.getHeaders().getTimestamp())
.isCloseTo(500, Percentage.withPercentage(10));
.isCloseTo(500, withinPercentage(20));
}

@Configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,10 +64,10 @@ public HazelcastClusterMonitorMessageProducer(HazelcastInstance hazelcastInstanc
}

public void setMonitorEventTypes(String monitorEventTypes) {
final Set<String> monitorTypes =
Set<String> types =
HazelcastIntegrationDefinitionValidator.validateEnumType(ClusterMonitorType.class, monitorEventTypes);
Assert.notEmpty(monitorTypes, "'monitorTypes' must have elements");
this.monitorTypes = monitorTypes;
Assert.notEmpty(types, "'monitorTypes' must have elements");
this.monitorTypes = types;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,8 +76,8 @@ protected Object doRemove(Object id) {
@Override
protected Collection<?> doListKeys(String keyPattern) {
Assert.hasText(keyPattern, "'keyPattern' must not be empty");
keyPattern = keyPattern.replaceAll("\\*", "%");
return this.map.keySet(Predicates.like(QueryConstants.KEY_ATTRIBUTE_NAME.value(), keyPattern));
return this.map.keySet(Predicates.like(QueryConstants.KEY_ATTRIBUTE_NAME.value(),
keyPattern.replaceAll("\\*", "%")));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,7 @@ public class UnicastReceivingChannelAdapter extends AbstractInternetProtocolRece

private int soSendBufferSize = -1;

private SocketCustomizer socketCustomizer = socket -> { };
private SocketCustomizer socketCustomizer = (aSocket) -> { };

/**
* Constructs a UnicastReceivingChannelAdapter that listens on the specified port.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2001-2020 the original author or authors.
* Copyright 2001-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@
/**
* A {@link org.springframework.messaging.MessageHandler} implementation that maps a Message into
* a UDP datagram packet and sends that to the specified host and port.
*
* <p>
* Messages can be basic, with no support for reliability, can be prefixed
* by a length so the receiving end can detect truncation, and can require
* a UDP acknowledgment to confirm delivery.
Expand Down Expand Up @@ -101,7 +101,7 @@ public class UnicastSendingMessageHandler extends

private EvaluationContext evaluationContext;

private SocketCustomizer socketCustomizer = socket -> { };
private SocketCustomizer socketCustomizer = (aSocket) -> { };

private volatile CountDownLatch ackLatch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,13 @@ private ClientSession initClientSession() throws IOException {
initClient();

Duration verifyTimeout = this.timeout != null ? Duration.ofMillis(this.timeout) : null;
HostConfigEntry hostConfig = this.hostConfig;
if (hostConfig == null) {
hostConfig =
new HostConfigEntry(SshdSocketAddress.isIPv6Address(this.host) ? "" : this.host, this.host,
HostConfigEntry config = this.hostConfig;
if (config == null) {
config = new HostConfigEntry(SshdSocketAddress.isIPv6Address(this.host) ? "" : this.host, this.host,
this.port, this.user);
}
ClientSession clientSession =
this.sshClient.connect(hostConfig)
this.sshClient.connect(config)
.verify(verifyTimeout)
.getSession();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,9 @@ private static Mono<?> queryParams(ServerHttpRequest request) {
}

private static MediaType selectMoreSpecificMediaType(MediaType acceptable, MediaType producible) {
producible = producible.copyQualityValue(acceptable);
if (acceptable.isLessSpecific(producible)) {
return producible;
MediaType producibleToUse = producible.copyQualityValue(acceptable);
if (acceptable.isLessSpecific(producibleToUse)) {
return producibleToUse;
}
else {
return acceptable;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,9 @@ public AggregatedXmlMessageValidationException(List<Throwable> exceptions) {
public String getMessage() {
StringBuilder message = new StringBuilder("Multiple causes:\n");
for (Throwable exception : this.exceptions) {
message.append(" " + exception.getMessage() + "\n");
message.append(" ")
.append(exception.getMessage())
.append("\n");
}
return message.toString();
}
Expand Down

0 comments on commit 5f1c0c1

Please sign in to comment.