forked from apache/storm
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
1. SolrUpdate Bolt 2. Trident State implementation 3. Fields Mapper 4. JSON Mapper 5. Integration Tests
- Loading branch information
Showing
33 changed files
with
1,592 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ target | |
*.ipr | ||
*.iws | ||
.idea | ||
.* | ||
#.* | ||
!/.travis.yml | ||
!/.gitignore | ||
_site | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>storm</artifactId> | ||
<groupId>org.apache.storm</groupId> | ||
<version>0.11.0-SNAPSHOT</version> | ||
<relativePath>../../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>storm-solr</artifactId> | ||
|
||
<developers> | ||
<developer> | ||
<id>Hugo-Louro</id> | ||
<name>Hugo Louro</name> | ||
<email>hmclouro@gmail.com</email> | ||
</developer> | ||
</developers> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.storm</groupId> | ||
<artifactId>storm-core</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<!-- Solr and its dependencies --> | ||
<dependency> | ||
<groupId>org.apache.solr</groupId> | ||
<artifactId>solr-solrj</artifactId> | ||
<version>5.2.1</version> | ||
<scope>compile</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.solr</groupId> | ||
<artifactId>solr-core</artifactId> | ||
<version>5.2.1</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.solr</groupId> | ||
<artifactId>solr-test-framework</artifactId> | ||
<version>5.2.1</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>commons-codec</groupId> | ||
<artifactId>commons-codec</artifactId> | ||
<version>1.3</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>commons-httpclient</groupId> | ||
<artifactId>commons-httpclient</artifactId> | ||
<version>3.1</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>commons-io</groupId> | ||
<artifactId>commons-io</artifactId> | ||
<version>1.4</version> | ||
</dependency> | ||
<!--test--> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.11</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.google.code.gson</groupId> | ||
<artifactId>gson</artifactId> | ||
<version>2.3.1</version> | ||
</dependency> | ||
<!--<dependency>--> | ||
<!--<groupId>com.googlecode.json-simple</groupId>--> | ||
<!--<artifactId>json-simple</artifactId>--> | ||
<!--</dependency>--> | ||
</dependencies> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-jar-plugin</artifactId> | ||
<version>2.5</version> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>test-jar</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
<plugin> | ||
<artifactId>maven-assembly-plugin</artifactId> | ||
<configuration> | ||
<archive> | ||
<manifest> | ||
<mainClass>org.apache.storm.solr.topology.SolrFieldsTopology</mainClass> | ||
</manifest> | ||
</archive> | ||
<descriptorRefs> | ||
<descriptorRef>jar-with-dependencies</descriptorRef> | ||
</descriptorRefs> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> |
1 change: 1 addition & 0 deletions
1
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/.gitignore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Created by .ignore support plugin (hsz.mobi) |
33 changes: 33 additions & 0 deletions
33
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/AbstractSolrBolt.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package org.apache.storm.solr.bolt; | ||
|
||
import backtype.storm.task.OutputCollector; | ||
import backtype.storm.task.TopologyContext; | ||
import backtype.storm.topology.OutputFieldsDeclarer; | ||
import backtype.storm.topology.base.BaseRichBolt; | ||
import backtype.storm.tuple.Tuple; | ||
import org.apache.solr.client.solrj.SolrClient; | ||
import org.apache.solr.client.solrj.impl.CloudSolrClient; | ||
import org.apache.storm.solr.config.SolrConfig; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Created by hlouro on 7/17/15. | ||
*/ | ||
public abstract class AbstractSolrBolt extends BaseRichBolt { | ||
protected OutputCollector collector; | ||
protected SolrConfig solrConfig; | ||
protected SolrClient solrClient; | ||
|
||
public AbstractSolrBolt(SolrConfig solrConfig) { | ||
this.solrConfig = solrConfig; | ||
} | ||
|
||
@Override | ||
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { | ||
this.collector = collector; | ||
solrClient = new CloudSolrClient(solrConfig.getZkHostString()); | ||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package org.apache.storm.solr.bolt; | ||
|
||
import backtype.storm.task.OutputCollector; | ||
import backtype.storm.task.TopologyContext; | ||
import backtype.storm.topology.OutputFieldsDeclarer; | ||
import backtype.storm.tuple.Tuple; | ||
import org.apache.solr.client.solrj.SolrRequest; | ||
import org.apache.solr.client.solrj.SolrServerException; | ||
import org.apache.storm.solr.config.SolrCommitStrategy; | ||
import org.apache.storm.solr.config.SolrConfig; | ||
import org.apache.storm.solr.mapper.SolrMapper; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* Created by hlouro on 7/19/15. | ||
*/ | ||
public class SolrUpdateBolt extends AbstractSolrBolt { | ||
private final Logger logger = LoggerFactory.getLogger(SolrUpdateBolt.class); | ||
private final SolrMapper solrMapper; | ||
private final SolrCommitStrategy commitStgy; | ||
private List<Tuple> toCommitTuples; | ||
private final String ackFailLock = "LOCK"; //serializable lock | ||
|
||
|
||
public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) { | ||
this(solrConfig, solrMapper, null); | ||
} | ||
|
||
public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitStrategy commitStgy) { | ||
super(solrConfig); | ||
this.solrMapper = solrMapper; | ||
this.commitStgy = commitStgy; | ||
logger.info("Created {} with the following configuration: " + | ||
"[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]", | ||
this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy); | ||
} | ||
|
||
@Override | ||
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { | ||
super.prepare(stormConf, context, collector); | ||
this.toCommitTuples = new LinkedList<>(); | ||
} | ||
|
||
@Override | ||
public void execute(Tuple tuple) { | ||
try { | ||
SolrRequest request = solrMapper.toSolrRequest(tuple); | ||
solrClient.request(request, solrMapper.getCollection()); | ||
ack(tuple); | ||
} catch (Exception e) { | ||
fail(tuple, e); | ||
} | ||
} | ||
|
||
private void ack(Tuple tuple) throws SolrServerException, IOException { | ||
if (commitStgy == null) { | ||
collector.ack(tuple); | ||
} else { | ||
synchronized(ackFailLock) { | ||
toCommitTuples.add(tuple); | ||
commitStgy.update(); | ||
} | ||
if (commitStgy.commit()) { | ||
solrClient.commit(solrMapper.getCollection()); | ||
ackCommittedTuples(); | ||
} | ||
} | ||
} | ||
|
||
private void ackCommittedTuples() { | ||
List<Tuple> toAckTuples = getQueuedTuples(); | ||
for (Tuple tuple : toAckTuples) { | ||
collector.ack(tuple); | ||
} | ||
} | ||
|
||
private void fail(Tuple tuple, Exception e) { | ||
collector.reportError(e); | ||
|
||
if (commitStgy == null) { | ||
collector.fail(tuple); | ||
} else { | ||
List<Tuple> failedTuples = getQueuedTuples(); | ||
for (Tuple failedTuple : failedTuples) { | ||
collector.fail(failedTuple); | ||
} | ||
} | ||
} | ||
|
||
private List<Tuple> getQueuedTuples() { | ||
List<Tuple> queuedTuples; | ||
synchronized(ackFailLock) { | ||
queuedTuples = toCommitTuples; | ||
toCommitTuples = new LinkedList<>(); | ||
} | ||
return queuedTuples; | ||
} | ||
|
||
@Override | ||
public void declareOutputFields(OutputFieldsDeclarer declarer) { } | ||
} |
39 changes: 39 additions & 0 deletions
39
external/storm-solr/src/main/java/org/apache/storm/solr/config/CountBasedCommit.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package org.apache.storm.solr.config; | ||
|
||
/** | ||
* Class defining a count based commit strategy. When the count reaches the commit threshold, | ||
* SolrInputDocuments are committed to Solr. | ||
* | ||
* Created by hlouro on 7/29/15. | ||
*/ | ||
public class CountBasedCommit implements SolrCommitStrategy { | ||
private int threshHold; | ||
private int count; | ||
|
||
/** | ||
* Initializes a count based commit strategy with the specified threshold | ||
* | ||
* @param threshold The commit threshold, defining when SolrInputDocuments should be committed to Solr | ||
* */ | ||
public CountBasedCommit(int threshold) { | ||
if (threshold < 1) { | ||
throw new IllegalArgumentException("Threshold must be a positive integer: " + threshold); | ||
} | ||
this.threshHold = threshold; | ||
} | ||
|
||
@Override | ||
public boolean commit() { | ||
return count != 0 && count % threshHold == 0; | ||
} | ||
|
||
|
||
@Override | ||
public void update() { | ||
count++; | ||
} | ||
|
||
public int getCount() { | ||
return count; | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrCommitStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package org.apache.storm.solr.config; | ||
|
||
import java.io.Serializable; | ||
|
||
/** | ||
* Strategy definining when the Solr Bolt should commit the request to Solr. | ||
* <p></p> | ||
* Created by hlouro on 7/29/15. | ||
*/ | ||
public interface SolrCommitStrategy extends Serializable { | ||
boolean commit(); | ||
|
||
void update(); | ||
} |
26 changes: 26 additions & 0 deletions
26
external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package org.apache.storm.solr.config; | ||
|
||
import org.apache.solr.client.solrj.SolrClient; | ||
|
||
import java.io.Serializable; | ||
|
||
/** | ||
* Class containing Solr configuration to be made available to Storm Solr bolts. Any configuration needed in | ||
* the bolts should be put in this class. | ||
* <p></p> | ||
* Created by hlouro on 7/29/15. | ||
*/ | ||
public class SolrConfig implements Serializable { | ||
private String zkHostString; | ||
|
||
/** | ||
* @param zkHostString Zookeeper host string as defined in the {@link SolrClient} constructor | ||
* */ | ||
public SolrConfig(String zkHostString) { | ||
this.zkHostString = zkHostString; | ||
} | ||
|
||
public String getZkHostString() { | ||
return zkHostString; | ||
} | ||
} |
1 change: 1 addition & 0 deletions
1
external/storm-solr/src/main/java/org/apache/storm/solr/mapper/.gitignore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Created by .ignore support plugin (hsz.mobi) |
Oops, something went wrong.