Skip to content

Commit

Permalink
Third draft to execute the forked tests in parallel.
Browse files Browse the repository at this point in the history
This feature is not activated by default. To enable it set `testForkedParallel` to `true`.

The test-agent then executes the tests in a thread pool.
For now it has a fixed size set to the number of available processors.
The concurrent restrictions configuration should be used.
  • Loading branch information
backuitist authored and eed3si9n committed Mar 21, 2014
1 parent 67c879b commit 4bb0873
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 138 deletions.
15 changes: 8 additions & 7 deletions main/actions/src/main/scala/sbt/ForkTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ private[sbt] object ForkTests
if(opts.tests.isEmpty)
constant( TestOutput(TestResult.Passed, Map.empty[String, SuiteResult], Iterable.empty) )
else
mainTestTask(runners, opts, classpath, fork, log).tagw(config.tags: _*)
mainTestTask(runners, opts, classpath, fork, log, config.parallel).tagw(config.tags: _*)
main.dependsOn( all(opts.setup) : _*) flatMap { results =>
all(opts.cleanup).join.map( _ => results)
}
}

private[this] def mainTestTask(runners: Map[TestFramework, Runner], opts: ProcessedOptions, classpath: Seq[File], fork: ForkOptions, log: Logger): Task[TestOutput] =
private[this] def mainTestTask(runners: Map[TestFramework, Runner], opts: ProcessedOptions, classpath: Seq[File], fork: ForkOptions, log: Logger, parallel: Boolean): Task[TestOutput] =
std.TaskExtra.task
{
val server = new ServerSocket(0)
Expand All @@ -41,7 +41,8 @@ private[sbt] object ForkTests
object Acceptor extends Runnable {
val resultsAcc = mutable.Map.empty[String, SuiteResult]
lazy val result = TestOutput(overall(resultsAcc.values.map(_.result)), resultsAcc.toMap, Iterable.empty)
def run: Unit = {

def run() {
val socket =
try {
server.accept()
Expand All @@ -58,21 +59,21 @@ private[sbt] object ForkTests
val is = new ObjectInputStream(socket.getInputStream)

try {
os.writeBoolean(log.ansiCodesSupported)
val config = new ForkConfiguration(log.ansiCodesSupported, parallel)
os.writeObject(config)

val taskdefs = opts.tests.map(t => new TaskDef(t.name, forkFingerprint(t.fingerprint), t.explicitlySpecified, t.selectors))
os.writeObject(taskdefs.toArray)

os.writeInt(runners.size)
for ((testFramework, mainRunner) <- runners) {
val remoteArgs = mainRunner.remoteArgs()
os.writeObject(testFramework.implClassNames.toArray)
os.writeObject(mainRunner.args)
os.writeObject(remoteArgs)
os.writeObject(mainRunner.remoteArgs)
}
os.flush()

(new React(is, os, log, opts.testListeners, resultsAcc)).react()
new React(is, os, log, opts.testListeners, resultsAcc).react()
} finally {
is.close(); os.close(); socket.close()
}
Expand Down
15 changes: 11 additions & 4 deletions main/src/main/scala/sbt/Defaults.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ object Defaults extends BuildCommon
outputStrategy :== None,
exportJars :== false,
fork :== false,
testForkedParallel :== false,
javaOptions :== Nil,
sbtPlugin :== false,
crossPaths :== true,
Expand Down Expand Up @@ -358,7 +359,7 @@ object Defaults extends BuildCommon
definedTests <<= detectTests,
definedTestNames <<= definedTests map ( _.map(_.name).distinct) storeAs definedTestNames triggeredBy compile,
testFilter in testQuick <<= testQuickFilter,
executeTests <<= (streams in test, loadedTestFrameworks, testLoader, testGrouping in test, testExecution in test, fullClasspath in test, javaHome in test) flatMap allTestGroupsTask,
executeTests <<= (streams in test, loadedTestFrameworks, testLoader, testGrouping in test, testExecution in test, fullClasspath in test, javaHome in test, testForkedParallel) flatMap allTestGroupsTask,
test := {
implicit val display = Project.showContextKey(state.value)
Tests.showResults(streams.value.log, executeTests.value, noTestsMessage(resolvedScoped.value))
Expand Down Expand Up @@ -468,15 +469,15 @@ object Defaults extends BuildCommon
implicit val display = Project.showContextKey(state.value)
val modifiedOpts = Tests.Filters(filter(selected)) +: Tests.Argument(frameworkOptions : _*) +: config.options
val newConfig = config.copy(options = modifiedOpts)
val output = allTestGroupsTask(s, loadedTestFrameworks.value, testLoader.value, testGrouping.value, newConfig, fullClasspath.value, javaHome.value)
val output = allTestGroupsTask(s, loadedTestFrameworks.value, testLoader.value, testGrouping.value, newConfig, fullClasspath.value, javaHome.value, testForkedParallel.value)
val processed =
for(out <- output) yield
Tests.showResults(s.log, out, noTestsMessage(resolvedScoped.value))
Def.value(processed)
}
}

def createTestRunners(frameworks: Map[TestFramework,Framework], loader: ClassLoader, config: Tests.Execution) = {
def createTestRunners(frameworks: Map[TestFramework,Framework], loader: ClassLoader, config: Tests.Execution) : Map[TestFramework, Runner] = {
import Tests.Argument
val opts = config.options.toList
frameworks.map { case (tf, f) =>
Expand All @@ -490,12 +491,18 @@ object Defaults extends BuildCommon
}

def allTestGroupsTask(s: TaskStreams, frameworks: Map[TestFramework,Framework], loader: ClassLoader, groups: Seq[Tests.Group], config: Tests.Execution, cp: Classpath, javaHome: Option[File]): Task[Tests.Output] = {
allTestGroupsTask(s,frameworks,loader, groups, config, cp, javaHome, forkedParallelExecution = false)
}

def allTestGroupsTask(s: TaskStreams, frameworks: Map[TestFramework,Framework], loader: ClassLoader, groups: Seq[Tests.Group], config: Tests.Execution, cp: Classpath, javaHome: Option[File], forkedParallelExecution: Boolean): Task[Tests.Output] = {
val runners = createTestRunners(frameworks, loader, config)
val groupTasks = groups map {
case Tests.Group(name, tests, runPolicy) =>
runPolicy match {
case Tests.SubProcess(opts) =>
ForkTests(runners, tests.toList, config, cp.files, opts, s.log) tag Tags.ForkedTestGroup
val forkedConfig = config.copy(parallel = config.parallel && forkedParallelExecution)
s.log.debug(s"Forking tests - parallelism = ${forkedConfig.parallel}")
ForkTests(runners, tests.toList, forkedConfig, cp.files, opts, s.log) tag Tags.ForkedTestGroup
case Tests.InProcess =>
Tests(frameworks, loader, runners, tests, config, s.log)
}
Expand Down
1 change: 1 addition & 0 deletions main/src/main/scala/sbt/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ object Keys
val testOptions = TaskKey[Seq[TestOption]]("test-options", "Options for running tests.", BPlusTask)
val testFrameworks = SettingKey[Seq[TestFramework]]("test-frameworks", "Registered, although not necessarily present, test frameworks.", CTask)
val testListeners = TaskKey[Seq[TestReportListener]]("test-listeners", "Defines test listeners.", DTask)
val testForkedParallel = SettingKey[Boolean]("test-forked-parallel", "Whether forked tests should be executed in parallel", CTask)
val testExecution = TaskKey[Tests.Execution]("test-execution", "Settings controlling test execution", DTask)
val testFilter = TaskKey[Seq[String] => Seq[String => Boolean]]("test-filter", "Filter controlling whether the test is executed", DTask)
val testGrouping = TaskKey[Seq[Tests.Group]]("test-grouping", "Collects discovered tests into groups. Whether to fork and the options for forking are configurable on a per-group basis.", BMinusTask)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import sbt._
import Keys._
import Tests._
import Defaults._

object ForkParallelTest extends Build {
val check = taskKey[Unit]("Check that tests are executed in parallel")

lazy val root = Project("root", file("."), settings = defaultSettings ++ Seq(
scalaVersion := "2.9.2",
libraryDependencies += "com.novocode" % "junit-interface" % "0.10" % "test",
fork in Test := true,
check := {
if( ! (file("max-concurrent-tests_3").exists() || file("max-concurrent-tests_4").exists() )) {
sys.error("Forked tests were not executed in parallel!")
}
}
))
}
53 changes: 53 additions & 0 deletions sbt/src/sbt-test/tests/fork-parallel/src/test/scala/tests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@

import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import org.junit.Test
import scala.annotation.tailrec

object ParallelTest {
val nbConcurrentTests = new AtomicInteger(0)
val maxConcurrentTests = new AtomicInteger(0)

private def updateMaxConcurrentTests(currentMax: Int, newMax: Int) : Boolean = {
if( maxConcurrentTests.compareAndSet(currentMax, newMax) ) {
val f = new File("max-concurrent-tests_" + newMax)
f.createNewFile
true
} else {
false
}
}

@tailrec
def execute(f : => Unit) {
val nb = nbConcurrentTests.incrementAndGet()
val max = maxConcurrentTests.get()
if( nb <= max || updateMaxConcurrentTests(max, nb)) {
f
nbConcurrentTests.getAndDecrement
} else {
nbConcurrentTests.getAndDecrement
execute(f)
}
}
}

class Test1 {
@Test
def slow() { ParallelTest.execute { Thread.sleep(1000) } }
}

class Test2 {
@Test
def slow() { ParallelTest.execute { Thread.sleep(1000) } }
}

class Test3 {
@Test
def slow() { ParallelTest.execute { Thread.sleep(1000) } }
}

class Test4 {
@Test
def slow() { ParallelTest.execute { Thread.sleep(1000) } }
}
7 changes: 7 additions & 0 deletions sbt/src/sbt-test/tests/fork-parallel/test
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
> test
-> check

> clean
> set testForkedParallel := true
> test
> check
2 changes: 1 addition & 1 deletion sbt/src/sbt-test/tests/fork/project/ForkTestsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object ForkTestsTest extends Build {
val (exist, absent) = files.partition(_.exists)
exist.foreach(_.delete())
if(absent.nonEmpty)
error("Files were not created:\n\t" + absent.mkString("\n\t"))
sys.error("Files were not created:\n\t" + absent.mkString("\n\t"))
},
concurrentRestrictions := Tags.limit(Tags.ForkedTestGroup, 2) :: Nil,
libraryDependencies += "org.scalatest" %% "scalatest" % "1.8" % "test"
Expand Down
21 changes: 21 additions & 0 deletions testing/agent/src/main/java/sbt/ForkConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sbt;

import java.io.Serializable;

public final class ForkConfiguration implements Serializable {
private boolean ansiCodesSupported;
private boolean parallel;

public ForkConfiguration(boolean ansiCodesSupported, boolean parallel) {
this.ansiCodesSupported = ansiCodesSupported;
this.parallel = parallel;
}

public boolean isAnsiCodesSupported() {
return ansiCodesSupported;
}

public boolean isParallel() {
return parallel;
}
}
Loading

0 comments on commit 4bb0873

Please sign in to comment.