Skip to content

Commit

Permalink
[SPARK-8015] [FLUME] Remove Guava dependency from flume-sink.
Browse files Browse the repository at this point in the history
The minimal change would be to disable shading of Guava in the module,
and rely on the transitive dependency from other libraries instead. But
since Guava's use is so localized, I think it's better to just not use
it instead, so I replaced that code and removed all traces of Guava from
the module's build.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#6555 from vanzin/SPARK-8015 and squashes the following commits:

c0ceea8 [Marcelo Vanzin] Add comments about dependency management.
c38228d [Marcelo Vanzin] Add guava dep in test scope.
b7a0349 [Marcelo Vanzin] Add libthrift exclusion.
6e0942d [Marcelo Vanzin] Add comment in pom.
2d79260 [Marcelo Vanzin] [SPARK-8015] [flume] Remove Guava dependency from flume-sink.
  • Loading branch information
Marcelo Vanzin authored and tdas committed Jun 2, 2015
1 parent 1bb5d71 commit 0071bd8
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 7 deletions.
39 changes: 39 additions & 0 deletions external/flume-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,46 @@
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<exclusions>
<!-- Guava is excluded to avoid its use in this module. -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<!--
Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
dependency.
-->
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<!-- Add Guava in test scope since flume actually needs it. -->
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<!--
Netty explicitly added in test as it has been excluded from
Expand Down Expand Up @@ -85,6 +116,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<!-- Disable all relocations defined in the parent pom. -->
<relocations combine.self="override" />
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.flume.Channel
import org.apache.commons.lang3.RandomStringUtils

Expand All @@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
// Protected by `sequenceNumberToProcessor`
private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.flume.sink

import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicLong

/**
* Thread factory that generates daemon threads with a specified name format.
*/
private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {

private val threadId = new AtomicLong()

override def newThread(r: Runnable): Thread = {
val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
t.setDaemon(true)
t
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.Context
Expand Down Expand Up @@ -194,9 +193,8 @@ class SparkSinkSuite extends FunSuite {
count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {

(1 to count).map(_ => {
lazy val channelFactoryExecutor =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
setNameFormat("Flume Receiver Channel Thread - %d").build())
lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
lazy val channelFactory =
new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
val transceiver = new NettyTransceiver(address, channelFactory)
Expand Down

0 comments on commit 0071bd8

Please sign in to comment.