Skip to content

Commit

Permalink
Merge pull request grpc#6481 from dgquintas/compression_md_level_bis
Browse files Browse the repository at this point in the history
Allow servers to select compression level via initial MD & overall compression cleanup
  • Loading branch information
jtattermusch committed Jun 9, 2016
2 parents b81f172 + 37e516e commit 1bc2976
Show file tree
Hide file tree
Showing 83 changed files with 932 additions and 237 deletions.
8 changes: 4 additions & 4 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ cc_library(
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/compression/compression_algorithm.c",
"src/core/lib/compression/compression.c",
"src/core/lib/compression/message_compress.c",
"src/core/lib/debug/trace.c",
"src/core/lib/http/format_request.c",
Expand Down Expand Up @@ -681,7 +681,7 @@ cc_library(
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/compression/compression_algorithm.c",
"src/core/lib/compression/compression.c",
"src/core/lib/compression/message_compress.c",
"src/core/lib/debug/trace.c",
"src/core/lib/http/format_request.c",
Expand Down Expand Up @@ -1010,7 +1010,7 @@ cc_library(
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/compression/compression_algorithm.c",
"src/core/lib/compression/compression.c",
"src/core/lib/compression/message_compress.c",
"src/core/lib/debug/trace.c",
"src/core/lib/http/format_request.c",
Expand Down Expand Up @@ -1708,7 +1708,7 @@ objc_library(
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/compression/compression_algorithm.c",
"src/core/lib/compression/compression.c",
"src/core/lib/compression/message_compress.c",
"src/core/lib/debug/trace.c",
"src/core/lib/http/format_request.c",
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2487,7 +2487,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/compression/compression_algorithm.c \
src/core/lib/compression/compression.c \
src/core/lib/compression/message_compress.c \
src/core/lib/debug/trace.c \
src/core/lib/http/format_request.c \
Expand Down Expand Up @@ -2753,7 +2753,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/compression/compression_algorithm.c \
src/core/lib/compression/compression.c \
src/core/lib/compression/message_compress.c \
src/core/lib/debug/trace.c \
src/core/lib/http/format_request.c \
Expand Down Expand Up @@ -3088,7 +3088,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/compression/compression_algorithm.c \
src/core/lib/compression/compression.c \
src/core/lib/compression/message_compress.c \
src/core/lib/debug/trace.c \
src/core/lib/http/format_request.c \
Expand Down
2 changes: 1 addition & 1 deletion binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@
'src/core/lib/channel/connected_channel.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
'src/core/lib/compression/compression_algorithm.c',
'src/core/lib/compression/compression.c',
'src/core/lib/compression/message_compress.c',
'src/core/lib/debug/trace.c',
'src/core/lib/http/format_request.c',
Expand Down
2 changes: 1 addition & 1 deletion build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ filegroups:
- src/core/lib/channel/connected_channel.c
- src/core/lib/channel/http_client_filter.c
- src/core/lib/channel/http_server_filter.c
- src/core/lib/compression/compression_algorithm.c
- src/core/lib/compression/compression.c
- src/core/lib/compression/message_compress.c
- src/core/lib/debug/trace.c
- src/core/lib/http/format_request.c
Expand Down
2 changes: 1 addition & 1 deletion config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/compression/compression_algorithm.c \
src/core/lib/compression/compression.c \
src/core/lib/compression/message_compress.c \
src/core/lib/debug/trace.c \
src/core/lib/http/format_request.c \
Expand Down
2 changes: 1 addition & 1 deletion gRPC.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/connected_channel.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
'src/core/lib/compression/compression_algorithm.c',
'src/core/lib/compression/compression.c',
'src/core/lib/compression/message_compress.c',
'src/core/lib/debug/trace.c',
'src/core/lib/http/format_request.c',
Expand Down
2 changes: 1 addition & 1 deletion grpc.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/connected_channel.c )
s.files += %w( src/core/lib/channel/http_client_filter.c )
s.files += %w( src/core/lib/channel/http_server_filter.c )
s.files += %w( src/core/lib/compression/compression_algorithm.c )
s.files += %w( src/core/lib/compression/compression.c )
s.files += %w( src/core/lib/compression/message_compress.c )
s.files += %w( src/core/lib/debug/trace.c )
s.files += %w( src/core/lib/http/format_request.c )
Expand Down
12 changes: 12 additions & 0 deletions include/grpc++/impl/codegen/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/string_ref.h>

#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>

struct grpc_byte_buffer;
Expand Down Expand Up @@ -187,6 +189,8 @@ class CallOpSendInitialMetadata {
flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
// TODO(dgq): expose compression level in API so it can be properly set.
maybe_compression_level_.is_set = false;
}

protected:
Expand All @@ -198,6 +202,10 @@ class CallOpSendInitialMetadata {
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
op->data.send_initial_metadata.maybe_compression_level.is_set =
maybe_compression_level_.is_set;
op->data.send_initial_metadata.maybe_compression_level.level =
maybe_compression_level_.level;
}
void FinishOp(bool* status, int max_message_size) {
if (!send_) return;
Expand All @@ -209,6 +217,10 @@ class CallOpSendInitialMetadata {
uint32_t flags_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
struct {
bool is_set;
grpc_compression_level level;
} maybe_compression_level_;
};

class CallOpSendMessage {
Expand Down
48 changes: 35 additions & 13 deletions include/grpc++/server_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,43 @@ class ServerBuilder {
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
/// Matches requests with any :authority
void RegisterService(Service* service);
ServerBuilder& RegisterService(Service* service);

/// Register a generic service.
/// Matches requests with any :authority
void RegisterAsyncGenericService(AsyncGenericService* service);
ServerBuilder& RegisterAsyncGenericService(AsyncGenericService* service);

/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the \a Server instance returned
/// by BuildAndStart().
/// Only matches requests with :authority \a host
void RegisterService(const grpc::string& host, Service* service);
ServerBuilder& RegisterService(const grpc::string& host, Service* service);

/// Set max message size in bytes.
void SetMaxMessageSize(int max_message_size) {
ServerBuilder& SetMaxMessageSize(int max_message_size) {
max_message_size_ = max_message_size;
return *this;
}

/// Set the compression options to be used by the server.
void SetCompressionOptions(const grpc_compression_options& options) {
compression_options_ = options;
}
/// Set the support status for compression algorithms. All algorithms are
/// enabled by default.
///
/// Incoming calls compressed with an unsupported algorithm will fail with
/// GRPC_STATUS_UNIMPLEMENTED.
ServerBuilder& SetCompressionAlgorithmSupportStatus(
grpc_compression_algorithm algorithm, bool enabled);

/// The default compression level to use for all channel calls in the
/// absence of a call-specific level.
ServerBuilder& SetDefaultCompressionLevel(grpc_compression_level level);

/// The default compression algorithm to use for all channel calls in the
/// absence of a call-specific level. Note that it overrides any compression
/// level set by \a SetDefaultCompressionLevel.
ServerBuilder& SetDefaultCompressionAlgorithm(
grpc_compression_algorithm algorithm);

void SetOption(std::unique_ptr<ServerBuilderOption> option);
ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option);

/// Tries to bind \a server to the given \a addr.
///
Expand All @@ -101,9 +115,9 @@ class ServerBuilder {
/// number. \a nullptr otherwise.
///
// TODO(dgq): the "port" part seems to be a misnomer.
void AddListeningPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds,
int* selected_port = nullptr);
ServerBuilder& AddListeningPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds,
int* selected_port = nullptr);

/// Add a completion queue for handling asynchronous services
/// Caller is required to keep this completion queue live until
Expand Down Expand Up @@ -144,14 +158,22 @@ class ServerBuilder {
};

int max_message_size_;
grpc_compression_options compression_options_;
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>> plugins_;
AsyncGenericService* generic_service_;
struct {
bool is_set;
grpc_compression_level level;
} maybe_default_compression_level_;
struct {
bool is_set;
grpc_compression_algorithm algorithm;
} maybe_default_compression_algorithm_;
uint32_t enabled_compression_algorithms_bitset_;
};

} // namespace grpc
Expand Down
3 changes: 2 additions & 1 deletion include/grpc/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ GRPCAPI int grpc_compression_algorithm_parse(
grpc_compression_algorithm *algorithm);

/** Updates \a name with the encoding name corresponding to a valid \a
* algorithm. Returns 1 upon success, 0 otherwise. */
* algorithm. Note that \a name is statically allocated and must *not* be freed.
* Returns 1 upon success, 0 otherwise. */
GRPCAPI int grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm, char **name);

Expand Down
27 changes: 21 additions & 6 deletions include/grpc/impl/codegen/compression_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@
#define GRPC_IMPL_CODEGEN_COMPRESSION_TYPES_H

#include <grpc/impl/codegen/port_platform.h>
#include <stdbool.h>

#ifdef __cplusplus
extern "C" {
#endif

/** To be used as initial metadata key for the request of a concrete compression
* algorithm */
#define GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \
"grpc-internal-encoding-request"

/** To be used in channel arguments */
#define GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \
"grpc.default_compression_algorithm"
Expand Down Expand Up @@ -74,15 +80,24 @@ typedef struct grpc_compression_options {
*/
uint32_t enabled_algorithms_bitset;

/** The default channel compression algorithm. It'll be used in the absence of
/** The default channel compression level. It'll be used in the absence of
* call specific settings. This option corresponds to the channel argument key
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM */
grpc_compression_algorithm default_compression_algorithm;
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL. If present, takes
* precedence over \a default_algorithm.
* TODO(dgq): currently only available for server channels. */
struct {
bool is_set;
grpc_compression_level level;
} default_level;

/** The default channel compression level. It'll be used in the absence of
/** The default channel compression algorithm. It'll be used in the absence of
* call specific settings. This option corresponds to the channel argument key
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL */
grpc_compression_algorithm default_compression_level;
* behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM. */
struct {
bool is_set;
grpc_compression_algorithm algorithm;
} default_algorithm;

} grpc_compression_options;

#ifdef __cplusplus
Expand Down
6 changes: 6 additions & 0 deletions include/grpc/impl/codegen/grpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ typedef struct grpc_op {
struct {
size_t count;
grpc_metadata *metadata;
/** If \a is_set, \a compression_level will be used for the call.
* Otherwise, \a compression_level won't be considered */
struct {
uint8_t is_set;
grpc_compression_level level;
} maybe_compression_level;
} send_initial_metadata;
grpc_byte_buffer *send_message;
struct {
Expand Down
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/compression/compression_algorithm.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/compression/compression.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/compression/message_compress.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/debug/trace.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/http/format_request.c" role="src" />
Expand Down
23 changes: 17 additions & 6 deletions src/core/lib/channel/channel_args.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/support/string.h"

#include <grpc/compression.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
Expand Down Expand Up @@ -181,6 +182,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(

grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm) {
GPR_ASSERT(algorithm < GRPC_COMPRESS_ALGORITHMS_COUNT);
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM;
Expand All @@ -200,7 +202,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
!strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
return 1; /* GPR_TRUE */
**states_arg |= 0x1; /* forcefully enable support for no compression */
return 1;
}
}
}
Expand All @@ -214,10 +217,18 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
const int states_arg_found =
find_compression_algorithm_states_bitset(*a, &states_arg);

if (states_arg_found) {
if (grpc_channel_args_get_compression_algorithm(*a) == algorithm &&
state == 0) {
char *algo_name = NULL;
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name) != 0);
gpr_log(GPR_ERROR,
"Tried to disable default compression algorithm '%s'. The "
"operation has been ignored.",
algo_name);
} else if (states_arg_found) {
if (state != 0) {
GPR_BITSET((unsigned *)states_arg, algorithm);
} else {
} else if (algorithm != GRPC_COMPRESS_NONE) {
GPR_BITCLEAR((unsigned *)states_arg, algorithm);
}
} else {
Expand All @@ -229,7 +240,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {
GPR_BITSET((unsigned *)&tmp.value.integer, algorithm);
} else {
} else if (algorithm != GRPC_COMPRESS_NONE) {
GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm);
}
result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
Expand All @@ -239,11 +250,11 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
return result;
}

int grpc_channel_args_compression_algorithm_get_states(
uint32_t grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a) {
int *states_arg;
if (find_compression_algorithm_states_bitset(a, &states_arg)) {
return *states_arg;
return (uint32_t)*states_arg;
} else {
return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/channel/channel_args.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
*
* The i-th bit of the returned bitset corresponds to the i-th entry in the
* grpc_compression_algorithm enum. */
int grpc_channel_args_compression_algorithm_get_states(
uint32_t grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a);

int grpc_channel_args_compare(const grpc_channel_args *a,
Expand Down
Loading

0 comments on commit 1bc2976

Please sign in to comment.