Skip to content

Commit

Permalink
Workqueue: interface, test, and posix implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Sep 9, 2015
1 parent 8f48d4e commit 73b6606
Show file tree
Hide file tree
Showing 22 changed files with 799 additions and 14 deletions.
15 changes: 15 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ cc_library(
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/iomgr/workqueue.h",
"src/core/iomgr/workqueue_posix.h",
"src/core/iomgr/workqueue_windows.h",
"src/core/json/json.h",
"src/core/json/json_common.h",
"src/core/json/json_reader.h",
Expand Down Expand Up @@ -334,6 +337,8 @@ cc_library(
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
"src/core/iomgr/wakeup_fd_posix.c",
"src/core/iomgr/workqueue_posix.c",
"src/core/iomgr/workqueue_windows.c",
"src/core/json/json.c",
"src/core/json/json_reader.c",
"src/core/json/json_string.c",
Expand Down Expand Up @@ -474,6 +479,9 @@ cc_library(
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/iomgr/workqueue.h",
"src/core/iomgr/workqueue_posix.h",
"src/core/iomgr/workqueue_windows.h",
"src/core/json/json.h",
"src/core/json/json_common.h",
"src/core/json/json_reader.h",
Expand Down Expand Up @@ -584,6 +592,8 @@ cc_library(
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
"src/core/iomgr/wakeup_fd_posix.c",
"src/core/iomgr/workqueue_posix.c",
"src/core/iomgr/workqueue_windows.c",
"src/core/json/json.c",
"src/core/json/json_reader.c",
"src/core/json/json_string.c",
Expand Down Expand Up @@ -1098,6 +1108,8 @@ objc_library(
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
"src/core/iomgr/wakeup_fd_posix.c",
"src/core/iomgr/workqueue_posix.c",
"src/core/iomgr/workqueue_windows.c",
"src/core/json/json.c",
"src/core/json/json_reader.c",
"src/core/json/json_string.c",
Expand Down Expand Up @@ -1235,6 +1247,9 @@ objc_library(
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/iomgr/workqueue.h",
"src/core/iomgr/workqueue_posix.h",
"src/core/iomgr/workqueue_windows.h",
"src/core/json/json.h",
"src/core/json/json_common.h",
"src/core/json/json_reader.h",
Expand Down
38 changes: 37 additions & 1 deletion Makefile

Large diffs are not rendered by default.

27 changes: 17 additions & 10 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ filegroups:
src/core/iomgr/socket_windows.h, src/core/iomgr/tcp_client.h, src/core/iomgr/tcp_posix.h,
src/core/iomgr/tcp_server.h, src/core/iomgr/tcp_windows.h, src/core/iomgr/time_averaged_stats.h,
src/core/iomgr/udp_server.h, src/core/iomgr/wakeup_fd_pipe.h, src/core/iomgr/wakeup_fd_posix.h,
src/core/iomgr/workqueue.h, src/core/iomgr/workqueue_posix.h, src/core/iomgr/workqueue_windows.h,
src/core/json/json.h, src/core/json/json_common.h, src/core/json/json_reader.h,
src/core/json/json_writer.h, src/core/profiling/timers.h, src/core/statistics/census_interface.h,
src/core/statistics/census_rpc_stats.h, src/core/surface/byte_buffer_queue.h,
Expand Down Expand Up @@ -104,16 +105,17 @@ filegroups:
src/core/iomgr/tcp_client_windows.c, src/core/iomgr/tcp_posix.c, src/core/iomgr/tcp_server_posix.c,
src/core/iomgr/tcp_server_windows.c, src/core/iomgr/tcp_windows.c, src/core/iomgr/time_averaged_stats.c,
src/core/iomgr/udp_server.c, src/core/iomgr/wakeup_fd_eventfd.c, src/core/iomgr/wakeup_fd_nospecial.c,
src/core/iomgr/wakeup_fd_pipe.c, src/core/iomgr/wakeup_fd_posix.c, src/core/json/json.c,
src/core/json/json_reader.c, src/core/json/json_string.c, src/core/json/json_writer.c,
src/core/profiling/basic_timers.c, src/core/profiling/stap_timers.c, src/core/surface/byte_buffer.c,
src/core/surface/byte_buffer_queue.c, src/core/surface/byte_buffer_reader.c, src/core/surface/call.c,
src/core/surface/call_details.c, src/core/surface/call_log_batch.c, src/core/surface/channel.c,
src/core/surface/channel_connectivity.c, src/core/surface/channel_create.c, src/core/surface/completion_queue.c,
src/core/surface/event_string.c, src/core/surface/init.c, src/core/surface/lame_client.c,
src/core/surface/metadata_array.c, src/core/surface/server.c, src/core/surface/server_chttp2.c,
src/core/surface/server_create.c, src/core/surface/surface_trace.c, src/core/surface/version.c,
src/core/transport/chttp2/alpn.c, src/core/transport/chttp2/bin_encoder.c, src/core/transport/chttp2/frame_data.c,
src/core/iomgr/wakeup_fd_pipe.c, src/core/iomgr/wakeup_fd_posix.c, src/core/iomgr/workqueue_posix.c,
src/core/iomgr/workqueue_windows.c, src/core/json/json.c, src/core/json/json_reader.c,
src/core/json/json_string.c, src/core/json/json_writer.c, src/core/profiling/basic_timers.c,
src/core/profiling/stap_timers.c, src/core/surface/byte_buffer.c, src/core/surface/byte_buffer_queue.c,
src/core/surface/byte_buffer_reader.c, src/core/surface/call.c, src/core/surface/call_details.c,
src/core/surface/call_log_batch.c, src/core/surface/channel.c, src/core/surface/channel_connectivity.c,
src/core/surface/channel_create.c, src/core/surface/completion_queue.c, src/core/surface/event_string.c,
src/core/surface/init.c, src/core/surface/lame_client.c, src/core/surface/metadata_array.c,
src/core/surface/server.c, src/core/surface/server_chttp2.c, src/core/surface/server_create.c,
src/core/surface/surface_trace.c, src/core/surface/version.c, src/core/transport/chttp2/alpn.c,
src/core/transport/chttp2/bin_encoder.c, src/core/transport/chttp2/frame_data.c,
src/core/transport/chttp2/frame_goaway.c, src/core/transport/chttp2/frame_ping.c,
src/core/transport/chttp2/frame_rst_stream.c, src/core/transport/chttp2/frame_settings.c,
src/core/transport/chttp2/frame_window_update.c, src/core/transport/chttp2/hpack_parser.c,
Expand Down Expand Up @@ -741,6 +743,11 @@ targets:
language: c
src: [test/core/client_config/uri_parser_test.c]
deps: [grpc_test_util, grpc, gpr_test_util, gpr]
- name: workqueue_test
build: test
language: c
src: [test/core/iomgr/workqueue_test.c]
deps: [grpc_test_util, grpc, gpr_test_util, gpr]
- name: async_end2end_test
build: test
language: c++
Expand Down
8 changes: 8 additions & 0 deletions gRPC.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ Pod::Spec.new do |s|
'src/core/iomgr/udp_server.h',
'src/core/iomgr/wakeup_fd_pipe.h',
'src/core/iomgr/wakeup_fd_posix.h',
'src/core/iomgr/workqueue.h',
'src/core/iomgr/workqueue_posix.h',
'src/core/iomgr/workqueue_windows.h',
'src/core/json/json.h',
'src/core/json/json_common.h',
'src/core/json/json_reader.h',
Expand Down Expand Up @@ -343,6 +346,8 @@ Pod::Spec.new do |s|
'src/core/iomgr/wakeup_fd_nospecial.c',
'src/core/iomgr/wakeup_fd_pipe.c',
'src/core/iomgr/wakeup_fd_posix.c',
'src/core/iomgr/workqueue_posix.c',
'src/core/iomgr/workqueue_windows.c',
'src/core/json/json.c',
'src/core/json/json_reader.c',
'src/core/json/json_string.c',
Expand Down Expand Up @@ -480,6 +485,9 @@ Pod::Spec.new do |s|
'src/core/iomgr/udp_server.h',
'src/core/iomgr/wakeup_fd_pipe.h',
'src/core/iomgr/wakeup_fd_posix.h',
'src/core/iomgr/workqueue.h',
'src/core/iomgr/workqueue_posix.h',
'src/core/iomgr/workqueue_windows.h',
'src/core/json/json.h',
'src/core/json/json_common.h',
'src/core/json/json_reader.h',
Expand Down
2 changes: 1 addition & 1 deletion src/core/iomgr/wakeup_fd_eventfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static void eventfd_wakeup(grpc_wakeup_fd *fd_info) {
}

static void eventfd_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
if (fd_info->read_fd != 0) close(fd_info->read_fd);
}

static int eventfd_check_availability(void) {
Expand Down
4 changes: 2 additions & 2 deletions src/core/iomgr/wakeup_fd_pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ static void pipe_wakeup(grpc_wakeup_fd *fd_info) {
}

static void pipe_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
if (fd_info->read_fd != 0) close(fd_info->read_fd);
if (fd_info->write_fd != 0) close(fd_info->write_fd);
}

static int pipe_check_availability(void) {
Expand Down
66 changes: 66 additions & 0 deletions src/core/iomgr/workqueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_H

#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset.h"

#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/workqueue_posix.h"
#endif

#ifdef GPR_WIN32
#include "src/core/iomgr/workqueue_windows.h"
#endif

/** A workqueue represents a list of work to be executed asynchronously. */
struct grpc_workqueue;
typedef struct grpc_workqueue grpc_workqueue;

/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(void);

void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_workqueue *workqueue);

/** Bind this workqueue to a pollset */
void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset *pollset);

/** Add a work item to a workqueue */
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
int success);

#endif
138 changes: 138 additions & 0 deletions src/core/iomgr/workqueue_posix.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/iomgr/workqueue.h"

#include <stdio.h>

#include <grpc/support/alloc.h>

#include "src/core/iomgr/fd_posix.h"

struct grpc_workqueue {
gpr_refcount refs;

gpr_mu mu;
grpc_iomgr_closure head;
grpc_iomgr_closure *tail;

grpc_wakeup_fd wakeup_fd;
grpc_fd *wakeup_read_fd;

grpc_iomgr_closure read_closure;
};

static void on_readable(void *arg, int success);

grpc_workqueue *grpc_workqueue_create(void) {
char name[32];
grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&workqueue->refs, 1);
gpr_mu_init(&workqueue->mu);
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
workqueue->wakeup_read_fd =
grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name);
grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
return workqueue;
}

static void workqueue_destroy(grpc_workqueue *workqueue) {
grpc_fd_shutdown(workqueue->wakeup_read_fd);
}

void grpc_workqueue_ref(grpc_workqueue *workqueue) {
gpr_ref(&workqueue->refs);
}

void grpc_workqueue_unref(grpc_workqueue *workqueue) {
if (gpr_unref(&workqueue->refs)) {
workqueue_destroy(workqueue);
}
}

void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset *pollset) {
grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd);
}

static void on_readable(void *arg, int success) {
grpc_workqueue *workqueue = arg;
grpc_iomgr_closure *todo;

if (!success) {
gpr_mu_destroy(&workqueue->mu);
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy");
gpr_free(workqueue);
return;
} else {
gpr_mu_lock(&workqueue->mu);
todo = workqueue->head.next;
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);

while (todo) {
grpc_iomgr_closure *next = todo->next;
todo->cb(todo->cb_arg, todo->success);
todo = next;
}
}
}

void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
closure->next = NULL;
gpr_mu_lock(&workqueue->mu);
if (workqueue->tail == &workqueue->head) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
workqueue->tail->next = closure;
workqueue->tail = closure;
gpr_mu_unlock(&workqueue->mu);
}

#endif /* GPR_POSIX_SOCKET */
37 changes: 37 additions & 0 deletions src/core/iomgr/workqueue_posix.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H

#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */
Loading

0 comments on commit 73b6606

Please sign in to comment.