Skip to content
Snippets Groups Projects
Commit e107b0d4 authored by ncteisen's avatar ncteisen
Browse files

Add to codegen interface, refactor proto serialization

This change allows for some internal proto serialization
changes to be made
parent c63a119f
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,8 @@ ...@@ -34,6 +34,8 @@
#ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H
#define GRPC_OPEN_SOURCE_PROTO
#ifndef GRPC_CUSTOM_PROTOBUF_INT64 #ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h> #include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
... ...
......
...@@ -79,11 +79,15 @@ class CoreCodegen : public CoreCodegenInterface { ...@@ -79,11 +79,15 @@ class CoreCodegen : public CoreCodegenInterface {
grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) override; size_t nslices) override;
grpc_slice grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) override;
grpc_slice grpc_empty_slice() override; grpc_slice grpc_empty_slice() override;
grpc_slice grpc_slice_malloc(size_t length) override; grpc_slice grpc_slice_malloc(size_t length) override;
void grpc_slice_unref(grpc_slice slice) override; void grpc_slice_unref(grpc_slice slice) override;
grpc_slice grpc_slice_ref(grpc_slice slice) override;
grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override; grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override;
grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override;
void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override; void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override;
void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override; void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override;
grpc_slice grpc_slice_from_static_buffer(const void* buffer, grpc_slice grpc_slice_from_static_buffer(const void* buffer,
... ...
......
...@@ -95,11 +95,15 @@ class CoreCodegenInterface { ...@@ -95,11 +95,15 @@ class CoreCodegenInterface {
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0; size_t nslices) = 0;
virtual grpc_slice grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) = 0;
virtual grpc_slice grpc_empty_slice() = 0; virtual grpc_slice grpc_empty_slice() = 0;
virtual grpc_slice grpc_slice_malloc(size_t length) = 0; virtual grpc_slice grpc_slice_malloc(size_t length) = 0;
virtual void grpc_slice_unref(grpc_slice slice) = 0; virtual void grpc_slice_unref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0; virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0;
virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0;
virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb, virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb,
grpc_slice slice) = 0; grpc_slice slice) = 0;
virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0; virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0;
... ...
......
...@@ -54,8 +54,7 @@ class GrpcBufferWriterPeer; ...@@ -54,8 +54,7 @@ class GrpcBufferWriterPeer;
const int kGrpcBufferWriterMaxBufferLength = 8192; const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter final class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public: public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) { : block_size_(block_size), byte_count_(0), have_backup_(false) {
...@@ -103,6 +102,8 @@ class GrpcBufferWriter final ...@@ -103,6 +102,8 @@ class GrpcBufferWriter final
grpc::protobuf::int64 ByteCount() const override { return byte_count_; } grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
grpc_slice_buffer* SliceBuffer() { return slice_buffer_; }
private: private:
friend class GrpcBufferWriterPeer; friend class GrpcBufferWriterPeer;
const int block_size_; const int block_size_;
...@@ -113,8 +114,7 @@ class GrpcBufferWriter final ...@@ -113,8 +114,7 @@ class GrpcBufferWriter final
grpc_slice slice_; grpc_slice slice_;
}; };
class GrpcBufferReader final class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public: public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer) explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0), status_() { : byte_count_(0), backup_count_(0), status_() {
...@@ -175,20 +175,17 @@ class GrpcBufferReader final ...@@ -175,20 +175,17 @@ class GrpcBufferReader final
return byte_count_ - backup_count_; return byte_count_ - backup_count_;
} }
private: protected:
int64_t byte_count_; int64_t byte_count_;
int64_t backup_count_; int64_t backup_count_;
grpc_byte_buffer_reader reader_; grpc_byte_buffer_reader reader_;
grpc_slice slice_; grpc_slice slice_;
Status status_; Status status_;
}; };
} // namespace internal
template <class T> // BufferWriter must be a subclass of io::ZeroCopyOutputStream.
class SerializationTraits<T, typename std::enable_if<std::is_base_of< template <class BufferWriter, class T>
grpc::protobuf::Message, T>::value>::type> { Status GenericSerialize(const grpc::protobuf::Message& msg,
public:
static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) { grpc_byte_buffer** bp, bool* own_buffer) {
*own_buffer = true; *own_buffer = true;
int byte_size = msg.ByteSize(); int byte_size = msg.ByteSize();
...@@ -201,22 +198,23 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of< ...@@ -201,22 +198,23 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of<
g_core_codegen_interface->grpc_slice_unref(slice); g_core_codegen_interface->grpc_slice_unref(slice);
return g_core_codegen_interface->ok(); return g_core_codegen_interface->ok();
} else { } else {
internal::GrpcBufferWriter writer( BufferWriter writer(bp, internal::kGrpcBufferWriterMaxBufferLength);
bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer) return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok() ? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message"); : Status(StatusCode::INTERNAL, "Failed to serialize message");
} }
} }
static Status Deserialize(grpc_byte_buffer* buffer, // BufferReader must be a subclass of io::ZeroCopyInputStream.
template <class BufferReader, class T>
Status GenericDeserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) { grpc::protobuf::Message* msg) {
if (buffer == nullptr) { if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload"); return Status(StatusCode::INTERNAL, "No payload");
} }
Status result = g_core_codegen_interface->ok(); Status result = g_core_codegen_interface->ok();
{ {
internal::GrpcBufferReader reader(buffer); BufferReader reader(buffer);
if (!reader.status().ok()) { if (!reader.status().ok()) {
return reader.status(); return reader.status();
} }
...@@ -232,7 +230,32 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of< ...@@ -232,7 +230,32 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of<
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer); g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result; return result;
} }
} // namespace internal
// this is needed so the following class does not conflict with protobuf
// serializers that utilize internal-only tools.
#ifdef GRPC_OPEN_SOURCE_PROTO
// This class provides a protobuf serializer. It translates between protobuf
// objects and grpc_byte_buffers. More information about SerializationTraits can
// be found in include/grpc++/impl/codegen/serialization_traits.h.
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> {
public:
static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp, bool* own_buffer) {
return internal::GenericSerialize<internal::GrpcBufferWriter, T>(
msg, bp, own_buffer);
}
static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg) {
return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer,
msg);
}
}; };
#endif
} // namespace grpc } // namespace grpc
... ...
......
...@@ -116,6 +116,12 @@ grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice, ...@@ -116,6 +116,12 @@ grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice,
return ::grpc_raw_byte_buffer_create(slice, nslices); return ::grpc_raw_byte_buffer_create(slice, nslices);
} }
grpc_slice CoreCodegen::grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) {
return ::grpc_slice_new_with_user_data(p, len, destroy, user_data);
}
grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); } grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); }
grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) { grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) {
...@@ -126,10 +132,18 @@ void CoreCodegen::grpc_slice_unref(grpc_slice slice) { ...@@ -126,10 +132,18 @@ void CoreCodegen::grpc_slice_unref(grpc_slice slice) {
::grpc_slice_unref(slice); ::grpc_slice_unref(slice);
} }
grpc_slice CoreCodegen::grpc_slice_ref(grpc_slice slice) {
return ::grpc_slice_ref(slice);
}
grpc_slice CoreCodegen::grpc_slice_split_tail(grpc_slice* s, size_t split) { grpc_slice CoreCodegen::grpc_slice_split_tail(grpc_slice* s, size_t split) {
return ::grpc_slice_split_tail(s, split); return ::grpc_slice_split_tail(s, split);
} }
grpc_slice CoreCodegen::grpc_slice_split_head(grpc_slice* s, size_t split) {
return ::grpc_slice_split_head(s, split);
}
grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer, grpc_slice CoreCodegen::grpc_slice_from_static_buffer(const void* buffer,
size_t length) { size_t length) {
return ::grpc_slice_from_static_buffer(buffer, length); return ::grpc_slice_from_static_buffer(buffer, length);
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment