diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h index 56f71ef2349c711507c9f92b1b2f3a7c55ca6d0e..c7e5dbc64708449c0a3ef0c17ab2d8562b1c957e 100644 --- a/include/grpc/support/slice_buffer.h +++ b/include/grpc/support/slice_buffer.h @@ -74,6 +74,8 @@ void gpr_slice_buffer_addn(gpr_slice_buffer *sb, gpr_slice *slices, size_t n); /* add a very small (less than 8 bytes) amount of data to the end of a slice buffer: returns a pointer into which to add the data */ gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, unsigned len); +/* pop the last buffer, but don't unref it */ +void gpr_slice_buffer_pop(gpr_slice_buffer *sb); /* clear a slice buffer, unref all elements */ void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb); diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 6cd51f925c3b46b083dddc19ad4d8a3b360b14e5..b280e4bd020014b26f08de866fa75a2ddb460566 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -143,6 +143,13 @@ void gpr_slice_buffer_addn(gpr_slice_buffer *sb, gpr_slice *s, size_t n) { } } +void gpr_slice_buffer_pop(gpr_slice_buffer *sb) { + if (sb->count != 0) { + size_t count = --sb->count; + sb->length -= GPR_SLICE_LENGTH(sb->slices[count]); + } +} + void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb) { size_t i; diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 72f1bf74417e7210cf989eff0c1c6dc88e48066d..e4e51bfebf5e78d048cf6ce447b105307dbc48b1 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -35,38 +35,135 @@ #include <grpc++/config.h> #include <grpc/grpc.h> +#include <grpc/byte_buffer.h> #include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/port_platform.h> +#include <google/protobuf/io/zero_copy_stream.h> -namespace grpc { +const int kMaxBufferLength = 8192; -bool SerializeProto(const grpc::protobuf::Message &msg, - grpc_byte_buffer **bp) { - grpc::string msg_str; - bool success = msg.SerializeToString(&msg_str); - if (success) { - gpr_slice slice = - gpr_slice_from_copied_buffer(msg_str.data(), msg_str.length()); - *bp = grpc_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); +class GrpcBufferWriter GRPC_FINAL + : public ::google::protobuf::io::ZeroCopyOutputStream { + public: + explicit GrpcBufferWriter(grpc_byte_buffer **bp, + int block_size = kMaxBufferLength) + : block_size_(block_size), byte_count_(0), have_backup_(false) { + *bp = grpc_byte_buffer_create(NULL, 0); + slice_buffer_ = &(*bp)->data.slice_buffer; + } + + ~GrpcBufferWriter() GRPC_OVERRIDE { + if (have_backup_) { + gpr_slice_unref(backup_slice_); + } + } + + bool Next(void **data, int *size) GRPC_OVERRIDE { + if (have_backup_) { + slice_ = backup_slice_; + have_backup_ = false; + } else { + slice_ = gpr_slice_malloc(block_size_); + } + *data = GPR_SLICE_START_PTR(slice_); + byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + gpr_slice_buffer_add(slice_buffer_, slice_); + return true; + } + + void BackUp(int count) GRPC_OVERRIDE { + gpr_slice_buffer_pop(slice_buffer_); + if (count == block_size_) { + backup_slice_ = slice_; + } else { + backup_slice_ = + gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count); + gpr_slice_buffer_add(slice_buffer_, slice_); + } + have_backup_ = true; + byte_count_ -= count; + } + + gpr_int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } + + private: + const int block_size_; + gpr_int64 byte_count_; + gpr_slice_buffer *slice_buffer_; + bool have_backup_; + gpr_slice backup_slice_; + gpr_slice slice_; +}; + +class GrpcBufferReader GRPC_FINAL + : public ::google::protobuf::io::ZeroCopyInputStream { + public: + explicit GrpcBufferReader(grpc_byte_buffer *buffer) + : byte_count_(0), backup_count_(0) { + reader_ = grpc_byte_buffer_reader_create(buffer); + } + ~GrpcBufferReader() GRPC_OVERRIDE { + grpc_byte_buffer_reader_destroy(reader_); } - return success; -} -bool DeserializeProto(grpc_byte_buffer *buffer, - grpc::protobuf::Message *msg) { - grpc::string msg_string; - grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer); - gpr_slice slice; - while (grpc_byte_buffer_reader_next(reader, &slice)) { - const char *data = reinterpret_cast<const char *>( - slice.refcount ? slice.data.refcounted.bytes - : slice.data.inlined.bytes); - msg_string.append(data, slice.refcount ? slice.data.refcounted.length - : slice.data.inlined.length); - gpr_slice_unref(slice); + bool Next(const void **data, int *size) GRPC_OVERRIDE { + if (backup_count_ > 0) { + *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - + backup_count_; + *size = backup_count_; + backup_count_ = 0; + return true; + } + if (!grpc_byte_buffer_reader_next(reader_, &slice_)) { + return false; + } + gpr_slice_unref(slice_); + *data = GPR_SLICE_START_PTR(slice_); + byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + return true; } - grpc_byte_buffer_reader_destroy(reader); - return msg->ParseFromString(msg_string); + + void BackUp(int count) GRPC_OVERRIDE { + backup_count_ = count; + } + + bool Skip(int count) GRPC_OVERRIDE { + const void *data; + int size; + while (Next(&data, &size)) { + if (size >= count) { + BackUp(size - count); + return true; + } + // size < count; + count -= size; + } + // error or we have too large count; + return false; + } + + gpr_int64 ByteCount() const GRPC_OVERRIDE { + return byte_count_ - backup_count_; + } + + private: + gpr_int64 byte_count_; + gpr_int64 backup_count_; + grpc_byte_buffer_reader *reader_; + gpr_slice slice_; +}; + +namespace grpc { + +bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) { + GrpcBufferWriter writer(bp); + return msg.SerializeToZeroCopyStream(&writer); +} + +bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) { + GrpcBufferReader reader(buffer); + return msg->ParseFromZeroCopyStream(&reader); } } // namespace grpc diff --git a/test/core/support/slice_buffer_test.c b/test/core/support/slice_buffer_test.c index 8301795dbfda7b275785c8a71e8f9f4c728b73d5..a48278434f1be3e0a81ebbb403a3c807c8f0b37a 100644 --- a/test/core/support/slice_buffer_test.c +++ b/test/core/support/slice_buffer_test.c @@ -62,8 +62,13 @@ int main(int argc, char **argv) { } GPR_ASSERT(buf.count > 0); GPR_ASSERT(buf.length == 50); - gpr_slice_unref(aaa); - gpr_slice_unref(bb); + for (i = 0; i < 10; i++) { + gpr_slice_buffer_pop(&buf); + gpr_slice_unref(aaa); + gpr_slice_unref(bb); + } + GPR_ASSERT(buf.count == 0); + GPR_ASSERT(buf.length == 0); gpr_slice_buffer_destroy(&buf); return 0;