Skip to content

Commit

Permalink
Implement logic to handle partial writes on Posix write() calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
tonystone committed Jan 22, 2019
1 parent 04cccbe commit 2109c7f
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 62 deletions.
Empty file.
75 changes: 63 additions & 12 deletions Sources/TraceLog/Utilities/Streams/FileOutputStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ internal class FileOutputStream {
///
func close() {
if self.fd >= 0 && closeFd {
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
Darwin.close(self.fd)
#elseif os(Linux) || CYGWIN
Glibc.close(self.fd)
#endif
self.close(self.fd)
self.fd = -1
}
}
Expand All @@ -129,7 +125,6 @@ internal class FileOutputStream {
/// Should we close this file descriptor on deinit and on request or not.
///
private let closeFd: Bool

}

/// OutputStream conformance for FileOutputStream.
Expand Down Expand Up @@ -158,18 +153,74 @@ extension FileOutputStream: OutputStream {

/// Writes the byte block to the File.
///
/// - Note: We are forgoing locking of the file here since we write the data
/// nearly 100% of the time in a single write() call. POSIX guarantees that
/// if the file is opened in append mode, individual write() operations will
/// be atomic. Partial writes are the only time that we can't write in a
/// single call but with the minimal write sizes that are written, these
/// will be almost non-existent.
/// \
/// Per the [POSIX standard](http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html):
/// \
/// "If the O_APPEND flag of the file status flags is set, the file offset
/// shall be set to the end of the file prior to each write and no intervening
/// file modification operation shall occur between changing the file offset
/// and the write operation."
///
func write(_ bytes: [UInt8]) -> Result<Int, OutputStreamError> {

var buffer = UnsafePointer(bytes)
var length = bytes.count

var written: Int = 0

/// Handle partial writes.
///
repeat {
written = self.write(self.fd, buffer, length)

if written == -1 {
if errno == EINTR { /// Always retry if interrupted.
continue
}
return .failure(OutputStreamError.error(for: errno))
}
length -= written
buffer += written

/// Exit if there are no more bytes (length != 0) or
/// we wrote zero bytes (written != 0)
///
} while (length != 0 && written != 0)

return .success(written)
}

func flush() {
fsync(self.fd)
}
}

// Private extension to work around Swifts confusion around similar function names.
///
private extension FileOutputStream {

@inline(__always)
func write(_ fd: Int32, _ buffer: UnsafePointer<UInt8>, _ nbytes: Int) -> Int {
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
let written = Darwin.write(self.fd, UnsafePointer(bytes), bytes.count)
return Darwin.write(fd, buffer, nbytes)
#elseif os(Linux) || CYGWIN
let written = Glibc.write(self.fd, UnsafePointer(bytes), bytes.count)
return Glibc.write(fd, buffer, nbytes)
#endif
}

guard written != -1
else { return .failure(OutputStreamError.error(for: errno)) }

return .success(written)
@inline(__always)
func close(_ fd: Int32) {
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
Darwin.close(fd)
#elseif os(Linux) || CYGWIN
Glibc.close(fd)
#endif
}
}

Expand Down
11 changes: 5 additions & 6 deletions Sources/TraceLog/Utilities/Streams/OutputStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ internal protocol OutputStream {
/// - Returns: A Result<Int, OutputStreamError> value holding the number of bytes written if .successful or an OutputStreamError if failed.
///
func write(_ bytes: [UInt8]) -> Result<Int, OutputStreamError>

/// If there is buffering for this stream, flush the
/// buffer to disk.
///
func flush()
}

/// Errors returned by OutputStreams
Expand All @@ -50,12 +55,6 @@ internal enum OutputStreamError: Error {
///
case networkDown(String)

/// The write operation was interrupted before it could be completed.
///
/// - Note: this is a re-triable error case.
///
case interrupted(String)

/// The stream was disconnected from its endpoint.
///
case disconnected(String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ internal extension OutputStreamError {
ENETUNREACH: /// A write was attempted on a socket and no route to the network is present.
return .networkDown(message)

case EAGAIN, EWOULDBLOCK, /// The file descriptor is for a socket, is marked O_NONBLOCK, and write would block.
EINTR: /// The write operation was terminated due to the receipt of a signal, and no data was transferred.
return .interrupted(message)

case EBADF, /// The fd argument is not a valid file descriptor open for writing.
EPIPE, /// A write was attempted on a socket that is shut down for writing, or is no longer connected. In the latter case, if the socket is of type SOCK_STREAM, a SIGPIPE signal shall also be sent to the thread.
ECONNRESET, /// A write was attempted on a socket that is not connected.
Expand All @@ -55,6 +51,9 @@ internal extension OutputStreamError {
case EINVAL: /// The STREAM or multiplexer referenced by fd is linked (directly or indirectly) downstream from a multiplexer.
return .invalidArgument(message)

case EAGAIN, EWOULDBLOCK, /// The file descriptor is for a socket, is marked O_NONBLOCK, and write would block.
EINTR: /// The write operation was terminated due to the receipt of a signal, and no data was transferred.
fallthrough
default:
return .unknownError(errorNumber, message)
}
Expand Down
23 changes: 23 additions & 0 deletions Tests/TraceLogTests/Utilities/Streams/FileOutputStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,28 @@ class FileOutputStreamTests: XCTestCase {
XCTFail("Error while deleting temporary file: \(error)")
}
}

func testWritePerformance() throws {
let inputURL = self.temporaryFileURL()
defer { self.removeFileIfExists(url: inputURL) }

let inputBytes = Array<UInt8>(repeating: 128, count: 128)

let stream = try FileOutputStream(url: inputURL, options: [.create])

self.measure {
for _ in 0..<1000000 {

switch stream.write(inputBytes) {
case .success(_):
break
default:
XCTFail()
}
}
}
}


}

Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,6 @@ class OutputStreamErrorPosixTests: XCTestCase {
else { XCTFail(".networkDown was expected"); return }
}

func testErrorForEAGAIN() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
guard case .interrupted(_) = OutputStreamError.error(for: EAGAIN)
else { XCTFail(".interrupted was expected"); return }
}

func testErrorForEWOULDBLOCK() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
guard case .interrupted(_) = OutputStreamError.error(for: EWOULDBLOCK)
else { XCTFail(".interrupted was expected"); return }
}

