From 81fafa8971d3dc1a070dd77834dc120974a62418 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Thu, 4 Jun 2015 08:51:17 -0700
Subject: [PATCH] Beginning the cleanup

---
 include/grpc++/async_unary_call.h             |  47 +-
 include/grpc++/completion_queue.h             |   9 +-
 include/grpc++/config.h                       |  45 +-
 include/grpc++/config_protobuf.h              |  75 +++
 include/grpc++/impl/call.h                    | 132 +++++-
 include/grpc++/impl/client_unary_call.h       |  27 +-
 .../grpc++/impl/serialization_traits.h        |  37 +-
 include/grpc++/impl/service_type.h            |  36 +-
 include/grpc++/server.h                       |  18 +-
 include/grpc++/stream.h                       | 438 +++++++++---------
 src/compiler/config.h                         |   1 +
 11 files changed, 521 insertions(+), 344 deletions(-)
 create mode 100644 include/grpc++/config_protobuf.h
 rename src/cpp/client/client_unary_call.cc => include/grpc++/impl/serialization_traits.h (62%)

diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h
index 786f8c7184..a296102c3a 100644
--- a/include/grpc++/async_unary_call.h
+++ b/include/grpc++/async_unary_call.h
@@ -58,40 +58,41 @@ template <class R>
 class ClientAsyncResponseReader GRPC_FINAL
     : public ClientAsyncResponseReaderInterface<R> {
  public:
+  template <class W>
   ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
                             const RpcMethod& method, ClientContext* context,
-                            const grpc::protobuf::Message& request)
+                            const W& request)
       : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
-    init_buf_.AddSendMessage(request);
-    init_buf_.AddClientSendClose();
+    init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+    init_buf_.SendMessage(request);
+    init_buf_.ClientSendClose();
     call_.PerformOps(&init_buf_);
   }
 
   void ReadInitialMetadata(void* tag) {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddRecvInitialMetadata(context_);
+    meta_buf_.SetOutputTag(tag);
+    meta_buf_.RecvInitialMetadata(context_);
     call_.PerformOps(&meta_buf_);
   }
 
   void Finish(R* msg, Status* status, void* tag) {
-    finish_buf_.Reset(tag);
+    finish_buf_.SetOutputTag(tag);
     if (!context_->initial_metadata_received_) {
       finish_buf_.AddRecvInitialMetadata(context_);
     }
-    finish_buf_.AddRecvMessage(msg);
-    finish_buf_.AddClientRecvStatus(context_, status);
+    finish_buf_.RecvMessage(msg);
+    finish_buf_.ClientRecvStatus(context_, status);
     call_.PerformOps(&finish_buf_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  SneakyCallOpBuffer init_buf_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer finish_buf_;
+  SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+  CallOpSet<CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
 };
 
 template <class W>
@@ -104,34 +105,34 @@ class ServerAsyncResponseWriter GRPC_FINAL
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    meta_buf_.SetOutputTag(tag);
+    meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
     call_.PerformOps(&meta_buf_);
   }
 
   void Finish(const W& msg, const Status& status, void* tag) {
-    finish_buf_.Reset(tag);
+    finish_buf_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // The response is dropped if the status is not OK.
     if (status.IsOk()) {
-      finish_buf_.AddSendMessage(msg);
+      finish_buf_.SendMessage(msg);
     }
-    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+    finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
     call_.PerformOps(&finish_buf_);
   }
 
   void FinishWithError(const Status& status, void* tag) {
     GPR_ASSERT(!status.IsOk());
-    finish_buf_.Reset(tag);
+    finish_buf_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+    finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
     call_.PerformOps(&finish_buf_);
   }
 
@@ -140,8 +141,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
 
   Call call_;
   ServerContext* ctx_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_buf_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index e8429c8f41..b45b5e2e71 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -35,7 +35,6 @@
 #define GRPCXX_COMPLETION_QUEUE_H
 
 #include <grpc/support/time.h>
-#include <grpc++/impl/client_unary_call.h>
 #include <grpc++/impl/grpc_library.h>
 #include <grpc++/time.h>
 
@@ -56,7 +55,10 @@ class ServerWriter;
 template <class R, class W>
 class ServerReaderWriter;
 
+class ChannelInterface;
+class ClientContext;
 class CompletionQueue;
+class RpcMethod;
 class Server;
 class ServerBuilder;
 class ServerContext;
