diff --git a/src/node/src/server.js b/src/node/src/server.js index d1fb627e6cb3f4be4d404d638fdb21229db423fc..ceaa9f5d1fcbfe840653f98ef1d9599aeff03555 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -100,28 +100,6 @@ function handleError(call, error) { call.startBatch(error_batch, function(){}); } -/** - * Wait for the client to close, then emit a cancelled event if the client - * cancelled. - * @access private - * @param {grpc.Call} call The call object to wait on - * @param {EventEmitter} emitter The event emitter to emit the cancelled event - * on - */ -function waitForCancel(call, emitter) { - var cancel_batch = {}; - cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - call.startBatch(cancel_batch, function(err, result) { - if (err) { - emitter.emit('error', err); - } - if (result.cancelled) { - emitter.cancelled = true; - emitter.emit('cancelled'); - } - }); -} - /** * Send a response to a unary or client streaming call. * @access private @@ -258,6 +236,13 @@ function setUpReadable(stream, deserialize) { }); } +util.inherits(ServerUnaryCall, EventEmitter); + +function ServerUnaryCall(call) { + EventEmitter.call(this); + this.call = call; +} + util.inherits(ServerWritableStream, Writable); /** @@ -311,33 +296,6 @@ function _write(chunk, encoding, callback) { ServerWritableStream.prototype._write = _write; -/** - * Send the initial metadata for a writable stream. - * @param {Metadata} responseMetadata Metadata to send - */ -function sendMetadata(responseMetadata) { - /* jshint validthis: true */ - var self = this; - if (!this.call.metadataSent) { - this.call.metadataSent = true; - var batch = []; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - this.call.startBatch(batch, function(err) { - if (err) { - self.emit('error', err); - return; - } - }); - } -} - -/** - * @inheritdoc - * @alias module:src/server~ServerWritableStream#sendMetadata - */ -ServerWritableStream.prototype.sendMetadata = sendMetadata; - util.inherits(ServerReadableStream, Readable); /** @@ -427,6 +385,31 @@ function ServerDuplexStream(call, serialize, deserialize) { ServerDuplexStream.prototype._read = _read; ServerDuplexStream.prototype._write = _write; + +/** + * Send the initial metadata for a writable stream. + * @param {Metadata} responseMetadata Metadata to send + */ +function sendMetadata(responseMetadata) { + /* jshint validthis: true */ + var self = this; + if (!this.call.metadataSent) { + this.call.metadataSent = true; + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); + this.call.startBatch(batch, function(err) { + if (err) { + self.emit('error', err); + return; + } + }); + } +} + +ServerUnaryCall.prototype.sendMetadata = sendMetadata; +ServerWritableStream.prototype.sendMetadata = sendMetadata; +ServerReadableStream.prototype.sendMetadata = sendMetadata; ServerDuplexStream.prototype.sendMetadata = sendMetadata; /** @@ -438,10 +421,36 @@ function getPeer() { return this.call.getPeer(); } +ServerUnaryCall.prototype.getPeer = getPeer; ServerReadableStream.prototype.getPeer = getPeer; ServerWritableStream.prototype.getPeer = getPeer; ServerDuplexStream.prototype.getPeer = getPeer; +/** + * Wait for the client to close, then emit a cancelled event if the client + * cancelled. + */ +function waitForCancel() { + /* jshint validthis: true */ + var self = this; + var cancel_batch = {}; + cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + self.call.startBatch(cancel_batch, function(err, result) { + if (err) { + self.emit('error', err); + } + if (result.cancelled) { + self.cancelled = true; + self.emit('cancelled'); + } + }); +} + +ServerUnaryCall.prototype.waitForCancel = waitForCancel; +ServerReadableStream.prototype.waitForCancel = waitForCancel; +ServerWritableStream.prototype.waitForCancel = waitForCancel; +ServerDuplexStream.prototype.waitForCancel = waitForCancel; + /** * Fully handle a unary call * @access private @@ -450,25 +459,12 @@ ServerDuplexStream.prototype.getPeer = getPeer; * @param {Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { - var emitter = new EventEmitter(); - emitter.sendMetadata = function(responseMetadata) { - if (!call.metadataSent) { - call.metadataSent = true; - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - call.startBatch(batch, function() {}); - } - }; - emitter.getPeer = function() { - return call.getPeer(); - }; + var emitter = new ServerUnaryCall(call); emitter.on('error', function(error) { handleError(call, error); }); emitter.metadata = metadata; - waitForCancel(call, emitter); - emitter.call = call; + emitter.waitForCancel(); var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(batch, function(err, result) { @@ -508,7 +504,7 @@ function handleUnary(call, handler, metadata) { */ function handleServerStreaming(call, handler, metadata) { var stream = new ServerWritableStream(call, handler.serialize); - waitForCancel(call, stream); + stream.waitForCancel(); stream.metadata = metadata; var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; @@ -537,19 +533,10 @@ function handleServerStreaming(call, handler, metadata) { */ function handleClientStreaming(call, handler, metadata) { var stream = new ServerReadableStream(call, handler.deserialize); - stream.sendMetadata = function(responseMetadata) { - if (!call.metadataSent) { - call.metadataSent = true; - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - call.startBatch(batch, function() {}); - } - }; stream.on('error', function(error) { handleError(call, error); }); - waitForCancel(call, stream); + stream.waitForCancel(); stream.metadata = metadata; handler.func(stream, function(err, value, trailer, flags) { stream.terminate(); @@ -574,7 +561,7 @@ function handleClientStreaming(call, handler, metadata) { function handleBidiStreaming(call, handler, metadata) { var stream = new ServerDuplexStream(call, handler.serialize, handler.deserialize); - waitForCancel(call, stream); + stream.waitForCancel(); stream.metadata = metadata; handler.func(stream); } diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 39673e4e05beb89543efcbd13097c970488ec48a..523fda6849df5e3658ad612bf9440b4c1a51ac53 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -312,6 +312,54 @@ describe('Generic client and server', function() { }); }); }); +describe('Server-side getPeer', function() { + function toString(val) { + return val.toString(); + } + function toBuffer(str) { + return new Buffer(str); + } + var string_service_attrs = { + 'getPeer' : { + path: '/string/getPeer', + requestStream: false, + responseStream: false, + requestSerialize: toBuffer, + requestDeserialize: toString, + responseSerialize: toBuffer, + responseDeserialize: toString + } + }; + var client; + var server; + before(function() { + server = new grpc.Server(); + server.addService(string_service_attrs, { + getPeer: function(call, callback) { + try { + callback(null, call.getPeer()); + } catch (e) { + call.emit('error', e); + } + } + }); + var port = server.bind('localhost:0', server_insecure_creds); + server.start(); + var Client = grpc.makeGenericClientConstructor(string_service_attrs); + client = new Client('localhost:' + port, + grpc.credentials.createInsecure()); + }); + after(function() { + server.forceShutdown(); + }); + it('should respond with a string representing the client', function(done) { + client.getPeer('', function(err, response) { + assert.ifError(err); + // We don't expect a specific value, just that it worked without error + done(); + }); + }); +}); describe('Echo metadata', function() { var client; var server;