Skip to content

Commit

Permalink
INT-3147: (3167,3173,1941) Improve MetadataStore
Browse files Browse the repository at this point in the history
Previously, `MetadataStore` couldn't be configured for Twitter Adapters
- only a global one could be used.
The `metadataKey` was generated automatically with a 'difficult' value.

* Register all `MessageSource` for `SourcePollingChannelAdapter`
as beans with id based on adapter id and prefix '.source' (INT-3147)
* Polishing parser to get rid of explicit `MessageSource` beans. (INT-3147)
* Make Feed and Twitter adapters `id` attribute as required -
now it presents a `metadataKey` for `MetadataStore` (INT-3147)
* Add to Twitter adapters a reference attribute for `MetadataStore` (INT-3173)
* Add Twitter adapters `poll-skip-period` attribute (INT-3167)
* Add and implement `MetadataStore#remove` (INT-1941)
* Make `MetadataStore` as `@ManagedResource` (INT-1941)
* Polishing tests

JIRAs:
https://jira.springsource.org/browse/INT-3147
https://jira.springsource.org/browse/INT-3167
https://jira.springsource.org/browse/INT-3173
https://jira.springsource.org/browse/INT-1941

INT-3147: Polishing and fixes

* add domain suffix to `metadataKey`
* change contract of `MetadataStore.remove`
* remove timeout window from `AbstractTwitterMessageSource`
* polishing and fix `SearchReceivingMessageSourceWithRedisTests`

INT-3147: Rebasing and polishing

INT-3147: fix 'metadata' package tangle

INT-3147 Doc Polishing
  • Loading branch information
Artem Bilan authored and garyrussell committed Nov 4, 2013
1 parent 1fb838d commit 5be8ef3
Show file tree
Hide file tree
Showing 40 changed files with 393 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.w3c.dom.Element;

import org.springframework.beans.BeanMetadataElement;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
Expand All @@ -31,18 +32,25 @@
* @author Mark Fisher
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
public abstract class AbstractPollingInboundChannelAdapterParser extends AbstractChannelAdapterParser {

@Override
@SuppressWarnings("unchecked")
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
BeanMetadataElement source = this.parseSource(element, parserContext);
if (source == null) {
parserContext.getReaderContext().error("failed to parse source", element);
}

String channelAdapterId = this.resolveId(element, (AbstractBeanDefinition) source, parserContext);
String sourceBeanName = channelAdapterId + ".source";
parserContext.getRegistry().registerBeanDefinition(sourceBeanName, (BeanDefinition) source);

BeanDefinitionBuilder adapterBuilder = BeanDefinitionBuilder
.genericBeanDefinition(SourcePollingChannelAdapterFactoryBean.class);
adapterBuilder.addPropertyValue("source", source);
adapterBuilder.addPropertyReference("source", sourceBeanName);
adapterBuilder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "send-timeout");
Element pollerElement = DomUtils.getChildElementByTagName(element, "poller");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.springframework.core.convert.ConversionService;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.store.metadata.MetadataStore;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* limitations under the License.
*/

package org.springframework.integration.store.metadata;
package org.springframework.integration.metadata;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;

