Skip to content

Commit

Permalink
STORM-851: Storm Solr Connector
Browse files Browse the repository at this point in the history
1. SolrUpdate Bolt
2. Trident State implementation
3. Fields Mapper
4. JSON Mapper
5. Integration Tests
  • Loading branch information
hmcl committed Aug 25, 2015
1 parent 528958c commit 4ab6e0c
Show file tree
Hide file tree
Showing 33 changed files with 1,592 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ target
*.ipr
*.iws
.idea
.*
#.*
!/.travis.yml
!/.gitignore
_site
Expand Down
110 changes: 110 additions & 0 deletions external/storm-solr/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>storm-solr</artifactId>

<developers>
<developer>
<id>Hugo-Louro</id>
<name>Hugo Louro</name>
<email>hmclouro@gmail.com</email>
</developer>
</developers>

<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- Solr and its dependencies -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>5.2.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>5.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-test-framework</artifactId>
<version>5.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<!--test-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
<!--<dependency>-->
<!--<groupId>com.googlecode.json-simple</groupId>-->
<!--<artifactId>json-simple</artifactId>-->
<!--</dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.storm.solr.topology.SolrFieldsTopology</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Created by .ignore support plugin (hsz.mobi)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.storm.solr.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.storm.solr.config.SolrConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Created by hlouro on 7/17/15.
*/
public abstract class AbstractSolrBolt extends BaseRichBolt {
protected OutputCollector collector;
protected SolrConfig solrConfig;
protected SolrClient solrClient;

public AbstractSolrBolt(SolrConfig solrConfig) {
this.solrConfig = solrConfig;
}

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
solrClient = new CloudSolrClient(solrConfig.getZkHostString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.apache.storm.solr.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.storm.solr.config.SolrCommitStrategy;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* Created by hlouro on 7/19/15.
*/
public class SolrUpdateBolt extends AbstractSolrBolt {
private final Logger logger = LoggerFactory.getLogger(SolrUpdateBolt.class);
private final SolrMapper solrMapper;
private final SolrCommitStrategy commitStgy;
private List<Tuple> toCommitTuples;
private final String ackFailLock = "LOCK"; //serializable lock


public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper) {
this(solrConfig, solrMapper, null);
}

public SolrUpdateBolt(SolrConfig solrConfig, SolrMapper solrMapper, SolrCommitStrategy commitStgy) {
super(solrConfig);
this.solrMapper = solrMapper;
this.commitStgy = commitStgy;
logger.info("Created {} with the following configuration: " +
"[SolrConfig = {}], [SolrMapper = {}], [CommitStgy = {}]",
this.getClass().getSimpleName(), solrConfig, solrMapper, commitStgy);
}

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
super.prepare(stormConf, context, collector);
this.toCommitTuples = new LinkedList<>();
}

@Override
public void execute(Tuple tuple) {
try {
SolrRequest request = solrMapper.toSolrRequest(tuple);
solrClient.request(request, solrMapper.getCollection());
ack(tuple);
} catch (Exception e) {
fail(tuple, e);
}
}

private void ack(Tuple tuple) throws SolrServerException, IOException {
if (commitStgy == null) {
collector.ack(tuple);
} else {
synchronized(ackFailLock) {
toCommitTuples.add(tuple);
commitStgy.update();
}
if (commitStgy.commit()) {
solrClient.commit(solrMapper.getCollection());
ackCommittedTuples();
}
}
}

private void ackCommittedTuples() {
List<Tuple> toAckTuples = getQueuedTuples();
for (Tuple tuple : toAckTuples) {
collector.ack(tuple);
}
}

private void fail(Tuple tuple, Exception e) {
collector.reportError(e);

if (commitStgy == null) {
collector.fail(tuple);
} else {
List<Tuple> failedTuples = getQueuedTuples();
for (Tuple failedTuple : failedTuples) {
collector.fail(failedTuple);
}
}
}

private List<Tuple> getQueuedTuples() {
List<Tuple> queuedTuples;
synchronized(ackFailLock) {
queuedTuples = toCommitTuples;
toCommitTuples = new LinkedList<>();
}
return queuedTuples;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.apache.storm.solr.config;

/**
* Class defining a count based commit strategy. When the count reaches the commit threshold,
* SolrInputDocuments are committed to Solr.
*
* Created by hlouro on 7/29/15.
*/
public class CountBasedCommit implements SolrCommitStrategy {
private int threshHold;
private int count;

/**
* Initializes a count based commit strategy with the specified threshold
*
* @param threshold The commit threshold, defining when SolrInputDocuments should be committed to Solr
* */
public CountBasedCommit(int threshold) {
if (threshold < 1) {
throw new IllegalArgumentException("Threshold must be a positive integer: " + threshold);
}
this.threshHold = threshold;
}

@Override
public boolean commit() {
return count != 0 && count % threshHold == 0;
}


@Override
public void update() {
count++;
}

public int getCount() {
return count;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.storm.solr.config;

import java.io.Serializable;

/**
* Strategy definining when the Solr Bolt should commit the request to Solr.
* <p></p>
* Created by hlouro on 7/29/15.
*/
public interface SolrCommitStrategy extends Serializable {
boolean commit();

void update();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.storm.solr.config;

import org.apache.solr.client.solrj.SolrClient;

import java.io.Serializable;

/**
* Class containing Solr configuration to be made available to Storm Solr bolts. Any configuration needed in
* the bolts should be put in this class.
* <p></p>
* Created by hlouro on 7/29/15.
*/
public class SolrConfig implements Serializable {
private String zkHostString;

/**
* @param zkHostString Zookeeper host string as defined in the {@link SolrClient} constructor
* */
public SolrConfig(String zkHostString) {
this.zkHostString = zkHostString;
}

public String getZkHostString() {
return zkHostString;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Created by .ignore support plugin (hsz.mobi)
Loading

0 comments on commit 4ab6e0c

Please sign in to comment.