Skip to content

Commit

Permalink
Merge pull request grpc#7190 from dgquintas/bb_reader_error
Browse files Browse the repository at this point in the history
Return success status of grpc_byte_buffer_reader_init
  • Loading branch information
nicolasnoble authored Jul 7, 2016
2 parents 8fddced + b44b385 commit e705dc4
Show file tree
Hide file tree
Showing 19 changed files with 104 additions and 40 deletions.
4 changes: 2 additions & 2 deletions include/grpc++/impl/codegen/core_codegen.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class CoreCodegen : public CoreCodegenInterface {

void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) GRPC_OVERRIDE;

void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) GRPC_OVERRIDE;
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) GRPC_OVERRIDE;
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader)
GRPC_OVERRIDE;
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
Expand Down
5 changes: 3 additions & 2 deletions include/grpc++/impl/codegen/core_codegen_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ class CoreCodegenInterface {

virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;

virtual void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) = 0;
virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer)
GRPC_MUST_USE_RESULT = 0;
virtual void grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) = 0;
virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
Expand Down
17 changes: 15 additions & 2 deletions include/grpc++/impl/codegen/proto_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,21 @@ class GrpcBufferReader GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) {
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_, buffer);
: byte_count_(0), backup_count_(0), status_() {
if (!g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_,
buffer)) {
status_ = Status(StatusCode::INTERNAL,
"Couldn't initialize byte buffer reader");
}
}
~GrpcBufferReader() GRPC_OVERRIDE {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_);
}

bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (!status_.ok()) {
return false;
}
if (backup_count_ > 0) {
*data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
backup_count_;
Expand All @@ -139,6 +146,8 @@ class GrpcBufferReader GRPC_FINAL
return true;
}

Status status() const { return status_; }

void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }

bool Skip(int count) GRPC_OVERRIDE {
Expand All @@ -165,6 +174,7 @@ class GrpcBufferReader GRPC_FINAL
int64_t backup_count_;
grpc_byte_buffer_reader reader_;
gpr_slice slice_;
Status status_;
};
} // namespace internal

Expand Down Expand Up @@ -202,6 +212,9 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of<
Status result = g_core_codegen_interface->ok();
{
internal::GrpcBufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
Expand Down
2 changes: 1 addition & 1 deletion include/grpc++/support/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ByteBuffer GRPC_FINAL {
ByteBuffer& operator=(const ByteBuffer&);

/// Dump (read) the buffer contents into \a slices.
void Dump(std::vector<Slice>* slices) const;
Status Dump(std::vector<Slice>* slices) const;

/// Remove all data.
void Clear();
Expand Down
9 changes: 5 additions & 4 deletions include/grpc/impl/codegen/byte_buffer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -93,9 +93,10 @@ GRPCAPI void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
struct grpc_byte_buffer_reader;
typedef struct grpc_byte_buffer_reader grpc_byte_buffer_reader;

/** Initialize \a reader to read over \a buffer */
GRPCAPI void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer);
/** Initialize \a reader to read over \a buffer.
* Returns 1 upon success, 0 otherwise. */
GRPCAPI int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer);

/** Cleanup and destroy \a reader */
GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
Expand Down
10 changes: 6 additions & 4 deletions src/core/lib/surface/byte_buffer_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ static int is_compressed(grpc_byte_buffer *buffer) {
return 1 /* GPR_TRUE */;
}

void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) {
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) {
gpr_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
switch (reader->buffer_in->type) {
Expand All @@ -67,9 +67,10 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
&decompressed_slices_buffer) == 0) {
gpr_log(GPR_ERROR,
"Unexpected error decompressing data for algorithm with enum "
"value '%d'. Reading data as if it were uncompressed.",
"value '%d'.",
reader->buffer_in->data.raw.compression);
reader->buffer_out = reader->buffer_in;
memset(reader, 0, sizeof(*reader));
return 0;
} else { /* all fine */
reader->buffer_out =
grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
Expand All @@ -82,6 +83,7 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
reader->current.index = 0;
break;
}
return 1;
}

void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/common/core_codegen.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}

void CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
::grpc_byte_buffer_reader_init(reader, buffer);
int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
return ::grpc_byte_buffer_reader_init(reader, buffer);
}

void CoreCodegen::grpc_byte_buffer_reader_destroy(
Expand Down
10 changes: 7 additions & 3 deletions src/cpp/util/byte_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,22 @@ void ByteBuffer::Clear() {
}
}

void ByteBuffer::Dump(std::vector<Slice>* slices) const {
Status ByteBuffer::Dump(std::vector<Slice>* slices) const {
slices->clear();
if (!buffer_) {
return;
return Status(StatusCode::FAILED_PRECONDITION, "Buffer not initialized");
}
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer_);
if (!grpc_byte_buffer_reader_init(&reader, buffer_)) {
return Status(StatusCode::INTERNAL,
"Couldn't initialize byte buffer reader");
}
gpr_slice s;
while (grpc_byte_buffer_reader_next(&reader, &s)) {
slices->push_back(Slice(s, Slice::STEAL_REF));
}
grpc_byte_buffer_reader_destroy(&reader);
return Status::OK;
}