/**
* Strategy interface for storing metadata from certain adapters
Expand All @@ -25,6 +28,7 @@
* @author Mark Fisher
* @since 2.0
*/
@ManagedResource
public interface MetadataStore {

/**
Expand All @@ -35,6 +39,15 @@ public interface MetadataStore {
/**
* Reads a value for the given key from this MetadataStore.
*/
@ManagedAttribute
String get(String key);

/**
* Remove a value for the given key from this MetadataStore.
* return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>.
*/
@ManagedAttribute
String remove(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.integration.store.metadata;
package org.springframework.integration.metadata;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand Down Expand Up @@ -86,6 +86,12 @@ public String get(String key) {
return this.metadata.getProperty(key);
}

@Override
@SuppressWarnings("uchecked")
public String remove(String key) {
return (String) this.metadata.remove(key);
}

public void destroy() throws Exception {
this.saveMetadata();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* specific language governing permissions and limitations under the License.
*/

package org.springframework.integration.store.metadata;
package org.springframework.integration.metadata;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -37,4 +37,9 @@ public String get(String key) {
return this.metadata.get(key);
}

@Override
public String remove(String key) {
return metadata.remove(key);
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
* Provides classes supporting metadata stores.
*/
package org.springframework.integration.store.metadata;
package org.springframework.integration.metadata;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.integration.store.metadata;
package org.springframework.integration.metadata;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand All @@ -27,7 +27,6 @@

import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.integration.store.metadata.PropertiesPersistingMetadataStore;

/**
* @author Oleg Zhurakousky
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.AbstractPollingInboundChannelAdapterParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.util.StringUtils;

/**
Expand All @@ -31,20 +32,23 @@
* @author Josh Long
* @author Oleg Zhurakousky
* @author Mark Fisher
* @author Gunnar Hillert
* @author Artem Bilan
* @since 2.0
*/
public class FeedInboundChannelAdapterParser extends AbstractPollingInboundChannelAdapterParser {

@Override
protected BeanMetadataElement parseSource(final Element element, final ParserContext parserContext) {
BeanDefinitionBuilder sourceBuilder = BeanDefinitionBuilder.genericBeanDefinition(
"org.springframework.integration.feed.inbound.FeedEntryMessageSource");
BeanDefinitionBuilder sourceBuilder = BeanDefinitionBuilder.genericBeanDefinition(FeedEntryMessageSource.class);
sourceBuilder.addConstructorArgValue(element.getAttribute("url"));
sourceBuilder.addConstructorArgValue(element.getAttribute(ID_ATTRIBUTE));
String feedFetcherRef = element.getAttribute("feed-fetcher");
if (StringUtils.hasText(feedFetcherRef)) {
sourceBuilder.addConstructorArgReference(feedFetcherRef);
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(sourceBuilder, element, "metadata-store");

return sourceBuilder.getBeanDefinition();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.store.metadata.MetadataStore;
import org.springframework.integration.store.metadata.SimpleMetadataStore;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
Expand All @@ -52,6 +52,7 @@
* @author Josh Long
* @author Mario Gray
* @author Oleg Zhurakousky
* @author Artem Bilan
* @since 2.0
*/
public class FeedEntryMessageSource extends IntegrationObjectSupport implements MessageSource<SyndEntry> {
Expand All @@ -62,7 +63,7 @@ public class FeedEntryMessageSource extends IntegrationObjectSupport implements

private final Queue<SyndEntry> entries = new ConcurrentLinkedQueue<SyndEntry>();

private volatile String metadataKey;
private final String metadataKey;

private volatile MetadataStore metadataStore;

Expand All @@ -82,17 +83,19 @@ public class FeedEntryMessageSource extends IntegrationObjectSupport implements
* If the feed URL has a protocol other than http*, consider providing a custom implementation of the
* {@link FeedFetcher} via the alternate constructor.
*/
public FeedEntryMessageSource(URL feedUrl) {
this(feedUrl, new HttpURLFeedFetcher(HashMapFeedInfoCache.getInstance()));
public FeedEntryMessageSource(URL feedUrl, String metadataKey) {
this(feedUrl, metadataKey, new HttpURLFeedFetcher(HashMapFeedInfoCache.getInstance()));
}

/**
* Creates a FeedEntryMessageSource that will use the provided FeedFetcher to read from the given feed URL.
*/
public FeedEntryMessageSource(URL feedUrl, FeedFetcher feedFetcher) {
public FeedEntryMessageSource(URL feedUrl, String metadataKey, FeedFetcher feedFetcher) {
Assert.notNull(feedUrl, "feedUrl must not be null");
Assert.notNull(metadataKey, "metadataKey must not be null");
Assert.notNull(feedFetcher, "feedFetcher must not be null");
this.feedUrl = feedUrl;
this.metadataKey = metadataKey + "." + this.feedUrl;
this.feedFetcher = feedFetcher;
}

Expand Down Expand Up @@ -130,18 +133,7 @@ protected void onInit() throws Exception {
this.metadataStore = new SimpleMetadataStore();
}
}
StringBuilder metadataKeyBuilder = new StringBuilder();
if (StringUtils.hasText(this.getComponentType())) {
metadataKeyBuilder.append(this.getComponentType() + ".");
}
if (StringUtils.hasText(this.getComponentName())) {
metadataKeyBuilder.append(this.getComponentName() + ".");
}
else if (logger.isWarnEnabled()) {
logger.warn("FeedEntryMessageSource has no name. MetadataStore key might not be unique.");
}
metadataKeyBuilder.append(this.feedUrl);
this.metadataKey = metadataKeyBuilder.toString();

String lastTimeValue = this.metadataStore.get(this.metadataKey);
if (StringUtils.hasText(lastTimeValue)) {
this.lastTime = Long.parseLong(lastTimeValue);
Expand Down Expand Up @@ -237,6 +229,7 @@ public int compare(SyndEntry entry1, SyndEntry entry2) {
}
return (date2 == null) ? 1 : 0;
}

}


Expand All @@ -258,6 +251,7 @@ else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
}
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,28 @@
<xsd:sequence>
<xsd:element ref="integration:poller" minOccurs="0" maxOccurs="1" />
</xsd:sequence>
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
<xsd:attribute name="id" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
The bean id of this Polling Endpoint; the MessageSource is also registered with this id
plus a suffix '.source'; also used as the
MetaDataStore key with suffix '.' + feedUrl - The URL for an RSS or ATOM feed.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
Identifies the channel attached to this adapter, to which messages will be sent.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="integration:smartLifeCycleAttributeGroup"/>
<xsd:attribute name="url" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Expand Down Expand Up @@ -54,7 +75,7 @@
</xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.store.MetadataStore" />
<tool:expected-type type="org.springframework.integration.metadata.MetadataStore" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@

<bean id="fileUrlFeedFetcher" class="org.springframework.integration.feed.inbound.FileUrlFeedFetcher"/>

<bean id="metadataStore" class="org.springframework.integration.store.metadata.PropertiesPersistingMetadataStore"/>
<bean id="metadataStore" class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

</beans>

This file was deleted.

Loading

0 comments on commit 5be8ef3

Please sign in to comment.