Skip to content

Commit

Permalink
[SPARK-37891][CORE] Add scalastyle check to disable scala.concurrent.…
Browse files Browse the repository at this point in the history
…ExecutionContext.Implicits.global

### What changes were proposed in this pull request?
Add scalastyle check to disable internal use of scala.concurrent.ExecutionContext.Implicits.global.
The reason is that user queries can also use this thread pool, which leads to resource contention and starvation. Spark-internal APIs should therefore not use the global thread pool.

### Why are the changes needed?
To forbid Spark-internal APIs from using the global thread pool.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
PR tests

Closes apache#35187 from tianhanhu/SPARK-37891.

Authored-by: tianhanhu <adrianhu96@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
  • Loading branch information
tianhanhu authored and jiangxb1987 committed Jan 24, 2022
1 parent 8fef5bb commit bcaab62
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import java.util.concurrent.TimeoutException

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.duration._
import scala.sys.process._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
* Exposed for testing.
*/
private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
val responseFuture = Future {
val responseCode = connection.getResponseCode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}

private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
override def address: RpcAddress = null
override def name: String = "fallback"
override def send(message: Any): Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package org.apache.spark
import java.util.concurrent.{Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.Future
import scala.concurrent.duration._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package org.apache.spark.rdd
import java.util.concurrent.Semaphore

import scala.concurrent._
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.duration.Duration

import org.scalatest.BeforeAndAfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc): Future[Any] = {
val waiter: JobWaiter[Any] = scheduler.submitJob(rdd, func, partitions.toSeq, CallSite("", ""),
(index, res) => results(index) = res, new Properties())
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
waiter.completionFuture.recover { case ex =>
failure = ex
}
Expand Down Expand Up @@ -697,7 +699,9 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
// Submit a job containing an RDD which will hang in getPartitions() until we release
// the countdown latch:
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
val slowJobFuture = Future { submit(rddWithSlowGetPartitions, Array(0)) }.flatten

// Block the current thread until the other thread has started the getPartitions() call:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.spark.serializer

import scala.concurrent._
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.duration._

import org.apache.spark.{SparkConf, SparkContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2229,7 +2229,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit] = {
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
Future {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import java.util.concurrent.{CompletableFuture, Semaphore}
import java.util.zip.CheckedInputStream

import scala.collection.mutable
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.Future

import com.google.common.io.ByteStreams
Expand Down
6 changes: 6 additions & 0 deletions scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ This file is divided into 3 sections:
of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
</check>

<check customId="executioncontextglobal" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">scala\.concurrent\.ExecutionContext\.Implicits\.global</parameter></parameters>
<customMessage> User queries can use global thread pool, causing starvation and eventual OOM.
Thus, Spark-internal APIs should not use this thread pool</customMessage>
</check>

<check customId="FileSystemGet" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">FileSystem.get\([a-zA-Z_$][a-zA-Z_$0-9]*\)</parameter></parameters>
<customMessage><![CDATA[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,9 @@ class StreamingQueryManagerSuite extends StreamTest {

/** Stop a random active query either with `stop()` or with an error */
private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): StreamingQuery = {

// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global

// scalastyle:on executioncontextglobal
val activeQueries = spark.streams.active
val queryToStop = activeQueries(Random.nextInt(activeQueries.length))
Future {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.Future

import org.mockito.Mockito.{mock, reset, verifyNoMoreInteractions}
Expand Down

0 comments on commit bcaab62

Please sign in to comment.