size_t ByteBuffer::Length() const {
Expand Down
2 changes: 2 additions & 0 deletions src/csharp/ext/grpc_csharp_ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ GPR_EXPORT intptr_t GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(
if (!ctx->recv_message) {
return -1;
}
/* TODO(issue:#7206): check return value of grpc_byte_buffer_reader_init. */
grpc_byte_buffer_reader_init(&reader, ctx->recv_message);
return (intptr_t)grpc_byte_buffer_length(reader.buffer_out);
}
Expand All @@ -267,6 +268,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer(
gpr_slice slice;
size_t offset = 0;

/* TODO(issue:#7206): check return value of grpc_byte_buffer_reader_init. */
grpc_byte_buffer_reader_init(&reader, ctx->recv_message);

while (grpc_byte_buffer_reader_next(&reader, &slice)) {
Expand Down
5 changes: 4 additions & 1 deletion src/node/ext/byte_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
return scope.Escape(Nan::Null());
}
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer);
if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
Nan::ThrowError("Error initializing byte buffer reader.");
return scope.Escape(Nan::Undefined());
}
gpr_slice slice = grpc_byte_buffer_reader_readall(&reader);
size_t length = GPR_SLICE_LENGTH(slice);
char *result = new char[length];
Expand Down
10 changes: 9 additions & 1 deletion src/objective-c/GRPCClient/private/NSData+GRPC.m
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@
static void MallocAndCopyByteBufferToCharArray(grpc_byte_buffer *buffer,
size_t *length, char **array) {
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer);
if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
// grpc_byte_buffer_reader_init can fail if the data sent by the server
// could not be decompressed for any reason. This is an issue with the data
// coming from the server and thus we want the RPC to fail with error code
// INTERNAL.
*array = NULL;
*length = 0;
return;
}
// The slice contains uncompressed data even if compressed data was received
// because the reader takes care of automatically decompressing it
gpr_slice slice = grpc_byte_buffer_reader_readall(&reader);
Expand Down
6 changes: 3 additions & 3 deletions src/php/ext/grpc/byte_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ grpc_byte_buffer *string_to_byte_buffer(char *string, size_t length) {

void byte_buffer_to_string(grpc_byte_buffer *buffer, char **out_string,
size_t *out_length) {
if (buffer == NULL) {
grpc_byte_buffer_reader reader;
if (buffer == NULL || !grpc_byte_buffer_reader_init(&reader, buffer)) {
/* TODO(dgq): distinguish between the error cases. */
*out_string = NULL;
*out_length = 0;
return;
}
size_t length = grpc_byte_buffer_length(buffer);
char *string = ecalloc(length + 1, sizeof(char));
size_t offset = 0;
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer);
gpr_slice next;
while (grpc_byte_buffer_reader_next(&reader, &next) != 0) {
memcpy(string + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next));
Expand Down
4 changes: 2 additions & 2 deletions src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil

void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) nogil
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) nogil
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice) nogil
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil
Expand Down
6 changes: 5 additions & 1 deletion src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,13 @@ cdef class ByteBuffer:
cdef gpr_slice data_slice
cdef size_t data_slice_length
cdef void *data_slice_pointer
cdef bint reader_status
if self.c_byte_buffer != NULL:
with nogil:
grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
reader_status = grpc_byte_buffer_reader_init(
&reader, self.c_byte_buffer)
if not reader_status:
return None
result = bytearray()
with nogil:
while grpc_byte_buffer_reader_next(&reader, &data_slice):
Expand Down
2 changes: 1 addition & 1 deletion src/python/grpcio/grpc/_cython/imports.generated.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ extern grpc_byte_buffer_length_type grpc_byte_buffer_length_import;
typedef void(*grpc_byte_buffer_destroy_type)(grpc_byte_buffer *byte_buffer);
extern grpc_byte_buffer_destroy_type grpc_byte_buffer_destroy_import;
#define grpc_byte_buffer_destroy grpc_byte_buffer_destroy_import
typedef void(*grpc_byte_buffer_reader_init_type)(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer);
typedef int(*grpc_byte_buffer_reader_init_type)(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer);
extern grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import;
#define grpc_byte_buffer_reader_init grpc_byte_buffer_reader_init_import
typedef void(*grpc_byte_buffer_reader_destroy_type)(grpc_byte_buffer_reader *reader);
Expand Down
5 changes: 4 additions & 1 deletion src/ruby/ext/grpc/rb_byte_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
return Qnil;
}
rb_string = rb_str_buf_new(grpc_byte_buffer_length(buffer));
grpc_byte_buffer_reader_init(&reader, buffer);
if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
rb_raise(rb_eRuntimeError, "Error initializing byte buffer reader.");
return Qnil;
}
while (grpc_byte_buffer_reader_next(&reader, &next) != 0) {
rb_str_cat(rb_string, (const char *) GPR_SLICE_START_PTR(next),
GPR_SLICE_LENGTH(next));
Expand Down
2 changes: 1 addition & 1 deletion src/ruby/ext/grpc/rb_grpc_imports.generated.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ extern grpc_byte_buffer_length_type grpc_byte_buffer_length_import;
typedef void(*grpc_byte_buffer_destroy_type)(grpc_byte_buffer *byte_buffer);
extern grpc_byte_buffer_destroy_type grpc_byte_buffer_destroy_import;
#define grpc_byte_buffer_destroy grpc_byte_buffer_destroy_import
typedef void(*grpc_byte_buffer_reader_init_type)(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer);
typedef int(*grpc_byte_buffer_reader_init_type)(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer);
extern grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import;
#define grpc_byte_buffer_reader_init grpc_byte_buffer_reader_init_import
typedef void(*grpc_byte_buffer_reader_destroy_type)(grpc_byte_buffer_reader *reader);
Expand Down
3 changes: 2 additions & 1 deletion test/core/end2end/cq_verifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ int byte_buffer_eq_string(grpc_byte_buffer *bb, const char *str) {
grpc_byte_buffer *rbb;
int res;

grpc_byte_buffer_reader_init(&reader, bb);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) &&
"Couldn't init byte buffer reader");
rbb = grpc_raw_byte_buffer_from_reader(&reader);
res = byte_buffer_eq_slice(rbb, gpr_slice_from_copied_string(str));
grpc_byte_buffer_reader_destroy(&reader);
Expand Down
Loading

0 comments on commit e705dc4

Please sign in to comment.