@@ -120,11 +122,12 @@ class CompletionQueue : public GrpcLibrary {
   friend class ::grpc::ServerReaderWriter;
   friend class ::grpc::Server;
   friend class ::grpc::ServerContext;
+  template <class InputMessage, class OutputMessage>
   friend Status BlockingUnaryCall(ChannelInterface* channel,
                                   const RpcMethod& method,
                                   ClientContext* context,
-                                  const grpc::protobuf::Message& request,
-                                  grpc::protobuf::Message* result);
+                                  const InputMessage& request,
+                                  OutputMessage* result);
 
   NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
 
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index 55b2a64482..af248b66e3 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -77,33 +77,6 @@
 #define GRPC_OVERRIDE override
 #endif
 
-#ifndef GRPC_CUSTOM_PROTOBUF_INT64
-#include <google/protobuf/stubs/common.h>
-#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
-#endif
-
-#ifndef GRPC_CUSTOM_MESSAGE
-#include <google/protobuf/message.h>
-#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
-#endif
-
-#ifndef GRPC_CUSTOM_STRING
-#include <string>
-#define GRPC_CUSTOM_STRING std::string
-#endif
-
-#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
-  ::google::protobuf::io::ZeroCopyOutputStream
-#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
-  ::google::protobuf::io::ZeroCopyInputStream
-#define GRPC_CUSTOM_CODEDINPUTSTREAM \
-  ::google::protobuf::io::CodedInputStream
-#endif
-
-
 #ifdef GRPC_CXX0X_NO_NULLPTR
 #include <memory>
 const class {
@@ -121,23 +94,15 @@ private:
 } nullptr = {};
 #endif
 
+#ifndef GRPC_CUSTOM_STRING
+#include <string>
+#define GRPC_CUSTOM_STRING std::string
+#endif
+
 namespace grpc {
 
 typedef GRPC_CUSTOM_STRING string;
 
-namespace protobuf {
-
-typedef GRPC_CUSTOM_MESSAGE Message;
-typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
-
-namespace io {
-typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
-typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
-typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
-}  // namespace io
-
-}  // namespace protobuf
-
 }  // namespace grpc
 
 #endif  // GRPCXX_CONFIG_H
diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h
new file mode 100644
index 0000000000..5ef1be1aa9
--- /dev/null
+++ b/include/grpc++/config_protobuf.h
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_CONFIG_PROTOBUF_H
+#define GRPCXX_CONFIG_PROTOBUF_H
+
+#include <grpc++/impl/serialization_traits.h>
+
+#ifndef GRPC_CUSTOM_PROTOBUF_INT64
+#include <google/protobuf/stubs/common.h>
+#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
+#endif
+
+#ifndef GRPC_CUSTOM_MESSAGE
+#include <google/protobuf/message.h>
+#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
+#endif
+
+#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
+  ::google::protobuf::io::ZeroCopyOutputStream
+#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
+  ::google::protobuf::io::ZeroCopyInputStream
+#define GRPC_CUSTOM_CODEDINPUTSTREAM \
+  ::google::protobuf::io::CodedInputStream
+#endif
+
+namespace grpc {
+namespace protobuf {
+
+typedef GRPC_CUSTOM_MESSAGE Message;
+typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
+
+namespace io {
+typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
+typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
+typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
+}  // namespace io
+
+}  // namespace protobuf
+}  // namespace grpc
+
+#endif  // GRPCXX_CONFIG_PROTOBUF_H
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index aae199db1b..f8b290a851 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -50,6 +50,128 @@ namespace grpc {
 class ByteBuffer;
 class Call;
 
+class CallNoOp {
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops) {}
+  void FinishOp(void* tag, bool* status) {}
+};
+
+class CallOpSendInitialMetadata {
+ public:
+  void SendInitialMetadata(const std::multimap<grpc::string, grpc::string>& metadata);
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpSendMessage {
+ public:
+  template <class M>
+  void SendMessage(const M& message);
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+template <class M>
+class CallOpRecvMessage {
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpGenericRecvMessage {
+ public:
+  template <class R>
+  void RecvMessage(R* message);
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpClientSendClose {
+ public:
+  void ClientSendClose();
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpServerSendStatus {
+ public:
+  void ServerSendStatus(const std::multimap<grpc::string, grpc::string>& trailing_metadata, const Status& status);
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpRecvInitialMetadata {
+ public:
+  void RecvInitialMetadata(ClientContext* context);
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpClientRecvStatus {
+ public:
+  void ClientRecvStatus(ClientContext* context, Status* status);
+
+ protected:
+  void AddOp(grpc_op* ops, size_t* nops);
+  void FinishOp(void* tag, bool* status);
+};
+
+class CallOpSetInterface : public CompletionQueueTag {
+ public:
+  virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
+};
+
+template <class T, int I>
+class WrapAndDerive : public T {};
+
+template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp>
+class CallOpSet : public CallOpSetInterface, 
+public WrapAndDerive<Op1, 1>, 
+public WrapAndDerive<Op2, 2>, 
+public WrapAndDerive<Op3, 3>, 
+public WrapAndDerive<Op4, 4>, 
+public WrapAndDerive<Op5, 5>, 
+public WrapAndDerive<Op6, 6> {
+ public:
+  void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
+    this->Op1::AddOp(ops, nops);
+    this->Op2::AddOp(ops, nops);
+    this->Op3::AddOp(ops, nops);
+    this->Op4::AddOp(ops, nops);
+    this->Op5::AddOp(ops, nops);
+    this->Op6::AddOp(ops, nops);
+  }
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+    this->Op1::FinishOp(*tag, status);
+    this->Op2::FinishOp(*tag, status);
+    this->Op3::FinishOp(*tag, status);
+    this->Op4::FinishOp(*tag, status);
+    this->Op5::FinishOp(*tag, status);
+    this->Op6::FinishOp(*tag, status);
+    *tag = return_tag_;
+    return true;
+  }
+
+  void SetOutputTag(void* return_tag) { return_tag_ = return_tag; }
+
+ private:
+  void *return_tag_;
+};
+
+#if 0
 class CallOpBuffer : public CompletionQueueTag {
  public:
   CallOpBuffer();
@@ -122,12 +244,14 @@ class CallOpBuffer : public CompletionQueueTag {
   int cancelled_buf_;
   bool* recv_closed_;
 };
+#endif
 
 // SneakyCallOpBuffer does not post completions to the completion queue
-class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
+template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp>
+class SneakyCallOpSet GRPC_FINAL : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
  public:
   bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
-    return CallOpBuffer::FinalizeResult(tag, status) && false;
+    return CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6>::FinalizeResult(tag, status) && false;
   }
 };
 
@@ -135,7 +259,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
 class CallHook {
  public:
   virtual ~CallHook() {}
-  virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
+  virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
 };
 
 // Straightforward wrapping of the C call object
@@ -146,7 +270,7 @@ class Call GRPC_FINAL {
   Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
        int max_message_size);
 
-  void PerformOps(CallOpBuffer* buffer);
+  void PerformOps(CallOpSetInterface* ops);
 
   grpc_call* call() { return call_; }
   CompletionQueue* cq() { return cq_; }
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 0e8aeed781..561c4721ef 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -36,6 +36,8 @@
 
 #include <grpc++/config.h>
 
+#include <grpc++/impl/call.h>
+
 namespace grpc {
 
 class ChannelInterface;
@@ -45,10 +47,31 @@ class RpcMethod;
 class Status;
 
 // Wrapper that performs a blocking unary call
+template <class InputMessage, class OutputMessage>
 Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
                          ClientContext* context,
-                         const grpc::protobuf::Message& request,
-                         grpc::protobuf::Message* result);
+                         const InputMessage& request,
+                         OutputMessage* result) {
+  CompletionQueue cq;
+  Call call(channel->CreateCall(method, context, &cq));
+  CallOpSet<
+  		CallOpSendInitialMetadata, 
+  		CallOpSendMessage, 
+  		CallOpRecvInitialMetadata, 
+  		CallOpRecvMessage<OutputMessage>, 
+  		CallOpClientSendClose, 
+  		CallOpClientRecvStatus> ops;
+  Status status;
+  ops.AddSendInitialMetadata(context);
+  ops.AddSendMessage(request);
+  ops.AddRecvInitialMetadata(context);
+  ops.AddRecvMessage(result);
+  ops.AddClientSendClose();
+  ops.AddClientRecvStatus(context, &status);
+  call.PerformOps(&ops);
+  GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk());
+  return status;
+}
 
 }  // namespace grpc
 
diff --git a/src/cpp/client/client_unary_call.cc b/include/grpc++/impl/serialization_traits.h
similarity index 62%
rename from src/cpp/client/client_unary_call.cc
rename to include/grpc++/impl/serialization_traits.h
index 7e7ea78bcd..d21ad92475 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/include/grpc++/impl/serialization_traits.h
@@ -31,34 +31,19 @@
  *
  */
 
-#include <grpc++/impl/client_unary_call.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <grpc/support/log.h>
+#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+
+struct grpc_byte_buffer;
 
 namespace grpc {
 
-// Wrapper that performs a blocking unary call
-Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
-                         ClientContext* context,
-                         const grpc::protobuf::Message& request,
-                         grpc::protobuf::Message* result) {
-  CompletionQueue cq;
-  Call call(channel->CreateCall(method, context, &cq));
-  CallOpBuffer buf;
-  Status status;
-  buf.AddSendInitialMetadata(context);
-  buf.AddSendMessage(request);
-  buf.AddRecvInitialMetadata(context);
-  buf.AddRecvMessage(result);
-  buf.AddClientSendClose();
-  buf.AddClientRecvStatus(context, &status);
-  call.PerformOps(&buf);
-  GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
-  return status;
-}
+template <class Message>
+class SerializationTraits;
+
+typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest);
+typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst);
 
 }  // namespace grpc
