Skip to content

Commit

Permalink
INT-3642: Improve MessageGroupStore Removal
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3642

Currently, `removeMessageFromGroup` rebuilds the group on every removal.

In every case where this method is used in the framework, the result is not used.

Add `removeMessagesFromGroup` that removes a collection of messages and returns no result.

INT-3642: Polishing - PR Comments

INT-3642: Polishing and Fix Group Metadata Size
  • Loading branch information
garyrussell authored and artembilan committed Jun 23, 2015
1 parent 0065ed8 commit f3d525a
Show file tree
Hide file tree
Showing 23 changed files with 472 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -26,6 +26,7 @@
*
* @author Oleg Zhurakousky
* @author Artem Bilan
* @author Gary Russell
* @since 2.1
*/
public class AggregatingMessageHandler extends AbstractCorrelatingMessageHandler {
Expand Down Expand Up @@ -64,9 +65,7 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
remove(messageGroup);
}
else {
for (Message<?> message : messageGroup.getMessages()) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message);
}
this.messageStore.removeMessagesFromGroup(messageGroup.getGroupId(), messageGroup.getMessages());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -43,6 +43,7 @@
*
* @author Iwein Fuld
* @author Oleg Zhurakousky
* @author Gary Russell
*
* @see AbstractCorrelatingMessageHandler
*/
Expand Down Expand Up @@ -118,7 +119,7 @@ public Message<Object> receive() {
Iterator<Message<?>> messages = group.getMessages().iterator();
if (messages.hasNext()) {
nextMessage = messages.next();
store.removeMessageFromGroup(key, nextMessage);
this.store.removeMessagesFromGroup(key, nextMessage);
if (log.isDebugEnabled()) {
log.debug(String.format("Released message for key [%s]: %s.", key, nextMessage));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -25,6 +25,7 @@
* Will remove {@link MessageGroup}s only if 'sequenceSize' is provided and reached.
*
* @author Oleg Zhurakousky
* @author Gary Russell
* @since 2.1
*/
public class ResequencingMessageHandler extends AbstractCorrelatingMessageHandler {
Expand Down Expand Up @@ -83,9 +84,7 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
if (completedMessages != null){
int lastReleasedSequenceNumber = this.findLastReleasedSequenceNumber(messageGroup.getGroupId(), completedMessages);
messageStore.setLastReleasedSequenceNumberForGroup(messageGroup.getGroupId(), lastReleasedSequenceNumber);
for (Message<?> msg : completedMessages) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg);
}
this.messageStore.removeMessagesFromGroup(messageGroup.getGroupId(), completedMessages);
}
if (timeout) {
this.messageStore.completeGroup(messageGroup.getGroupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private void releaseMessage(Message<?> message) {

private void doReleaseMessage(Message<?> message) {
if (removeDelayedMessageFromMessageStore(message)) {
this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.store;


/**
* @author Gary Russell
* @since 4.2
*
*/
public abstract class AbstractBatchingMessageGroupStore implements BasicMessageGroupStore {

private static final int DEFAULT_REMOVE_BATCH_SIZE = 100;

private volatile int removeBatchSize = DEFAULT_REMOVE_BATCH_SIZE;

/**
* Set the batch size when bulk removing messages from groups for message stores
* that support batch removal.
* Default 100.
* @param removeBatchSize the batch size.
* @since 4.2
*/
public void setRemoveBatchSize(int removeBatchSize) {
this.removeBatchSize = removeBatchSize;
}

public int getRemoveBatchSize() {
return removeBatchSize;
}

}
Loading

0 comments on commit f3d525a

Please sign in to comment.