Skip to content
Snippets Groups Projects
Commit 616dafd5 authored by ncteisen's avatar ncteisen
Browse files
parent 1bec15ee
No related branches found
No related tags found
No related merge requests found
...@@ -19,8 +19,6 @@ ...@@ -19,8 +19,6 @@
#ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPC_OPEN_SOURCE_PROTO
#ifndef GRPC_CUSTOM_PROTOBUF_INT64 #ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h> #include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
......
...@@ -39,7 +39,8 @@ class GrpcBufferWriterPeer; ...@@ -39,7 +39,8 @@ class GrpcBufferWriterPeer;
const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { class GrpcBufferWriter final
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public: public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) { : block_size_(block_size), byte_count_(0), have_backup_(false) {
...@@ -87,8 +88,6 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { ...@@ -87,8 +88,6 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
grpc::protobuf::int64 ByteCount() const override { return byte_count_; } grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
grpc_slice_buffer* SliceBuffer() { return slice_buffer_; }
private: private:
friend class GrpcBufferWriterPeer; friend class GrpcBufferWriterPeer;
const int block_size_; const int block_size_;
...@@ -99,7 +98,8 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { ...@@ -99,7 +98,8 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
grpc_slice slice_; grpc_slice slice_;
}; };
class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { class GrpcBufferReader final
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public: public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer) explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0), status_() { : byte_count_(0), backup_count_(0), status_() {
...@@ -160,7 +160,7 @@ class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { ...@@ -160,7 +160,7 @@ class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
return byte_count_ - backup_count_; return byte_count_ - backup_count_;
} }
protected: private:
int64_t byte_count_; int64_t byte_count_;
int64_t backup_count_; int64_t backup_count_;
grpc_byte_buffer_reader reader_; grpc_byte_buffer_reader reader_;
...@@ -168,83 +168,57 @@ class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { ...@@ -168,83 +168,57 @@ class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
Status status_; Status status_;
}; };
template <class BufferWriter, class T>
Status GenericSerialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) {
static_assert(
std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value,
"BufferWriter must be a subclass of io::ZeroCopyOutputStream");
*own_buffer = true;
int byte_size = msg.ByteSize();
if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
BufferWriter writer(bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
}
template <class BufferReader, class T>
Status GenericDeserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) {
static_assert(
std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value,
"BufferReader must be a subclass of io::ZeroCopyInputStream");
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
}
Status result = g_core_codegen_interface->ok();
{
BufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
}
} // namespace internal } // namespace internal
// this is needed so the following class does not conflict with protobuf
// serializers that utilize internal-only tools.
#ifdef GRPC_OPEN_SOURCE_PROTO
// This class provides a protobuf serializer. It translates between protobuf
// objects and grpc_byte_buffers. More information about SerializationTraits can
// be found in include/grpc++/impl/codegen/serialization_traits.h.
template <class T> template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of< class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> { grpc::protobuf::Message, T>::value>::type> {
public: public:
static Status Serialize(const grpc::protobuf::Message& msg, static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) { grpc_byte_buffer** bp, bool* own_buffer) {
return internal::GenericSerialize<internal::GrpcBufferWriter, T>( *own_buffer = true;
msg, bp, own_buffer); int byte_size = msg.ByteSize();
if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
internal::GrpcBufferWriter writer(
bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
} }
static Status Deserialize(grpc_byte_buffer* buffer, static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) { grpc::protobuf::Message* msg) {
return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer, if (buffer == nullptr) {
msg); return Status(StatusCode::INTERNAL, "No payload");
}
Status result = g_core_codegen_interface->ok();
{
internal::GrpcBufferReader reader(buffer);
if (!reader.status().ok()) {
return reader.status();
}
::grpc::protobuf::io::CodedInputStream decoder(&reader);
decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
} }
}; };
#endif
} // namespace grpc } // namespace grpc
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment