Skip to content

Commit

Permalink
Revert "IO: tie lifetime of handle field to container (#43218)" (#43924)
Browse files Browse the repository at this point in the history
This reverts commit 5cd31b5.
  • Loading branch information
Keno authored Jan 25, 2022
1 parent cdd2f62 commit a400a24
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 150 deletions.
30 changes: 15 additions & 15 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
"""
mutable struct AsyncCondition
@atomic handle::Ptr{Cvoid}
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
@atomic isopen::Bool
isopen::Bool
@atomic set::Bool

function AsyncCondition()
Expand Down Expand Up @@ -71,9 +71,9 @@ Note: `interval` is subject to accumulating time skew. If you need precise event
absolute time, create a new timer at each expiration with the difference to the next time computed.
"""
mutable struct Timer
@atomic handle::Ptr{Cvoid}
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
@atomic isopen::Bool
isopen::Bool
@atomic set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
Expand Down Expand Up @@ -143,13 +143,12 @@ function wait(t::Union{Timer, AsyncCondition})
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL
isopen(t::Union{Timer, AsyncCondition}) = t.isopen

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if isopen(t)
@atomic :monotonic t.isopen = false
preserve_handle(t)
if t.handle != C_NULL && isopen(t)
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
Expand All @@ -160,11 +159,13 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
iolock_begin()
lock(t.cond)
try
if isopen(t)
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
@atomic :monotonic t.handle = C_NULL
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
t.handle = C_NULL
notify(t.cond, false)
end
finally
Expand All @@ -177,9 +178,8 @@ end
function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
@atomic :monotonic t.isopen = false
unpreserve_handle(t)
@atomic :monotonic t.handle = C_NULL
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
finally
unlock(t.cond)
Expand Down
7 changes: 2 additions & 5 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ function preserve_handle(x)
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = get(uvhandles, x, 0)::Int
if v == 0
unlock(preserve_handle_lock)
error("unbalanced call to unpreserve_handle for $(typeof(x))")
elseif v == 1
v = uvhandles[x]::Int
if v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
Expand Down
7 changes: 3 additions & 4 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
disassociate_julia_struct(proc.handle)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
proc.handle = C_NULL
lock(proc.exitnotify)
Expand All @@ -66,7 +65,7 @@ end

# called when the libuv handle is destroyed
function _uv_hook_close(proc::Process)
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
proc.handle = C_NULL
nothing
end

Expand Down Expand Up @@ -588,10 +587,10 @@ Get the child process ID, if it still exists.
This function requires at least Julia 1.1.
"""
function Libc.getpid(p::Process)
# TODO: due to threading, this method is only weakly synchronized with the user application
# TODO: due to threading, this method is no longer synchronized with the user application
iolock_begin()
ppid = Int32(0)
if p.handle != C_NULL # e.g. process_running
if p.handle != C_NULL
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
end
iolock_end()
Expand Down
27 changes: 10 additions & 17 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
end

function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
if x.status == StatusUninit || x.status == StatusInit
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
Expand Down Expand Up @@ -496,39 +496,34 @@ end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
preserve_handle(stream)
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
iolock_end()
wait_close(stream)
should_wait && wait_close(stream)
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
uv.handle == C_NULL && return
iolock_begin()
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status == StatusUninit
Libc.free(uv.handle)
elseif uv.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
elseif isopen(uv)
if uv.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
end
elseif uv.status == StatusClosed
if uv.status != StatusUninit
close(uv)
else
Libc.free(uv.handle)
end
uv.handle = C_NULL
uv.status = StatusClosed
uv.handle = C_NULL
end
iolock_end()
nothing
Expand Down Expand Up @@ -672,15 +667,13 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
notify(stream.cond)
else
# underlying stream is no longer useful: begin finalization
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
else
stream.readerror = _UVError("read", nread)
# This is a fatal connection error
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
Expand Down Expand Up @@ -718,9 +711,9 @@ function reseteof(x::TTY)
end

function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
unpreserve_handle(uv)
lock(uv.cond)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.cond)
Expand Down
3 changes: 2 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ static void jl_close_item_atexit(uv_handle_t *handle)
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
handle->data = NULL;
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
Expand Down
50 changes: 27 additions & 23 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ JL_DLLEXPORT void jl_iolock_end(void)
}


static void jl_uv_call_close_callback(jl_value_t *val)
void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
Expand Down Expand Up @@ -105,7 +105,6 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
jl_uv_call_close_callback((jl_value_t*)handle->data);
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async)
return;
Expand All @@ -126,10 +125,6 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
free(req);
return;
}
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
Expand All @@ -139,9 +134,12 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return;
}
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
}

static void uv_flush_callback(uv_write_t *req, int status)
Expand Down Expand Up @@ -224,41 +222,47 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
JL_UV_LOCK();
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
return;
}
else if (handle->type == UV_FILE) {
JL_UV_LOCK();
if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
if ((ssize_t)fd->file != -1) {
uv_fs_close(handle->loop, &req, fd->file, NULL);
fd->file = (uv_os_fd_t)(ssize_t)-1;
}
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
JL_UV_UNLOCK();
return;
}
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
// flush the stream write-queue first
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
}
else {
uv_close(handle, &jl_uv_closeHandle);
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
JL_UV_UNLOCK();
return;
}

// avoid double-closing the stream
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
if (!uv_is_closing(handle)) { // avoid double-closing the stream
// avoid double-closing the stream
if (!uv_is_closing(handle)) {
JL_UV_LOCK();
if (!uv_is_closing(handle)) { // double-check
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
Expand Down
1 change: 1 addition & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

extern uv_loop_t *jl_io_loop;
void jl_uv_flush(uv_stream_t *stream);
void jl_uv_call_close_callback(jl_value_t *val);

typedef struct jl_typeenv_t {
jl_tvar_t *var;
Expand Down
Loading

0 comments on commit a400a24

Please sign in to comment.