From 83815eab40568e142f05376dae48c2cef41bfefd Mon Sep 17 00:00:00 2001
From: murgatroid99 <mlumish@google.com>
Date: Thu, 8 Oct 2015 09:31:45 -0700
Subject: [PATCH] Added interval_us delay in Node interop server

---
 src/node/interop/interop_server.js | 47 ++++++++++++++++++++++++++----
 1 file changed, 41 insertions(+), 6 deletions(-)

diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 5321005c86..cc527364b4 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -35,6 +35,7 @@
 
 var fs = require('fs');
 var path = require('path');
+var async = require('async');
 var _ = require('lodash');
 var grpc = require('..');
 var testProto = grpc.load({
@@ -86,6 +87,22 @@ function getEchoTrailer(call) {
   return response_trailer;
 }
 
+/**
+ * @typedef Payload
+ * @type {object}
+ * @property {string} payload_type The payload type
+ * @property {Buffer} body The payload body
+ */
+
+/**
+ * Get a payload of the specified type and size. If the requested payload is
+ * COMPRESSABLE, it returns a zero buffer. If the type is UNCOMRESSABLE, it
+ * returns a slice of pre-loaded uncompressable data. If the type is RANDOM,
+ * it returns one of the other choices, chosen at random.
+ * @param {string} payload_type The type of payload to return
+ * @param {Number} size The size of the payload body
+ * @return {Payload} The requested payload
+ */
 function getPayload(payload_type, size) {
   if (payload_type === 'RANDOM') {
     payload_type = ['COMPRESSABLE',
@@ -99,6 +116,15 @@ function getPayload(payload_type, size) {
   return {type: payload_type, body: body};
 }
 
+function respondWithStream(call, request, callback) {
+  async.eachSeries(request.response_parameters, function(resp_param, callback) {
+    setTimeout(function() {
+      call.write({payload: getPayload(request.response_type, resp_param.size)});
+      callback();
+    }, resp_param.interval_us/1000);
+  }, callback);
+}
+
 /**
  * Respond to an empty parameter with an empty response.
  * NOTE: this currently does not work due to issue #137
@@ -162,10 +188,13 @@ function handleStreamingOutput(call) {
     call.emit('error', status);
     return;
   }
-  _.each(req.response_parameters, function(resp_param) {
-    call.write({payload: getPayload(req.response_type, resp_param.size)});
+  respondWithStream(call, req, function(err) {
+    if (err) {
+      call.emit(err);
+    } else {
+      call.end(getEchoTrailer(call));
+    }
   });
-  call.end(getEchoTrailer(call));
 }
 
 /**
@@ -175,6 +204,7 @@ function handleStreamingOutput(call) {
  */
 function handleFullDuplex(call) {
   echoHeader(call);
+  var call_ended;
   call.on('data', function(value) {
     if (value.response_status) {
       var status = value.response_status;
@@ -182,12 +212,17 @@ function handleFullDuplex(call) {
       call.emit('error', status);
       return;
     }
-    _.each(value.response_parameters, function(resp_param) {
-      call.write({payload: getPayload(value.response_type, resp_param.size)});
+    call.pause();
+    respondWithStream(call, value, function(err) {
+      call.resume();
+      if (call_ended) {
+        call.end(getEchoTrailer(call));
+      }
     });
   });
   call.on('end', function() {
-    call.end(getEchoTrailer(call));
+    call_ended = true;
+
   });
 }
 
-- 
GitLab