+
+#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index bc39bb82ac..af21d9b8cf 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -35,6 +35,8 @@
 #define GRPCXX_IMPL_SERVICE_TYPE_H
 
 #include <grpc++/config.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/server.h>
 
 namespace grpc {
 
@@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface {
 
 class AsynchronousService {
  public:
-  // this is Server, but in disguise to avoid a link dependency
-  class DispatchImpl {
-   public:
-    virtual void RequestAsyncCall(void* registered_method,
-                                  ServerContext* context,
-                                  ::grpc::protobuf::Message* request,
-                                  ServerAsyncStreamingInterface* stream,
-                                  CompletionQueue* call_cq,
-                                  ServerCompletionQueue* notification_cq,
-                                  void* tag) = 0;
-  };
-
   AsynchronousService(const char** method_names, size_t method_count)
-      : dispatch_impl_(nullptr),
+      : server_(nullptr),
         method_names_(method_names),
         method_count_(method_count),
         request_args_(nullptr) {}
@@ -86,42 +76,44 @@ class AsynchronousService {
   ~AsynchronousService() { delete[] request_args_; }
 
  protected:
+  template <class Message>
   void RequestAsyncUnary(int index, ServerContext* context,
-                         grpc::protobuf::Message* request,
+                         Message* request,
                          ServerAsyncStreamingInterface* stream,
                          CompletionQueue* call_cq,
                          ServerCompletionQueue* notification_cq, void* tag) {
-    dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
-                                     stream, call_cq, notification_cq, tag);
+    server_->RequestAsyncCall(request_args_[index], context,
+                                     stream, call_cq, notification_cq, tag, request);
   }
   void RequestClientStreaming(int index, ServerContext* context,
                               ServerAsyncStreamingInterface* stream,
                               CompletionQueue* call_cq,
                               ServerCompletionQueue* notification_cq,
                               void* tag) {
-    dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+    server_->RequestAsyncCall(request_args_[index], context, 
                                      stream, call_cq, notification_cq, tag);
   }
+  template <class Message>
   void RequestServerStreaming(int index, ServerContext* context,
-                              grpc::protobuf::Message* request,
+                              Message* request,
                               ServerAsyncStreamingInterface* stream,
                               CompletionQueue* call_cq,
                               ServerCompletionQueue* notification_cq,
                               void* tag) {
-    dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
-                                     stream, call_cq, notification_cq, tag);
+    server_->RequestAsyncCall(request_args_[index], context, 
+                                     stream, call_cq, notification_cq, tag, request);
   }
   void RequestBidiStreaming(int index, ServerContext* context,
                             ServerAsyncStreamingInterface* stream,
                             CompletionQueue* call_cq,
                             ServerCompletionQueue* notification_cq, void* tag) {
-    dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+    server_->RequestAsyncCall(request_args_[index], context, 
                                      stream, call_cq, notification_cq, tag);
   }
 
  private:
   friend class Server;
-  DispatchImpl* dispatch_impl_;
+  Server* server_;
   const char** const method_names_;
   size_t method_count_;
   void** request_args_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 2cfeb359fc..e0599ee768 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -53,13 +53,13 @@ class GenericServerContext;
 class AsyncGenericService;
 class RpcService;
 class RpcServiceMethod;
+class ServerAsyncStreamingInterface;
 class ServerCredentials;
 class ThreadPoolInterface;
 
 // Currently it only supports handling rpcs in a single thread.
 class Server GRPC_FINAL : public GrpcLibrary,
-                          private CallHook,
-                          private AsynchronousService::DispatchImpl {
+                          private CallHook {
  public:
   ~Server();
 
@@ -73,6 +73,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
 
  private:
   friend class AsyncGenericService;
+  friend class AsynchronousService;
   friend class ServerBuilder;
 
   class SyncRequest;
@@ -96,15 +97,20 @@ class Server GRPC_FINAL : public GrpcLibrary,
   void RunRpc();
   void ScheduleCallback();
 
-  void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+  void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE;
 
-  // DispatchImpl
+  template <class Message>
   void RequestAsyncCall(void* registered_method, ServerContext* context,
-                        grpc::protobuf::Message* request,
                         ServerAsyncStreamingInterface* stream,
                         CompletionQueue* call_cq,
                         ServerCompletionQueue* notification_cq,
-                        void* tag) GRPC_OVERRIDE;
+                        void* tag, Message *message);
+
+  void RequestAsyncCall(void* registered_method, ServerContext* context,
+                        ServerAsyncStreamingInterface* stream,
+                        CompletionQueue* call_cq,
+                        ServerCompletionQueue* notification_cq,
+                        void* tag);
 
   void RequestAsyncGenericCall(GenericServerContext* context,
                                ServerAsyncStreamingInterface* stream,
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index c836f98c2a..32ba03f8d8 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -93,15 +93,16 @@ template <class R>
 class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
  public:
   // Blocking create a stream and write the first request out.
+  template <class W>
   ClientReader(ChannelInterface* channel, const RpcMethod& method,
-               ClientContext* context, const grpc::protobuf::Message& request)
+               ClientContext* context, const W& request)
       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
-    CallOpBuffer buf;
-    buf.AddSendInitialMetadata(&context->send_initial_metadata_);
-    buf.AddSendMessage(request);
-    buf.AddClientSendClose();
-    call_.PerformOps(&buf);
-    cq_.Pluck(&buf);
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops;
+    ops.SendInitialMetadata(context->send_initial_metadata_);
+    ops.SendMessage(request);
+    ops.ClientSendClose();
+    call_.PerformOps(&ops);
+    cq_.Pluck(&ops);
   }
 
   // Blocking wait for initial metadata from server. The received metadata
@@ -111,28 +112,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
   void WaitForInitialMetadata() {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    CallOpBuffer buf;
-    buf.AddRecvInitialMetadata(context_);
-    call_.PerformOps(&buf);
-    cq_.Pluck(&buf);  // status ignored
+    CallOpSet<CallOpRecvInitialMetadata> ops;
+    ops.RecvInitialMetadata(context_);
+    call_.PerformOps(&ops);
+    cq_.Pluck(&ops);  // status ignored
   }
 
   bool Read(R* msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
     if (!context_->initial_metadata_received_) {
-      buf.AddRecvInitialMetadata(context_);
+      ops.RecvInitialMetadata(context_);
     }
-    buf.AddRecvMessage(msg);
-    call_.PerformOps(&buf);
-    return cq_.Pluck(&buf) && buf.got_message;
+    ops.RecvMessage(msg);
+    call_.PerformOps(&ops);
+    return cq_.Pluck(&ops) && ops.got_message;
   }
 
   Status Finish() GRPC_OVERRIDE {
-    CallOpBuffer buf;
+    CallOpSet<CallOpClientRecvStatus> ops;
     Status status;
-    buf.AddClientRecvStatus(context_, &status);
-    call_.PerformOps(&buf);
-    GPR_ASSERT(cq_.Pluck(&buf));
+    ops.ClientRecvStatus(context_, &status);
+    call_.PerformOps(&ops);
+    GPR_ASSERT(cq_.Pluck(&ops));
     return status;
   }
 
@@ -150,48 +151,48 @@ class ClientWriterInterface : public ClientStreamingInterface,
 };
 
 template <class W>
-class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
+class ClientWriter : public ClientWriterInterface<W> {
  public:
   // Blocking create a stream.
+  template <class R>
   ClientWriter(ChannelInterface* channel, const RpcMethod& method,
-               ClientContext* context, grpc::protobuf::Message* response)
+               ClientContext* context, R* response)
       : context_(context),
-        response_(response),
         call_(channel->CreateCall(method, context, &cq_)) {
-    CallOpBuffer buf;
-    buf.AddSendInitialMetadata(&context->send_initial_metadata_);
-    call_.PerformOps(&buf);
-    cq_.Pluck(&buf);
+    finish_ops_.RecvMessage(response);
+
+    CallOpSet<CallOpRecvMessage<R>> ops;
+    ops.AddSendInitialMetadata(&context->send_initial_metadata_);
+    call_.PerformOps(&ops);
+    cq_.Pluck(&ops);
   }
 
   bool Write(const W& msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
-    buf.AddSendMessage(msg);
-    call_.PerformOps(&buf);
-    return cq_.Pluck(&buf);
+    CallOpSet<CallOpSendMessage> ops;
+    ops.SendMessage(msg);
+    call_.PerformOps(&ops);
+    return cq_.Pluck(&ops);
   }
 
   bool WritesDone() GRPC_OVERRIDE {
-    CallOpBuffer buf;
-    buf.AddClientSendClose();
-    call_.PerformOps(&buf);
-    return cq_.Pluck(&buf);
+    CallOpSet<CallOpClientSendClose> ops;
+    ops.ClientSendClose();
+    call_.PerformOps(&ops);
+    return cq_.Pluck(&ops);
   }
 
   // Read the final response and wait for the final status.
   Status Finish() GRPC_OVERRIDE {
-    CallOpBuffer buf;
     Status status;
-    buf.AddRecvMessage(response_);
-    buf.AddClientRecvStatus(context_, &status);
-    call_.PerformOps(&buf);
-    GPR_ASSERT(cq_.Pluck(&buf));
+    finish_ops_.ClientRecvStatus(context_, &status);
+    call_.PerformOps(&finish_ops_);
+    GPR_ASSERT(cq_.Pluck(&finish_ops_));
     return status;
   }
 
  private:
   ClientContext* context_;
-  grpc::protobuf::Message* const response_;
+  CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
   CompletionQueue cq_;
   Call call_;
 };
@@ -213,10 +214,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
   ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
                      ClientContext* context)
       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
-    CallOpBuffer buf;
-    buf.AddSendInitialMetadata(&context->send_initial_metadata_);
-    call_.PerformOps(&buf);
-    cq_.Pluck(&buf);
+    CallOpSet<CallOpSendInitialMetadata> ops;
+    ops.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&ops);
+    cq_.Pluck(&ops);
   }
 
   // Blocking wait for initial metadata from server. The received metadata
@@ -226,42 +227,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
   void WaitForInitialMetadata() {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    CallOpBuffer buf;
-    buf.AddRecvInitialMetadata(context_);
-    call_.PerformOps(&buf);
-    cq_.Pluck(&buf);  // status ignored
+    CallOpSet<CallOpRecvInitialMetadata> ops;
+    ops.RecvInitialMetadata(context_);
+    call_.PerformOps(&ops);
+    cq_.Pluck(&ops);  // status ignored
   }
 
   bool Read(R* msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
     if (!context_->initial_metadata_received_) {
-      buf.AddRecvInitialMetadata(context_);
+      ops.RecvInitialMetadata(context_);
     }
-    buf.AddRecvMessage(msg);
-    call_.PerformOps(&buf);
-    return cq_.Pluck(&buf) && buf.got_message;
+    ops.RecvMessage(msg);
+    call_.PerformOps(&ops);
+    return cq_.Pluck(&ops) && ops.got_message;
   }
 
   bool Write(const W& msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
-    buf.AddSendMessage(msg);
-    call_.PerformOps(&buf);
-    return cq_.Pluck(&buf);
+    CallOpSet<CallOpSendMessage> ops;
+    ops.SendMessage(msg);
+    call_.PerformOps(&ops);
+    return cq_.Pluck(&ops);
   }
 
   bool WritesDone() GRPC_OVERRIDE {
-    CallOpBuffer buf;
-    buf.AddClientSendClose();
-    call_.PerformOps(&buf);
-    return cq_.Pluck(&buf);
+    CallOpSet<CallOpClientSendClose> ops;
+    ops.ClientSendClose();
+    call_.PerformOps(&ops);
+    return cq_.Pluck(&ops);
   }
 
   Status Finish() GRPC_OVERRIDE {
-    CallOpBuffer buf;
+    CallOpSet<CallOpClientRecvStatus> ops;
     Status status;
-    buf.AddClientRecvStatus(context_, &status);
-    call_.PerformOps(&buf);
-    GPR_ASSERT(cq_.Pluck(&buf));
+    ops.ClientRecvStatus(context_, &status);
+    call_.PerformOps(&ops);
+    GPR_ASSERT(cq_.Pluck(&ops));
     return status;
   }
 
@@ -279,18 +280,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
   void SendInitialMetadata() {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    CallOpBuffer buf;
-    buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    CallOpSet<CallOpSendInitialMetadata> ops;
+    ops.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_->PerformOps(&buf);
-    call_->cq()->Pluck(&buf);
+    call_->PerformOps(&ops);
+    call_->cq()->Pluck(&ops);
   }
 
   bool Read(R* msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
-    buf.AddRecvMessage(msg);
-    call_->PerformOps(&buf);
-    return call_->cq()->Pluck(&buf) && buf.got_message;
+    CallOpSet<CallOpRecvMessage<R>> ops;
+    ops.RecvMessage(msg);
+    call_->PerformOps(&ops);
+    return call_->cq()->Pluck(&ops) && ops.got_message;
   }
 
  private:
@@ -306,22 +307,22 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
   void SendInitialMetadata() {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    CallOpBuffer buf;
-    buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    CallOpSet<CallOpSendInitialMetadata> ops;
+    ops.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_->PerformOps(&buf);
-    call_->cq()->Pluck(&buf);
+    call_->PerformOps(&ops);
+    call_->cq()->Pluck(&ops);
   }
 
   bool Write(const W& msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
     if (!ctx_->sent_initial_metadata_) {
-      buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ops.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    buf.AddSendMessage(msg);
-    call_->PerformOps(&buf);
-    return call_->cq()->Pluck(&buf);
+    ops.SendMessage(msg);
+    call_->PerformOps(&ops);
+    return call_->cq()->Pluck(&ops);
   }
 
  private:
@@ -339,29 +340,29 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
   void SendInitialMetadata() {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    CallOpBuffer buf;
-    buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    CallOpSet<CallOpSendInitialMetadata> ops;
+    ops.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_->PerformOps(&buf);
-    call_->cq()->Pluck(&buf);
+    call_->PerformOps(&ops);
+    call_->cq()->Pluck(&ops);
   }
 
   bool Read(R* msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
-    buf.AddRecvMessage(msg);
-    call_->PerformOps(&buf);
-    return call_->cq()->Pluck(&buf) && buf.got_message;
+    CallOpSet<CallOpRecvMessage<R>> ops;
+    ops.RecvMessage(msg);
+    call_->PerformOps(&ops);
+    return call_->cq()->Pluck(&ops) && ops.got_message;
   }
 
   bool Write(const W& msg) GRPC_OVERRIDE {
-    CallOpBuffer buf;
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
     if (!ctx_->sent_initial_metadata_) {
-      buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      ops.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    buf.AddSendMessage(msg);
-    call_->PerformOps(&buf);
-    return call_->cq()->Pluck(&buf);
+    ops.SendMessage(msg);
+    call_->PerformOps(&ops);
+    return call_->cq()->Pluck(&ops);
   }
 
  private:
@@ -407,50 +408,51 @@ template <class R>
 class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
  public:
   // Create a stream and write the first request out.
+  template <class W>
   ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
                     const RpcMethod& method, ClientContext* context,
-                    const grpc::protobuf::Message& request, void* tag)
+                    const W& request, void* tag)
       : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_buf_.Reset(tag);
-    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
-    init_buf_.AddSendMessage(request);
-    init_buf_.AddClientSendClose();
-    call_.PerformOps(&init_buf_);
+    init_ops_.SetOutputTag(tag);
+    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    init_ops_.SendMessage(request);
+    init_ops_.ClientSendClose();
+    call_.PerformOps(&init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddRecvInitialMetadata(context_);
-    call_.PerformOps(&meta_buf_);
+    meta_ops_.SetOutputTag(tag);
+    meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_buf_.Reset(tag);
+    read_ops_.SetOutputTag(tag);
     if (!context_->initial_metadata_received_) {
-      read_buf_.AddRecvInitialMetadata(context_);
+      read_ops_.RecvInitialMetadata(context_);
     }
-    read_buf_.AddRecvMessage(msg);
-    call_.PerformOps(&read_buf_);
+    read_ops_.RecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_buf_.AddRecvInitialMetadata(context_);
+      finish_ops_.RecvInitialMetadata(context_);
     }
-    finish_buf_.AddClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  CallOpBuffer init_buf_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer read_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_ops_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
 };
 
 template <class W>
@@ -463,56 +465,56 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
 template <class W>
 class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
  public:
+  template <class R>
   ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
                     const RpcMethod& method, ClientContext* context,
-                    grpc::protobuf::Message* response, void* tag)
+                    R* response, void* tag)
       : context_(context),
-        response_(response),
         call_(channel->CreateCall(method, context, cq)) {
-    init_buf_.Reset(tag);
-    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
-    call_.PerformOps(&init_buf_);
+    finish_ops_.RecvMessage(response);
+
+    init_ops_.SetOutputTag(tag);
+    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddRecvInitialMetadata(context_);
-    call_.PerformOps(&meta_buf_);
+    meta_ops_.SetOutputTag(tag);
+    meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_buf_.Reset(tag);
-    write_buf_.AddSendMessage(msg);
-    call_.PerformOps(&write_buf_);
+    write_ops_.SetOutputTag(tag);
+    write_ops_.SendMessage(msg);
+    call_.PerformOps(&write_ops_);
   }
 
   void WritesDone(void* tag) GRPC_OVERRIDE {
-    writes_done_buf_.Reset(tag);
-    writes_done_buf_.AddClientSendClose();
-    call_.PerformOps(&writes_done_buf_);
+    writes_done_ops_.SetOutputTag(tag);
+    writes_done_ops_.ClientSendClose();
+    call_.PerformOps(&writes_done_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_buf_.AddRecvInitialMetadata(context_);
+      finish_ops_.RecvInitialMetadata(context_);
     }
-    finish_buf_.AddRecvMessage(response_);
-    finish_buf_.AddClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
   ClientContext* context_;
-  grpc::protobuf::Message* const response_;
   Call call_;
-  CallOpBuffer init_buf_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer write_buf_;
-  CallOpBuffer writes_done_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata> init_ops_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+  CallOpSet<CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpClientSendClose> writes_done_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
 };
 
 // Client-side interface for bi-directional streaming.
@@ -532,58 +534,58 @@ class ClientAsyncReaderWriter GRPC_FINAL
                           const RpcMethod& method, ClientContext* context,
                           void* tag)
       : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_buf_.Reset(tag);
-    init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
-    call_.PerformOps(&init_buf_);
+    init_ops_.SetOutputTag(tag);
+    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddRecvInitialMetadata(context_);
-    call_.PerformOps(&meta_buf_);
+    meta_ops_.SetOutputTag(tag);
+    meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_buf_.Reset(tag);
+    read_ops_.SetOutputTag(tag);
     if (!context_->initial_metadata_received_) {
-      read_buf_.AddRecvInitialMetadata(context_);
+      read_ops_.RecvInitialMetadata(context_);
     }
-    read_buf_.AddRecvMessage(msg);
-    call_.PerformOps(&read_buf_);
+    read_ops_.AddRecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_buf_.Reset(tag);
-    write_buf_.AddSendMessage(msg);
-    call_.PerformOps(&write_buf_);
+    write_ops_.SetOutputTag(tag);
+    write_ops_.SendMessage(msg);
+    call_.PerformOps(&write_ops_);
   }
 
   void WritesDone(void* tag) GRPC_OVERRIDE {
-    writes_done_buf_.Reset(tag);
-    writes_done_buf_.AddClientSendClose();
-    call_.PerformOps(&writes_done_buf_);
+    writes_done_ops_.SetOutputTag(tag);
+    writes_done_ops_.ClientSendClose();
+    call_.PerformOps(&writes_done_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_buf_.AddRecvInitialMetadata(context_);
+      finish_ops_.RecvInitialMetadata(context_);
     }
-    finish_buf_.AddClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  CallOpBuffer init_buf_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer read_buf_;
-  CallOpBuffer write_buf_;
-  CallOpBuffer writes_done_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata> init_ops_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpClientSendClose> writes_done_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
 };
 
 template <class W, class R>
@@ -596,41 +598,41 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    meta_ops_.SetOutputTag(tag);
+    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_buf_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_buf_.Reset(tag);
-    read_buf_.AddRecvMessage(msg);
-    call_.PerformOps(&read_buf_);
+    read_ops_.SetOutputTag(tag);
+    read_ops_.RecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Finish(const W& msg, const Status& status, void* tag) {
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // The response is dropped if the status is not OK.
     if (status.IsOk()) {
-      finish_buf_.AddSendMessage(msg);
+      finish_ops_.SendMessage(msg);
     }
-    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
   void FinishWithError(const Status& status, void* tag) {
     GPR_ASSERT(!status.IsOk());
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
@@ -638,9 +640,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer read_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_ops_;
 };
 
 template <class W>
@@ -653,30 +655,30 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    meta_ops_.SetOutputTag(tag);
+    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_buf_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_buf_.Reset(tag);
+    write_ops_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    write_buf_.AddSendMessage(msg);
-    call_.PerformOps(&write_buf_);
+    write_ops_.SendMessage(msg);
+    call_.PerformOps(&write_ops_);
   }
 
   void Finish(const Status& status, void* tag) {
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
@@ -684,9 +686,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer write_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 };
 
 // Server-side interface for bi-directional streaming.
@@ -701,36 +703,36 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_buf_.Reset(tag);
-    meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+    meta_ops_.SetOutputTag(tag);
+    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_buf_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_buf_.Reset(tag);
-    read_buf_.AddRecvMessage(msg);
-    call_.PerformOps(&read_buf_);
+    read_ops_.SetOutputTag(tag);
+    read_ops_.AddRecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_buf_.Reset(tag);
+    write_ops_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    write_buf_.AddSendMessage(msg);
-    call_.PerformOps(&write_buf_);
+    write_ops_.SendMessage(msg);
+    call_.PerformOps(&write_ops_);
   }
 
   void Finish(const Status& status, void* tag) {
-    finish_buf_.Reset(tag);
+    finish_ops_.SetOutputTag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_buf_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
@@ -738,10 +740,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  CallOpBuffer meta_buf_;
-  CallOpBuffer read_buf_;
-  CallOpBuffer write_buf_;
-  CallOpBuffer finish_buf_;
+  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 };
 
 }  // namespace grpc
diff --git a/src/compiler/config.h b/src/compiler/config.h
index e81de8d6c8..06ccd8530c 100644
--- a/src/compiler/config.h
+++ b/src/compiler/config.h
@@ -35,6 +35,7 @@
 #define SRC_COMPILER_CONFIG_H
 
 #include <grpc++/config.h>
+#include <grpc++/config_protobuf.h>
 
 #ifndef GRPC_CUSTOM_DESCRIPTOR
 #include <google/protobuf/descriptor.h>
-- 
GitLab