From 707408d98dce8a0f56c29bef0ecd0010f7d9f3c2 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 3 Apr 2023 08:28:25 -0500
Subject: [PATCH] [SPARK-42974][CORE] Restore `Utils.createTempDir` to use the
 `ShutdownHookManager` and clean up `JavaUtils.createTempDir` method

### What changes were proposed in this pull request?
The main changes of this PR are as follows:

1. Split `Utils.createTempDir` and `JavaUtils.createTempDir` back into two independent implementations, so that `Utils.createTempDir` once again uses the `spark.util.ShutdownHookManager` mechanism.
2. Replace the test usages of `JavaUtils.createTempDir` with `Utils.createTempDir` or `JavaUtils.createDirectory`.
3. Remove the now-unused `JavaUtils.createTempDir` method.

### Why are the changes needed?
`Utils.createTempDir` should clean up through Spark's `ShutdownHookManager`, which deletes directories recursively at shutdown, rather than through `File#deleteOnExit`; with that restored, the redundant `JavaUtils.createTempDir` method can be removed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #40613 from LuciferYang/revert-SPARK-39204.

Authored-by: yangjie01
Signed-off-by: Sean Owen
---
 .../apache/spark/network/util/JavaUtils.java  | 20 -------------------
 .../spark/network/StreamTestHelper.java       |  2 +-
 .../shuffle/ExternalBlockHandlerSuite.java    |  2 +-
 .../shuffle/TestShuffleDataContext.java       |  3 ++-
 .../network/yarn/YarnShuffleService.java      |  4 +++-
 .../scala/org/apache/spark/util/Utils.scala   | 11 +++++++++-
 .../org/apache/spark/Java8RDDAPISuite.java    |  3 +--
 .../test/org/apache/spark/JavaAPISuite.java   |  4 ++--
 .../apache/spark/streaming/JavaAPISuite.java  |  5 ++---
 9 files changed, 22 insertions(+), 32 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 544fe16a56915..7e410e9eab223 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -368,26 +368,6 @@ public static byte[] bufferToArray(ByteBuffer buffer) {
     }
   }
 
-  /**
-   * Create a temporary directory inside `java.io.tmpdir` with default namePrefix "spark".
-   * The directory will be automatically deleted when the VM shuts down.
-   */
-  public static File createTempDir() throws IOException {
-    return createTempDir(System.getProperty("java.io.tmpdir"), "spark");
-  }
-
-  /**
-   * Create a temporary directory inside the given parent directory. The directory will be
-   * automatically deleted when the VM shuts down.
-   */
-  public static File createTempDir(String root, String namePrefix) throws IOException {
-    if (root == null) root = System.getProperty("java.io.tmpdir");
-    if (namePrefix == null) namePrefix = "spark";
-    File dir = createDirectory(root, namePrefix);
-    dir.deleteOnExit();
-    return dir;
-  }
-
   /**
    * Create a directory inside the given parent directory with default namePrefix "spark".
    * The directory is guaranteed to be newly created, and is not marked for automatic deletion.
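For reviewers, the caller-side pattern this patch applies wherever `JavaUtils.createTempDir` was used is sketched below. The wrapper class and `main` method are illustrative only; the `JavaUtils.createDirectory` call and the explicit `deleteOnExit` are the pieces taken from the diffs in this patch.

```java
import java.io.File;
import java.io.IOException;

import org.apache.spark.network.util.JavaUtils;

// Illustrative wrapper; only the JavaUtils usage below reflects the patch.
public final class CreateTempDirMigration {
  public static void main(String[] args) throws IOException {
    // Before this patch: JavaUtils.createTempDir() created the directory
    // and marked it with File#deleteOnExit() in one step.
    // After: the caller creates the directory explicitly...
    File dir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
    // ...and opts into VM-exit cleanup only where it is actually wanted.
    dir.deleteOnExit();
    System.out.println("created " + dir.getAbsolutePath());
  }
}
```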
diff --git a/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java b/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
index 3ba6a58565382..da83e549d1c17 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
@@ -49,7 +49,7 @@ private static ByteBuffer createBuffer(int bufSize) {
   }
 
   StreamTestHelper() throws Exception {
-    tempDir = JavaUtils.createTempDir();
+    tempDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
     emptyBuffer = createBuffer(0);
     smallBuffer = createBuffer(100);
     largeBuffer = createBuffer(100000);
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index 54f29fedf832c..44dcb71f7536f 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -125,7 +125,7 @@ private void checkDiagnosisResult(
     int reduceId = 0;
 
     // prepare the checksum file
-    File tmpDir = JavaUtils.createTempDir();
+    File tmpDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
     File checksumFile = new File(tmpDir,
       "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".checksum." + algorithm);
     DataOutputStream out = new DataOutputStream(new FileOutputStream(checksumFile));
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 5661504d621d7..05b6c1235c645 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -47,8 +47,9 @@ public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
   }
 
   public void create() throws IOException {
+    String root = System.getProperty("java.io.tmpdir");
     for (int i = 0; i < localDirs.length; i ++) {
-      localDirs[i] = JavaUtils.createTempDir().getAbsolutePath();
+      localDirs[i] = JavaUtils.createDirectory(root, "spark").getAbsolutePath();
       for (int p = 0; p < subDirsPerLocalDir; p ++) {
         new File(localDirs[i], String.format("%02x", p)).mkdirs();
       }
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 25adc1da32e5c..1fa0eebb7f8bd 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -243,7 +243,9 @@ protected void serviceInit(Configuration externalConf) throws Exception {
     boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
 
     if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) {
-      _recoveryPath = new Path(JavaUtils.createTempDir().toURI());
+      File tempDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
+      tempDir.deleteOnExit();
+      _recoveryPath = new Path(tempDir.toURI());
     }
 
     if (_recoveryPath != null) {
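The YARN change above is the one non-test call site, and it keeps the old `deleteOnExit` behavior explicitly. The test helpers, by contrast, own the directory lifecycle themselves and delete on teardown. A minimal sketch of that create/cleanup pairing, using the existing `JavaUtils.deleteRecursively`; the `TempDirHarness` name is hypothetical:

```java
import java.io.File;
import java.io.IOException;

import org.apache.spark.network.util.JavaUtils;

// Hypothetical harness showing the lifecycle the test helpers above follow.
public final class TempDirHarness {
  private File tempDir;

  public void create() throws IOException {
    // Not marked for automatic deletion; the harness owns cleanup.
    tempDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
  }

  public void cleanup() throws IOException {
    // Recursive delete, which File#deleteOnExit cannot do for a
    // non-empty directory.
    JavaUtils.deleteRecursively(tempDir);
  }
}
```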
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a6a1f7d7b1e9b..166f93be05bed 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -323,6 +323,13 @@ private[spark] object Utils extends Logging {
     JavaUtils.createDirectory(root, namePrefix)
   }
 
+  /**
+   * Create a temporary directory inside `java.io.tmpdir` prefixed with `spark`.
+   * The directory will be automatically deleted when the VM shuts down.
+   */
+  def createTempDir(): File =
+    createTempDir(System.getProperty("java.io.tmpdir"), "spark")
+
   /**
    * Create a temporary directory inside the given parent directory. The directory will be
    * automatically deleted when the VM shuts down.
@@ -330,7 +337,9 @@ private[spark] object Utils extends Logging {
   def createTempDir(
       root: String = System.getProperty("java.io.tmpdir"),
       namePrefix: String = "spark"): File = {
-    JavaUtils.createTempDir(root, namePrefix)
+    val dir = createDirectory(root, namePrefix)
+    ShutdownHookManager.registerShutdownDeleteDir(dir)
+    dir
   }
 
   /**
diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
index 51758979fc5b5..c56cb09de05f8 100644
--- a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -22,7 +22,6 @@
 import java.io.Serializable;
 import java.util.*;
 
-import org.apache.spark.network.util.JavaUtils;
 import scala.Tuple2;
 
 import com.google.common.collect.Iterables;
@@ -246,7 +245,7 @@ public void mapPartitions() {
 
   @Test
   public void sequenceFile() throws IOException {
-    File tempDir = JavaUtils.createTempDir();
+    File tempDir = Utils.createTempDir();
     tempDir.deleteOnExit();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 5c8529d51357b..1c63800982a18 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -39,7 +39,6 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
-import org.apache.spark.network.util.JavaUtils;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
@@ -82,6 +81,7 @@
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.util.StatCounter;
+import org.apache.spark.util.Utils;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -93,7 +93,7 @@ public class JavaAPISuite implements Serializable {
   @Before
   public void setUp() throws IOException {
     sc = new JavaSparkContext("local", "JavaAPISuite");
-    tempDir = JavaUtils.createTempDir();
+    tempDir = Utils.createTempDir();
     tempDir.deleteOnExit();
   }
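The restored `Utils.createTempDir` above is what the Java suites switch to. A minimal sketch of why the `ShutdownHookManager` route matters, assuming a standard Spark test classpath (`ShutdownCleanupExample` and `payload.bin` are made up for illustration):

```java
import java.io.File;
import java.nio.file.Files;

import org.apache.spark.util.Utils;

// Minimal sketch: Utils.createTempDir() registers the directory with
// Spark's ShutdownHookManager, which deletes it recursively at JVM exit;
// File#deleteOnExit() alone would only remove an *empty* directory.
public final class ShutdownCleanupExample {
  public static void main(String[] args) throws Exception {
    File dir = Utils.createTempDir();
    // Even with contents, the directory is removed when the VM shuts down.
    Files.createFile(new File(dir, "payload.bin").toPath());
    System.out.println("temp dir: " + dir.getAbsolutePath());
  }
}
```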
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 3bc704661ba0a..2e55064bb4974 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -22,7 +22,6 @@
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.JavaCheckpointTestUtils;
 import org.apache.spark.streaming.JavaTestUtils;
@@ -1476,7 +1475,7 @@ public void testCheckpointMasterRecovery() throws InterruptedException, IOException {
       Arrays.asList(1,4),
       Arrays.asList(8,7));
 
-    File tempDir = JavaUtils.createTempDir();
+    File tempDir = Utils.createTempDir();
     tempDir.deleteOnExit();
     ssc.checkpoint(tempDir.getAbsolutePath());
 
@@ -1507,7 +1506,7 @@ public void testContextGetOrCreate() throws IOException {
       .setAppName("test")
       .set("newContext", "true");
 
-    File emptyDir = JavaUtils.createTempDir();
+    File emptyDir = Utils.createTempDir();
     emptyDir.deleteOnExit();
     StreamingContextSuite contextSuite = new StreamingContextSuite();
     String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
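Note that the streaming tests keep their explicit `tempDir.deleteOnExit()` calls even though `Utils.createTempDir` now registers a shutdown-delete hook; the two mechanisms are independent and coexist harmlessly. A condensed, hypothetical version of the checkpoint-directory setup these tests use:

```java
import java.io.File;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.Utils;

// Hypothetical condensed form of the checkpoint setup in the tests above.
public final class CheckpointDirExample {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "CheckpointDirExample", new Duration(1000));
    File tempDir = Utils.createTempDir();
    tempDir.deleteOnExit(); // redundant with the shutdown hook, but harmless
    ssc.checkpoint(tempDir.getAbsolutePath());
    ssc.stop();
  }
}
```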