Skip to content
Snippets Groups Projects
Commit 49724152 authored by Tim Emiola's avatar Tim Emiola
Browse files

Merge pull request #221 from murgatroid99/node_surface_cancellation

Node surface cancellation
parents 7d8dcd71 12958f9f
No related branches found
No related tags found
No related merge requests found
...@@ -160,6 +160,14 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { ...@@ -160,6 +160,14 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
}, 0); }, 0);
}; };
/**
* Cancel the ongoing call. If the call has not already finished, it will finish
* with status CANCELLED.
*/
GrpcClientStream.prototype.cancel = function() {
this._call.cancel();
};
/** /**
* Make a request on the channel to the given method with the given arguments * Make a request on the channel to the given method with the given arguments
* @param {grpc.Channel} channel The channel on which to make the request * @param {grpc.Channel} channel The channel on which to make the request
......
...@@ -246,6 +246,7 @@ function Server(options) { ...@@ -246,6 +246,7 @@ function Server(options) {
call.serverAccept(function(event) { call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) { if (event.data.code === grpc.status.CANCELLED) {
cancelled = true; cancelled = true;
stream.emit('cancelled');
} }
}, 0); }, 0);
call.serverEndInitialMetadata(0); call.serverEndInitialMetadata(0);
......
...@@ -128,6 +128,16 @@ function _write(chunk, encoding, callback) { ...@@ -128,6 +128,16 @@ function _write(chunk, encoding, callback) {
*/ */
ClientWritableObjectStream.prototype._write = _write; ClientWritableObjectStream.prototype._write = _write;
/**
* Cancel the underlying call
*/
function cancel() {
this._stream.cancel();
}
ClientReadableObjectStream.prototype.cancel = cancel;
ClientWritableObjectStream.prototype.cancel = cancel;
/** /**
* Get a function that can make unary requests to the specified method. * Get a function that can make unary requests to the specified method.
* @param {string} method The name of the method to request * @param {string} method The name of the method to request
...@@ -155,6 +165,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { ...@@ -155,6 +165,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var stream = client.makeRequest(this.channel, method, serialize, var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline); deserialize, metadata, deadline);
var emitter = new EventEmitter(); var emitter = new EventEmitter();
emitter.cancel = function cancel() {
stream.cancel();
};
forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'status');
forwardEvent(stream, emitter, 'metadata'); forwardEvent(stream, emitter, 'metadata');
stream.write(argument); stream.write(argument);
...@@ -166,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { ...@@ -166,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(e); callback(e);
} }
}); });
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return emitter; return emitter;
} }
return makeUnaryRequest; return makeUnaryRequest;
...@@ -203,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { ...@@ -203,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(e); callback(e);
} }
}); });
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return obj_stream; return obj_stream;
} }
return makeClientStreamRequest; return makeClientStreamRequest;
......
...@@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) { ...@@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) {
get: function() { return stream.cancelled; } get: function() { return stream.cancelled; }
}); });
var self = this; var self = this;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this._stream.on('data', function forwardData(chunk) { this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) { if (!self.push(chunk)) {
self._stream.pause(); self._stream.pause();
...@@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) { ...@@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) {
var options = {objectMode: true}; var options = {objectMode: true};
Writable.call(this, options); Writable.call(this, options);
this._stream = stream; this._stream = stream;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this.on('finish', function() { this.on('finish', function() {
this._stream.end(); this._stream.end();
}); });
...@@ -138,6 +144,9 @@ function makeUnaryHandler(handler) { ...@@ -138,6 +144,9 @@ function makeUnaryHandler(handler) {
Object.defineProperty(call, 'cancelled', { Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;} get: function() { return stream.cancelled;}
}); });
stream.on('cancelled', function() {
call.emit('cancelled');
});
handler(call, function sendUnaryData(err, value) { handler(call, function sendUnaryData(err, value) {
if (err) { if (err) {
stream.emit('error', err); stream.emit('error', err);
......
...@@ -77,6 +77,14 @@ function errorHandler(stream) { ...@@ -77,6 +77,14 @@ function errorHandler(stream) {
}; };
} }
/**
* Wait for a cancellation instead of responding
* @param {Stream} stream
*/
function cancelHandler(stream) {
// do nothing
}
describe('echo client', function() { describe('echo client', function() {
it('should receive echo responses', function(done) { it('should receive echo responses', function(done) {
var server = new Server(); var server = new Server();
...@@ -125,6 +133,26 @@ describe('echo client', function() { ...@@ -125,6 +133,26 @@ describe('echo client', function() {
done(); done();
}); });
}); });
it('should be able to cancel a call', function(done) {
var server = new Server();
var port_num = server.bind('0.0.0.0:0');
server.register('cancellation', cancelHandler);
server.start();
var channel = new grpc.Channel('localhost:' + port_num);
var stream = client.makeRequest(
channel,
'cancellation',
null,
getDeadline(1));
stream.cancel();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.CANCELLED);
server.shutdown();
done();
});
});
}); });
/* TODO(mlumish): explore options for reducing duplication between this test /* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */ * and the insecure echo client test */
......
...@@ -35,6 +35,8 @@ var assert = require('assert'); ...@@ -35,6 +35,8 @@ var assert = require('assert');
var surface_server = require('../surface_server.js'); var surface_server = require('../surface_server.js');
var surface_client = require('../surface_client.js');
var ProtoBuf = require('protobufjs'); var ProtoBuf = require('protobufjs');
var grpc = require('..'); var grpc = require('..');
...@@ -73,3 +75,54 @@ describe('Surface server constructor', function() { ...@@ -73,3 +75,54 @@ describe('Surface server constructor', function() {
}, /math.Math/); }, /math.Math/);
}); });
}); });
describe('Surface client', function() {
var client;
var server;
before(function() {
var Server = grpc.buildServer([mathService]);
server = new Server({
'math.Math': {
'div': function(stream) {},
'divMany': function(stream) {},
'fib': function(stream) {},
'sum': function(stream) {}
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeClientConstructor(mathService);
client = new Client('localhost:' + port);
});
after(function() {
server.shutdown();
});
it('Should correctly cancel a unary call', function(done) {
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
assert.strictEqual(err.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a client stream call', function(done) {
var call = client.sum(function(err, resp) {
assert.strictEqual(err.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a server stream call', function(done) {
var call = client.fib({'limit': 5});
call.on('status', function(status) {
assert.strictEqual(status.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
it('Should correctly cancel a bidi stream call', function(done) {
var call = client.divMany();
call.on('status', function(status) {
assert.strictEqual(status.code, surface_client.status.CANCELLED);
done();
});
call.cancel();
});
});
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment