Skip to content

Commit

Permalink
[SPARK-42974][CORE] Restore Utils.createTempDir to use the `Shutdow…
Browse files Browse the repository at this point in the history
…nHookManager` and clean up `JavaUtils.createTempDir` method

### What changes were proposed in this pull request?
The main change of this pr as follows:

1. Make `Utils.createTempDir` and `JavaUtils.createTempDir` back to two independent implementations to restore `Utils.createTempDir` to use the `spark.util.ShutdownHookManager` mechanism.
2. Use `Utils.createTempDir` or `JavaUtils.createDirectory` instead for testing where `JavaUtils.createTempDir` is used.
3. Clean up `JavaUtils.createTempDir` method

### Why are the changes needed?
Restore `Utils.createTempDir` to use the `spark.util.ShutdownHookManager` mechanism and clean up `JavaUtils.createTempDir` method.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes apache#40613 from LuciferYang/revert-SPARK-39204.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
LuciferYang authored and srowen committed Apr 3, 2023
1 parent f57c368 commit 707408d
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,26 +368,6 @@ public static byte[] bufferToArray(ByteBuffer buffer) {
}
}

/**
* Create a temporary directory inside `java.io.tmpdir` with default namePrefix "spark".
* The directory will be automatically deleted when the VM shuts down.
*/
public static File createTempDir() throws IOException {
return createTempDir(System.getProperty("java.io.tmpdir"), "spark");
}

/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
public static File createTempDir(String root, String namePrefix) throws IOException {
if (root == null) root = System.getProperty("java.io.tmpdir");
if (namePrefix == null) namePrefix = "spark";
File dir = createDirectory(root, namePrefix);
dir.deleteOnExit();
return dir;
}

/**
* Create a directory inside the given parent directory with default namePrefix "spark".
* The directory is guaranteed to be newly created, and is not marked for automatic deletion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private static ByteBuffer createBuffer(int bufSize) {
}

StreamTestHelper() throws Exception {
tempDir = JavaUtils.createTempDir();
tempDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
emptyBuffer = createBuffer(0);
smallBuffer = createBuffer(100);
largeBuffer = createBuffer(100000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void checkDiagnosisResult(
int reduceId = 0;

// prepare the checksum file
File tmpDir = JavaUtils.createTempDir();
File tmpDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
File checksumFile = new File(tmpDir,
"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".checksum." + algorithm);
DataOutputStream out = new DataOutputStream(new FileOutputStream(checksumFile));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
}

public void create() throws IOException {
String root = System.getProperty("java.io.tmpdir");
for (int i = 0; i < localDirs.length; i ++) {
localDirs[i] = JavaUtils.createTempDir().getAbsolutePath();
localDirs[i] = JavaUtils.createDirectory(root, "spark").getAbsolutePath();

for (int p = 0; p < subDirsPerLocalDir; p ++) {
new File(localDirs[i], String.format("%02x", p)).mkdirs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ protected void serviceInit(Configuration externalConf) throws Exception {
boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) {
_recoveryPath = new Path(JavaUtils.createTempDir().toURI());
File tempDir = JavaUtils.createDirectory(System.getProperty("java.io.tmpdir"), "spark");
tempDir.deleteOnExit();
_recoveryPath = new Path(tempDir.toURI());
}

if (_recoveryPath != null) {
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,23 @@ private[spark] object Utils extends Logging {
JavaUtils.createDirectory(root, namePrefix)
}

/**
* Create a temporary directory inside the `java.io.tmpdir` prefixed with `spark`.
* The directory will be automatically deleted when the VM shuts down.
*/
def createTempDir(): File =
createTempDir(System.getProperty("java.io.tmpdir"), "spark")

/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "spark"): File = {
JavaUtils.createTempDir(root, namePrefix)
val dir = createDirectory(root, namePrefix)
ShutdownHookManager.registerShutdownDeleteDir(dir)
dir
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.Serializable;
import java.util.*;

import org.apache.spark.network.util.JavaUtils;
import scala.Tuple2;

import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -246,7 +245,7 @@ public void mapPartitions() {

@Test
public void sequenceFile() throws IOException {
File tempDir = JavaUtils.createTempDir();
File tempDir = Utils.createTempDir();
tempDir.deleteOnExit();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/test/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.network.util.JavaUtils;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
Expand Down Expand Up @@ -82,6 +81,7 @@
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.StatCounter;
import org.apache.spark.util.Utils;

// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
Expand All @@ -93,7 +93,7 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() throws IOException {
sc = new JavaSparkContext("local", "JavaAPISuite");
tempDir = JavaUtils.createTempDir();
tempDir = Utils.createTempDir();
tempDir.deleteOnExit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.JavaCheckpointTestUtils;
import org.apache.spark.streaming.JavaTestUtils;
Expand Down Expand Up @@ -1476,7 +1475,7 @@ public void testCheckpointMasterRecovery() throws InterruptedException, IOExcept
Arrays.asList(1,4),
Arrays.asList(8,7));

File tempDir = JavaUtils.createTempDir();
File tempDir = Utils.createTempDir();
tempDir.deleteOnExit();
ssc.checkpoint(tempDir.getAbsolutePath());

Expand Down Expand Up @@ -1507,7 +1506,7 @@ public void testContextGetOrCreate() throws IOException {
.setAppName("test")
.set("newContext", "true");

File emptyDir = JavaUtils.createTempDir();
File emptyDir = Utils.createTempDir();
emptyDir.deleteOnExit();
StreamingContextSuite contextSuite = new StreamingContextSuite();
String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
Expand Down

0 comments on commit 707408d

Please sign in to comment.