From 380b90a795cd161efd79334dfee454c4694977d0 Mon Sep 17 00:00:00 2001
From: murgatroid99 <mlumish@google.com>
Date: Thu, 16 Jul 2015 13:16:14 -0700
Subject: [PATCH] Removed server-wide metadata handler, individual handlers can
 now send metadata

---
 src/node/interop/interop_server.js |  2 +-
 src/node/src/server.js             | 72 +++++++++++++++++++++++-------
 2 files changed, 56 insertions(+), 18 deletions(-)

diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index e979af86ed..505c6bb537 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -172,7 +172,7 @@ function getServer(port, tls) {
                                                     key_data,
                                                     pem_data);
   }
-  var server = new grpc.Server(null, options);
+  var server = new grpc.Server(options);
   server.addProtoService(testProto.TestService.service, {
     emptyCall: handleEmpty,
     unaryCall: handleUnary,
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 0e40ac7378..0a3a0031bd 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -72,6 +72,9 @@ function handleError(call, error) {
     status.metadata = error.metadata;
   }
   var error_batch = {};
+  if (!call.metadataSent) {
+    error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+  }
   error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
   call.startBatch(error_batch, function(){});
 }
@@ -115,6 +118,10 @@ function sendUnaryResponse(call, value, serialize, metadata) {
   if (metadata) {
     status.metadata = metadata;
   }
+  if (!call.metadataSent) {
+    end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+    call.metadataSent = true;
+  }
   end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
   end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
   call.startBatch(end_batch, function (){});
@@ -136,6 +143,10 @@ function setUpWritable(stream, serialize) {
   stream.serialize = common.wrapIgnoreNull(serialize);
   function sendStatus() {
     var batch = {};
+    if (!stream.call.metadataSent) {
+      stream.call.metadataSent = true;
+      batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+    }
     batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
     stream.call.startBatch(batch, function(){});
   }
@@ -239,6 +250,10 @@ function ServerWritableStream(call, serialize) {
 function _write(chunk, encoding, callback) {
   /* jshint validthis: true */
   var batch = {};
+  if (!this.call.metadataSent) {
+    batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+    this.call.metadataSent = true;
+  }
   batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
   this.call.startBatch(batch, function(err, value) {
     if (err) {
@@ -251,6 +266,23 @@ function _write(chunk, encoding, callback) {
 
 ServerWritableStream.prototype._write = _write;
 
+function sendMetadata(responseMetadata) {
+  /* jshint validthis: true */
+  if (!this.call.metadataSent) {
+    this.call.metadataSent = true;
+    var batch = [];
+    batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
+    this.call.startBatch(batch, function(err) {
+      if (err) {
+        this.emit('error', err);
+        return;
+      }
+    });
+  }
+}
+
+ServerWritableStream.prototype.sendMetadata = sendMetadata;
+
 util.inherits(ServerReadableStream, Readable);
 
 /**
@@ -339,6 +371,7 @@ function ServerDuplexStream(call, serialize, deserialize) {
 
 ServerDuplexStream.prototype._read = _read;
 ServerDuplexStream.prototype._write = _write;
+ServerDuplexStream.prototype.sendMetadata = sendMetadata;
 
 /**
  * Fully handle a unary call
@@ -348,12 +381,20 @@ ServerDuplexStream.prototype._write = _write;
  */
 function handleUnary(call, handler, metadata) {
   var emitter = new EventEmitter();
+  emitter.sendMetadata = function(responseMetadata) {
+    if (!call.metadataSent) {
+      call.metadataSent = true;
+      var batch = {};
+      batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
+      call.startBatch(batch, function() {});
+    }
+  };
   emitter.on('error', function(error) {
     handleError(call, error);
   });
+  emitter.metadata = metadata;
   waitForCancel(call, emitter);
   var batch = {};
-  batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
   batch[grpc.opType.RECV_MESSAGE] = true;
   call.startBatch(batch, function(err, result) {
     if (err) {
@@ -392,8 +433,8 @@ function handleUnary(call, handler, metadata) {
 function handleServerStreaming(call, handler, metadata) {
   var stream = new ServerWritableStream(call, handler.serialize);
   waitForCancel(call, stream);
+  stream.metadata = metadata;
   var batch = {};
-  batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
   batch[grpc.opType.RECV_MESSAGE] = true;
   call.startBatch(batch, function(err, result) {
     if (err) {
@@ -419,13 +460,19 @@ function handleServerStreaming(call, handler, metadata) {
  */
 function handleClientStreaming(call, handler, metadata) {
   var stream = new ServerReadableStream(call, handler.deserialize);
+  stream.sendMetadata = function(responseMetadata) {
+    if (!call.metadataSent) {
+      call.metadataSent = true;
+      var batch = {};
+      batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
+      call.startBatch(batch, function() {});
+    }
+  };
   stream.on('error', function(error) {
     handleError(call, error);
   });
   waitForCancel(call, stream);
-  var metadata_batch = {};
-  metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
-  call.startBatch(metadata_batch, function() {});
+  stream.metadata = metadata;
   handler.func(stream, function(err, value, trailer) {
     stream.terminate();
     if (err) {
@@ -449,9 +496,7 @@ function handleBidiStreaming(call, handler, metadata) {
   var stream = new ServerDuplexStream(call, handler.serialize,
                                       handler.deserialize);
   waitForCancel(call, stream);
-  var metadata_batch = {};
-  metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
-  call.startBatch(metadata_batch, function() {});
+  stream.metadata = metadata;
   handler.func(stream);
 }
 
@@ -466,13 +511,10 @@ var streamHandlers = {
  * Constructs a server object that stores request handlers and delegates
  * incoming requests to those handlers
  * @constructor
- * @param {function(string, Object<string, Array<Buffer>>):
-           Object<string, Array<Buffer|string>>=} getMetadata Callback that gets
- *     metatada for a given method
  * @param {Object=} options Options that should be passed to the internal server
  *     implementation
  */
-function Server(getMetadata, options) {
+function Server(options) {
   this.handlers = {};
   var handlers = this.handlers;
   var server = new grpc.Server(options);
@@ -525,11 +567,7 @@ function Server(getMetadata, options) {
         call.startBatch(batch, function() {});
         return;
       }
-      var response_metadata = {};
-      if (getMetadata) {
-        response_metadata = getMetadata(method, metadata);
-      }
-      streamHandlers[handler.type](call, handler, response_metadata);
+      streamHandlers[handler.type](call, handler, metadata);
     }
     server.requestCall(handleNewCall);
   };
-- 
GitLab