func testErrorForEINTR() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
guard case .interrupted(_) = OutputStreamError.error(for: EINTR)
else { XCTFail(".interrupted was expected"); return }
}

func testErrorForEBADF() {

/// Note: We rely on the strerror(errno) from the system
Expand Down Expand Up @@ -237,5 +201,50 @@ class OutputStreamErrorPosixTests: XCTestCase {
else { XCTFail(".invalidArgument was expected"); return }
}

func testErrorForEAGAIN() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
if case .unknownError(let code, _) = FileOutputStreamError.error(for: EAGAIN) {
XCTAssertEqual(code, EAGAIN)
} else {
XCTFail(".unknownError was expected")
}
}

func testErrorForEWOULDBLOCK() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
if case .unknownError(let code, _) = FileOutputStreamError.error(for: EWOULDBLOCK) {
XCTAssertEqual(code, EWOULDBLOCK)
} else {
XCTFail(".unknownError was expected")
}
}

func testErrorForEINTR() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
if case .unknownError(let code, _) = FileOutputStreamError.error(for: EINTR) {
XCTAssertEqual(code, EINTR)
} else {
XCTFail(".unknownError was expected")
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ import XCTest
["ENETDOWN", "networkDown", "Network is down"],
["ENETUNREACH", "networkDown", "Network is unreachable"],

["EAGAIN", "interrupted", "Resource temporarily unavailable"],
["EWOULDBLOCK", "interrupted", "Resource temporarily unavailable"],
["EINTR", "interrupted", "Interrupted system call"],

["EBADF", "disconnected", "Bad file descriptor"],
["EPIPE", "disconnected", "Broken pipe"],
["ECONNRESET", "disconnected", "Connection reset by peer"],
Expand All @@ -51,6 +47,12 @@ import XCTest

["EINVAL", "invalidArgument", "Invalid argument"]
]

TwoArgVariants = [
["EAGAIN", "unknownError", "Resource temporarily unavailable"],
["EWOULDBLOCK", "unknownError", "Resource temporarily unavailable"],
["EINTR", "unknownError", "Interrupted system call"]
]
}%

class OutputStreamErrorPosixTests: XCTestCase {
Expand Down Expand Up @@ -83,6 +85,23 @@ class OutputStreamErrorPosixTests: XCTestCase {
else { XCTFail(".${CaseValue} was expected"); return }
}

% end
% for (CodeConstant, CaseValue, Message) in TwoArgVariants:
func testErrorFor${CodeConstant}() {

/// Note: We rely on the strerror(errno) from the system
/// to provide the actual message so we ignore it in these
/// tests at the moment. IN the current implementation
/// of Swift the message is slightly different between
/// Darwin and Linux.
///
if case .${CaseValue}(let code, _) = FileOutputStreamError.error(for: ${CodeConstant}) {
XCTAssertEqual(code, ${CodeConstant})
} else {
XCTFail(".${CaseValue} was expected")
}
}

% end

}

0 comments on commit 2109c7f

Please sign in to comment.