Skip to content

Commit

Permalink
improve process state management (swiftlang#209)
Browse files Browse the repository at this point in the history
motivation: improve performance and robustness of process state management

changes:
* refactor Process to use a state machine to track the process execution state
* replace use of DispatchQueue with Locks to protect state
* adjust windows implementation 

rdar://76087764

Co-authored-by: Saleem Abdulrasool <compnerd@compnerd.org>
  • Loading branch information
tomerd and compnerd authored Jun 25, 2021
1 parent 8ce7964 commit 4c4ad66
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 81 deletions.
250 changes: 170 additions & 80 deletions Sources/TSCBasic/Process.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ public final class Process: ObjectIdentifierProtocol {
}
}

// process execution mutable state
private enum State {
case idle
case readingOutputThread(stdout: Thread, stderr: Thread?)
case readingOutputPipe(sync: DispatchGroup)
case outputReady(stdout: Result<[UInt8], Swift.Error>, stderr: Result<[UInt8], Swift.Error>)
case complete(ProcessResult)
}

/// Typealias for process id type.
#if !os(Windows)
public typealias ProcessID = pid_t
Expand Down Expand Up @@ -219,36 +228,36 @@ public final class Process: ObjectIdentifierProtocol {
public private(set) var processID = ProcessID()
#endif

/// If the subprocess has launched.
/// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be
/// called only once.
public private(set) var launched = false
// process execution mutable state
private var state: State = .idle
private let stateLock = Lock()

/// The result of the process execution. Available after process is terminated.
/// This will block while the process is awaiting result
@available(*, deprecated, message: "use waitUntilExit instead")
public var result: ProcessResult? {
return self.serialQueue.sync {
self._result
return self.stateLock.withLock {
switch self.state {
case .complete(let result):
return result
default:
return nil
}
}
}

/// How process redirects its output.
public let outputRedirection: OutputRedirection
// ideally we would use the state for this, but we need to access it while the waitForExit is locking state
private var _launched = false
private let launchedLock = Lock()

/// The result of the process execution. Available after process is terminated.
private var _result: ProcessResult?

/// If redirected, stdout result and reference to the thread reading the output.
private var stdout: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)

/// If redirected, stderr result and reference to the thread reading the output.
private var stderr: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)

/// Queue to protect concurrent reads.
private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process")
public var launched: Bool {
return self.launchedLock.withLock {
return self._launched
}
}

/// Queue to protect reading/writing on map of validated executables.
private static let executablesQueue = DispatchQueue(
label: "org.swift.swiftpm.process.findExecutable")
/// How process redirects its output.
public let outputRedirection: OutputRedirection

/// Indicates if a new progress group is created for the child process.
private let startNewProcessGroup: Bool
Expand All @@ -257,7 +266,8 @@ public final class Process: ObjectIdentifierProtocol {
///
/// Key: Executable name or path.
/// Value: Path to the executable, if found.
static private var validatedExecutablesMap = [String: AbsolutePath?]()
private static var validatedExecutablesMap = [String: AbsolutePath?]()
private static let validatedExecutablesMapLock = Lock()

/// Create a new process instance.
///
Expand Down Expand Up @@ -348,7 +358,7 @@ public final class Process: ObjectIdentifierProtocol {
}
// This should cover the most common cases, i.e. when the cache is most helpful.
if workingDirectory == localFileSystem.currentWorkingDirectory {
return Process.executablesQueue.sync {
return Process.validatedExecutablesMapLock.withLock {
if let value = Process.validatedExecutablesMap[program] {
return value
}
Expand All @@ -367,10 +377,11 @@ public final class Process: ObjectIdentifierProtocol {
@discardableResult
public func launch() throws -> WritableByteStream {
precondition(arguments.count > 0 && !arguments[0].isEmpty, "Need at least one argument to launch the process.")
precondition(!launched, "It is not allowed to launch the same process object again.")

// Set the launch bool to true.
launched = true
self.launchedLock.withLock {
precondition(!self._launched, "It is not allowed to launch the same process object again.")
self._launched = true
}

// Print the arguments if we are verbose.
if self.verbose {
Expand All @@ -393,30 +404,69 @@ public final class Process: ObjectIdentifierProtocol {
let stdinPipe = Pipe()
_process?.standardInput = stdinPipe

let group = DispatchGroup()

var stdout: [UInt8] = []
let stdoutLock = Lock()

var stderr: [UInt8] = []
let stderrLock = Lock()

if outputRedirection.redirectsOutput {
let stdoutPipe = Pipe()
let stderrPipe = Pipe()

group.enter()
stdoutPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
let contents = fh.readDataToEndOfFile()
self.outputRedirection.outputClosures?.stdoutClosure([UInt8](contents))
if case .success(let data) = self.stdout.result {
self.stdout.result = .success(data + contents)
let data = fh.availableData
if (data.count == 0) {
stdoutPipe.fileHandleForReading.readabilityHandler = nil
group.leave()
} else {
let contents = data.withUnsafeBytes { Array<UInt8>($0) }
self.outputRedirection.outputClosures?.stdoutClosure(contents)
stdoutLock.withLock {
stdout += contents
}
}
}

group.enter()
stderrPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
let contents = fh.readDataToEndOfFile()
self.outputRedirection.outputClosures?.stderrClosure([UInt8](contents))
if case .success(let data) = self.stderr.result {
self.stderr.result = .success(data + contents)
let data = fh.availableData
if (data.count == 0) {
stderrPipe.fileHandleForReading.readabilityHandler = nil
group.leave()
} else {
let contents = data.withUnsafeBytes { Array<UInt8>($0) }
self.outputRedirection.outputClosures?.stderrClosure(contents)
stderrLock.withLock {
stderr += contents
}
}
}

_process?.standardOutput = stdoutPipe
_process?.standardError = stderrPipe
}

// first set state then start reading threads
let sync = DispatchGroup()
sync.enter()
self.stateLock.withLock {
self.state = .readingOutputPipe(sync: sync)
}

group.notify(queue: .global()) {
self.stateLock.withLock {
self.state = .outputReady(stdout: .success(stdout), stderr: .success(stderr))
}
sync.leave()
}

try _process?.run()
return stdinPipe.fileHandleForWriting
#else
#else
// Initialize the spawn attributes.
#if canImport(Darwin) || os(Android)
var attributes: posix_spawnattr_t? = nil
Expand Down Expand Up @@ -547,72 +597,112 @@ public final class Process: ObjectIdentifierProtocol {
// Close the local read end of the input pipe.
try close(fd: stdinPipe[0])

if outputRedirection.redirectsOutput {
if !outputRedirection.redirectsOutput {
// no stdout or stderr in this case
self.stateLock.withLock {
self.state = .outputReady(stdout: .success([]), stderr: .success([]))
}
} else {
var pending: Result<[UInt8], Swift.Error>?
let pendingLock = Lock()

let outputClosures = outputRedirection.outputClosures

// Close the local write end of the output pipe.
try close(fd: outputPipe[1])

// Create a thread and start reading the output on it.
var thread = Thread { [weak self] in
let stdoutThread = Thread { [weak self] in
if let readResult = self?.readOutput(onFD: outputPipe[0], outputClosure: outputClosures?.stdoutClosure) {
self?.stdout.result = readResult
pendingLock.withLock {
if let stderrResult = pending {
self?.stateLock.withLock {
self?.state = .outputReady(stdout: readResult, stderr: stderrResult)
}
} else {
pending = readResult
}
}
} else if let stderrResult = (pendingLock.withLock { pending }) {
// TODO: this is more of an error
self?.stateLock.withLock {
self?.state = .outputReady(stdout: .success([]), stderr: stderrResult)
}
}
}
thread.start()
self.stdout.thread = thread

// Only schedule a thread for stderr if no redirect was requested.
var stderrThread: Thread? = nil
if !outputRedirection.redirectStderr {
// Close the local write end of the stderr pipe.
try close(fd: stderrPipe[1])

// Create a thread and start reading the stderr output on it.
thread = Thread { [weak self] in
stderrThread = Thread { [weak self] in
if let readResult = self?.readOutput(onFD: stderrPipe[0], outputClosure: outputClosures?.stderrClosure) {
self?.stderr.result = readResult
pendingLock.withLock {
if let stdoutResult = pending {
self?.stateLock.withLock {
self?.state = .outputReady(stdout: stdoutResult, stderr: readResult)
}
} else {
pending = readResult
}
}
} else if let stdoutResult = (pendingLock.withLock { pending }) {
// TODO: this is more of an error
self?.stateLock.withLock {
self?.state = .outputReady(stdout: stdoutResult, stderr: .success([]))
}
}
}
thread.start()
self.stderr.thread = thread
} else {
pendingLock.withLock {
pending = .success([]) // no stderr in this case
}
}
// first set state then start reading threads
self.stateLock.withLock {
self.state = .readingOutputThread(stdout: stdoutThread, stderr: stderrThread)
}
stdoutThread.start()
stderrThread?.start()
}

return stdinStream
#endif // POSIX implementation
#endif // POSIX implementation
}

/// Blocks the calling process until the subprocess finishes execution.
@discardableResult
public func waitUntilExit() throws -> ProcessResult {
#if os(Windows)
precondition(_process != nil, "The process is not yet launched.")
let p = _process!
p.waitUntilExit()
stdout.thread?.join()
stderr.thread?.join()

let executionResult = ProcessResult(
arguments: arguments,
environment: environment,
exitStatusCode: p.terminationStatus,
output: stdout.result,
stderrOutput: stderr.result
)
return executionResult
#else
return try serialQueue.sync {
precondition(launched, "The process is not yet launched.")

// If the process has already finsihed, return it.
if let existingResult = _result {
return existingResult
}

self.stateLock.lock()
switch self.state {
case .idle:
defer { self.stateLock.unlock() }
preconditionFailure("The process is not yet launched.")
case .complete(let result):
defer { self.stateLock.unlock() }
return result
case .readingOutputThread(let stdoutThread, let stderrThread):
self.stateLock.unlock() // unlock early since output read thread need to change state
// If we're reading output, make sure that is finished.
stdout.thread?.join()
stderr.thread?.join()

stdoutThread.join()
stderrThread?.join()
return try self.waitUntilExit()
case .readingOutputPipe(let sync):
self.stateLock.unlock() // unlock early since output read thread need to change state
sync.wait()
return try self.waitUntilExit()
case .outputReady(let stdoutResult, let stderrResult):
defer { self.stateLock.unlock() }
// Wait until process finishes execution.
#if os(Windows)
precondition(_process != nil, "The process is not yet launched.")
let p = _process!
p.waitUntilExit()
let exitStatusCode = p.terminationStatus
#else
var exitStatusCode: Int32 = 0
var result = waitpid(processID, &exitStatusCode, 0)
while result == -1 && errno == EINTR {
Expand All @@ -621,19 +711,19 @@ public final class Process: ObjectIdentifierProtocol {
if result == -1 {
throw SystemError.waitpid(errno)
}
#endif

// Construct the result.
let executionResult = ProcessResult(
arguments: arguments,
environment: environment,
exitStatusCode: exitStatusCode,
output: stdout.result,
stderrOutput: stderr.result
output: stdoutResult,
stderrOutput: stderrResult
)
self._result = executionResult
self.state = .complete(executionResult)
return executionResult
}
#endif
}

#if !os(Windows)
Expand Down Expand Up @@ -687,12 +777,12 @@ public final class Process: ObjectIdentifierProtocol {
public func signal(_ signal: Int32) {
#if os(Windows)
if signal == SIGINT {
_process?.interrupt()
_process?.interrupt()
} else {
_process?.terminate()
_process?.terminate()
}
#else
assert(launched, "The process is not yet launched.")
assert(self.launched, "The process is not yet launched.")
_ = TSCLibc.kill(startNewProcessGroup ? -processID : processID, signal)
#endif
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/TSCBasicTests/ProcessSetTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ProcessSetTests: XCTestCase {
threadStartCondition.signal()
}
let result = try process.waitUntilExit()
// Ensure we did termiated due to signal.
// Ensure we did terminated due to signal.
switch result.exitStatus {
case .signalled: break
default: XCTFail("Expected to exit via signal")
Expand Down

0 comments on commit 4c4ad66

Please sign in to comment.