Skip to content

Commit

Permalink
[SPARK-13171][CORE] Replace future calls with Future
Browse files Browse the repository at this point in the history
Trivial search-and-replace to eliminate deprecation warnings in Scala 2.11.
Also works with 2.10

Author: Jakob Odersky <jakob@odersky.com>

Closes apache#11085 from jodersky/SPARK-13171.
  • Loading branch information
jodersky authored and rxin committed Feb 6, 2016
1 parent 875f507 commit 6883a51
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.net.URL
import java.util.concurrent.TimeoutException

import scala.collection.mutable.ListBuffer
import scala.concurrent.{future, promise, Await}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
Expand Down Expand Up @@ -249,7 +249,7 @@ private object FaultToleranceTest extends App with Logging {

/** This includes Client retry logic, so it may take a while if the cluster is recovering. */
private def assertUsable() = {
val f = future {
val f = Future {
try {
val res = sc.parallelize(0 until 10).collect()
assertTrue(res.toList == (0 until 10))
Expand Down Expand Up @@ -283,7 +283,7 @@ private object FaultToleranceTest extends App with Logging {
numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
}

val f = future {
val f = Future {
try {
while (!stateValid()) {
Thread.sleep(1000)
Expand Down Expand Up @@ -405,7 +405,7 @@ private object SparkDocker {
}

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
val ipPromise = Promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ private[deploy] class Worker(
// rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = executors.values.map(_.appId).toSet
val cleanupFuture = concurrent.future {
val cleanupFuture = concurrent.Future {
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
Expand Down
18 changes: 9 additions & 9 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.future
import scala.concurrent.Future

import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
Expand Down Expand Up @@ -103,7 +103,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft

val rdd1 = rdd.map(x => x)

future {
Future {
taskStartedSemaphore.acquire()
sc.cancelAllJobs()
taskCancelledSemaphore.release(100000)
Expand All @@ -126,7 +126,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

// jobA is the one to be cancelled.
val jobA = future {
val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}
Expand Down Expand Up @@ -191,7 +191,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

// jobA is the one to be cancelled.
val jobA = future {
val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
}
Expand Down Expand Up @@ -231,7 +231,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val f2 = rdd.countAsync()

// Kill one of the action.
future {
Future {
sem1.acquire()
f1.cancel()
JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
Expand All @@ -247,7 +247,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// Cancel before launching any tasks
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
future { f.cancel() }
Future { f.cancel() }
val e = intercept[SparkException] { f.get() }
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
Expand All @@ -263,7 +263,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
future {
Future {
// Wait until some tasks were launched before we cancel the job.
sem.acquire()
f.cancel()
Expand All @@ -277,7 +277,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// Cancel before launching any tasks
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
future { f.cancel() }
Future { f.cancel() }
val e = intercept[SparkException] { f.get() }
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
Expand All @@ -292,7 +292,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
})
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
future {
Future {
sem.acquire()
f.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.InputStream
import java.util.concurrent.Semaphore

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future
import scala.concurrent.Future

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._
Expand Down Expand Up @@ -149,7 +149,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
Future {
// Return the first two blocks, and wait till task completion before returning the 3rd one
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
Expand Down Expand Up @@ -211,7 +211,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
Future {
// Return the first block, and then fail.
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
import scala.concurrent.duration._

val futures = (1 to 20).map { _ =>
future {
Future {
GeneratePredicate.generate(EqualTo(Literal(1), Literal(1)))
GenerateMutableProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
GenerateOrdering.generate(Add(Literal(1), Literal(1)).asc :: Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ case class BroadcastHashJoin(

// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
future {
Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ case class BroadcastHashOuterJoin(

// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
future {
Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.sql.{Date, DriverManager, SQLException, Statement}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{future, Await, ExecutionContext, Promise}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.io.Source
import scala.util.{Random, Try}
Expand Down Expand Up @@ -362,7 +362,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
try {
// Start a very-long-running query that will take hours to finish, then cancel it in order
// to demonstrate that cancellation works.
val f = future {
val f = Future {
statement.executeQuery(
"SELECT COUNT(*) FROM test_map " +
List.fill(10)("join test_map").mkString(" "))
Expand All @@ -380,7 +380,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
// Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
try {
val sf = future {
val sf = Future {
statement.executeQuery(
"SELECT COUNT(*) FROM test_map " +
List.fill(4)("join test_map").mkString(" ")
Expand Down

0 comments on commit 6883a51

Please sign in to comment.