Skip to content

Commit

Permalink
Always ref writable streams
Browse files Browse the repository at this point in the history
We suffered a bug whereby doing a follow-up write to another write could
resurrect a deleted stream, causing all sorts of crash.

Fix: when a stream becomes writable (vs when we start writing) take a
ref on the stream, and only relinquish it once we're done writing.
  • Loading branch information
ctiller committed Mar 3, 2016
1 parent bfc8a8d commit 0cb803d
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 71 deletions.
1 change: 1 addition & 0 deletions grpc.def
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ EXPORTS
gpr_event_wait
gpr_ref_init
gpr_ref
gpr_ref_non_zero
gpr_refn
gpr_unref
gpr_stats_init
Expand Down
4 changes: 4 additions & 0 deletions include/grpc/impl/codegen/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ GPRAPI void gpr_ref_init(gpr_refcount *r, int n);
/* Increment the reference count *r. Requires *r initialized. */
GPRAPI void gpr_ref(gpr_refcount *r);

/* Increment the reference count *r. Requires *r initialized.
Crashes if refcount is zero */
GPRAPI void gpr_ref_non_zero(gpr_refcount *r);

/* Increment the reference count *r by n. Requires *r initialized, n > 0. */
GPRAPI void gpr_refn(gpr_refcount *r, int n);

Expand Down
16 changes: 16 additions & 0 deletions src/core/iomgr/iomgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>

#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/env.h"
#include "src/core/support/string.h"

static gpr_mu g_mu;
Expand Down Expand Up @@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely",
count_objects());
dump_objects("LEAKED");
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
}
break;
}
Expand Down Expand Up @@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_unlock(&g_mu);
gpr_free(obj->name);
}

bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
if (env == NULL) return false;
static const char *truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) return true;
}
return false;
}
6 changes: 5 additions & 1 deletion src/core/iomgr/iomgr_internal.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H

#include <stdbool.h>

#include "src/core/iomgr/iomgr.h"
#include <grpc/support/sync.h>

Expand All @@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void);
/** tear down all platform specific global iomgr structures */
void grpc_iomgr_platform_shutdown(void);

bool grpc_iomgr_abort_on_leaks(void);

#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */
7 changes: 6 additions & 1 deletion src/core/support/sync.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }

void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }

void gpr_ref_non_zero(gpr_refcount *r) {
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1);
GPR_ASSERT(prior > 0);
}

void gpr_refn(gpr_refcount *r, int n) {
gpr_atm_no_barrier_fetch_add(&r->count, n);
}
Expand Down
18 changes: 11 additions & 7 deletions src/core/transport/chttp2/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
uint8_t fetching;
uint8_t sent_initial_metadata;
bool sent_initial_metadata;
uint8_t sent_message;
uint8_t sent_trailing_metadata;
uint8_t read_closed;
Expand Down Expand Up @@ -509,7 +509,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);

void grpc_chttp2_list_add_writable_stream(
bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
/** Get a writable stream
Expand All @@ -519,14 +519,13 @@ int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_stream(
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT;

/* returns 1 if stream added, 0 if it was already present */
int grpc_chttp2_list_add_writing_stream(
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT;
grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(
Expand Down Expand Up @@ -770,4 +769,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing,
const uint8_t *opaque_8bytes);

/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);

#endif
6 changes: 3 additions & 3 deletions src/core/transport/chttp2/parsing.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads(
if (was_zero && !is_zero) {
while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
&stream_global)) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}
}

Expand Down Expand Up @@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads(
outgoing_window);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
grpc_chttp2_become_writable(transport_global, stream_global);
}

stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(
Expand Down
38 changes: 21 additions & 17 deletions src/core/transport/chttp2/stream_lists.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
}

static void stream_list_maybe_remove(grpc_chttp2_transport *t,
static bool stream_list_maybe_remove(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
stream_list_remove(t, s, id);
return true;
} else {
return false;
}
}

Expand All @@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1;
}

static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return 0;
return false;
}
stream_list_add_tail(t, s, id);
return 1;
return true;
}

/* wrappers for specializations */

void grpc_chttp2_list_add_writable_stream(
bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
}

int grpc_chttp2_list_pop_writable_stream(
Expand All @@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream(
return r;
}

void grpc_chttp2_list_remove_writable_stream(
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
}

int grpc_chttp2_list_add_writing_stream(
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING);
GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING));
}

int grpc_chttp2_list_have_writing_streams(
Expand Down Expand Up @@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
grpc_chttp2_become_writable(&transport->global, &stream->global);
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
Expand Down
37 changes: 14 additions & 23 deletions src/core/transport/chttp2/writing.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes(
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
uint8_t sent_initial_metadata;
bool sent_initial_metadata = stream_writing->sent_initial_metadata;
bool become_writable = false;

stream_writing->id = stream_global->id;
stream_writing->read_closed = stream_global->read_closed;
Expand All @@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes(
outgoing_window, stream_global,
outgoing_window);

sent_initial_metadata = stream_writing->sent_initial_metadata;
if (!sent_initial_metadata && stream_global->send_initial_metadata) {
stream_writing->send_initial_metadata =
stream_global->send_initial_metadata;
stream_global->send_initial_metadata = NULL;
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
sent_initial_metadata = 1;
become_writable = true;
sent_initial_metadata = true;
}
if (sent_initial_metadata) {
if (stream_global->send_message != NULL) {
Expand All @@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->flow_controlled_buffer.length > 0) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
become_writable = true;
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
Expand All @@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
become_writable = true;
}
}

Expand All @@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
become_writable = true;
}

if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}

Expand Down Expand Up @@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
if (grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing)) {
/* do nothing - already reffed */
}
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
Expand Down
Loading

0 comments on commit 0cb803d

Please sign in to comment.