Skip to content
Snippets Groups Projects
Commit ffac55df authored by murgatroid99's avatar murgatroid99
Browse files

Refactor client logic into superclass with generic methods, improve documentation

parent c2ad372d
No related branches found
No related tags found
No related merge requests found
...@@ -31,6 +31,10 @@ ...@@ -31,6 +31,10 @@
* *
*/ */
/**
* @module
*/
'use strict'; 'use strict';
var path = require('path'); var path = require('path');
...@@ -256,5 +260,10 @@ exports.getClientChannel = client.getClientChannel; ...@@ -256,5 +260,10 @@ exports.getClientChannel = client.getClientChannel;
exports.waitForClientReady = client.waitForClientReady; exports.waitForClientReady = client.waitForClientReady;
exports.closeClient = function closeClient(client_obj) { exports.closeClient = function closeClient(client_obj) {
client.getClientChannel(client_obj).close(); client.Client.prototype.close.apply(client_obj);
}; };
/**
* @see module:src/client.Client
*/
exports.Client = client.Client;
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
* This module contains the factory method for creating Client classes, and the * This module contains the factory method for creating Client classes, and the
* method calling code for all types of methods. * method calling code for all types of methods.
* *
* For example, to create a client and call a method on it: * @example <caption>Create a client and call a method on it</caption>
* *
* var proto_obj = grpc.load(proto_file_path); * var proto_obj = grpc.load(proto_file_path);
* var Client = proto_obj.package.subpackage.ServiceName; * var Client = proto_obj.package.subpackage.ServiceName;
...@@ -68,14 +68,33 @@ var Duplex = stream.Duplex; ...@@ -68,14 +68,33 @@ var Duplex = stream.Duplex;
var util = require('util'); var util = require('util');
var version = require('../../../package.json').version; var version = require('../../../package.json').version;
util.inherits(ClientUnaryCall, EventEmitter);
/**
* An EventEmitter. Used for unary calls
* @constructor
* @extends external:EventEmitter
* @param {grpc.Call} call The call object associated with the request
*/
function ClientUnaryCall(call) {
EventEmitter.call(this);
this.call = call;
}
util.inherits(ClientWritableStream, Writable); util.inherits(ClientWritableStream, Writable);
/** /**
* A stream that the client can write to. Used for calls that are streaming from * A stream that the client can write to. Used for calls that are streaming from
* the client side. * the client side.
* @constructor * @constructor
* @extends external:Writable
* @borrows module:src/client~ClientUnaryCall#cancel as
* module:src/client~ClientWritableStream#cancel
* @borrows module:src/client~ClientUnaryCall#getPeer as
* module:src/client~ClientWritableStream#getPeer
* @param {grpc.Call} call The call object to send data with * @param {grpc.Call} call The call object to send data with
* @param {function(*):Buffer=} serialize Serialization function for writes. * @param {module:src/common~serialize=} [serialize=identity] Serialization
* function for writes.
*/ */
function ClientWritableStream(call, serialize) { function ClientWritableStream(call, serialize) {
Writable.call(this, {objectMode: true}); Writable.call(this, {objectMode: true});
...@@ -134,8 +153,14 @@ util.inherits(ClientReadableStream, Readable); ...@@ -134,8 +153,14 @@ util.inherits(ClientReadableStream, Readable);
* A stream that the client can read from. Used for calls that are streaming * A stream that the client can read from. Used for calls that are streaming
* from the server side. * from the server side.
* @constructor * @constructor
* @extends external:Readable
* @borrows module:src/client~ClientUnaryCall#cancel as
* module:src/client~ClientReadableStream#cancel
* @borrows module:src/client~ClientUnaryCall#getPeer as
* module:src/client~ClientReadableStream#getPeer
* @param {grpc.Call} call The call object to read data with * @param {grpc.Call} call The call object to read data with
* @param {function(Buffer):*=} deserialize Deserialization function for reads * @param {module:src/common~deserialize=} [deserialize=identity]
* Deserialization function for reads
*/ */
function ClientReadableStream(call, deserialize) { function ClientReadableStream(call, deserialize) {
Readable.call(this, {objectMode: true}); Readable.call(this, {objectMode: true});
...@@ -155,6 +180,7 @@ function ClientReadableStream(call, deserialize) { ...@@ -155,6 +180,7 @@ function ClientReadableStream(call, deserialize) {
* parameter indicates that the call should end with that status. status * parameter indicates that the call should end with that status. status
* defaults to OK if not provided. * defaults to OK if not provided.
* @param {Object!} status The status that the call should end with * @param {Object!} status The status that the call should end with
* @access private
*/ */
function _readsDone(status) { function _readsDone(status) {
/* jshint validthis: true */ /* jshint validthis: true */
...@@ -173,6 +199,7 @@ ClientReadableStream.prototype._readsDone = _readsDone; ...@@ -173,6 +199,7 @@ ClientReadableStream.prototype._readsDone = _readsDone;
/** /**
* Called to indicate that we have received a status from the server. * Called to indicate that we have received a status from the server.
* @access private
*/ */
function _receiveStatus(status) { function _receiveStatus(status) {
/* jshint validthis: true */ /* jshint validthis: true */
...@@ -185,6 +212,7 @@ ClientReadableStream.prototype._receiveStatus = _receiveStatus; ...@@ -185,6 +212,7 @@ ClientReadableStream.prototype._receiveStatus = _receiveStatus;
/** /**
* If we have both processed all incoming messages and received the status from * If we have both processed all incoming messages and received the status from
* the server, emit the status. Otherwise, do nothing. * the server, emit the status. Otherwise, do nothing.
* @access private
*/ */
function _emitStatusIfDone() { function _emitStatusIfDone() {
/* jshint validthis: true */ /* jshint validthis: true */
...@@ -270,10 +298,16 @@ util.inherits(ClientDuplexStream, Duplex); ...@@ -270,10 +298,16 @@ util.inherits(ClientDuplexStream, Duplex);
* A stream that the client can read from or write to. Used for calls with * A stream that the client can read from or write to. Used for calls with
* duplex streaming. * duplex streaming.
* @constructor * @constructor
* @extends external:Duplex
* @borrows module:src/client~ClientUnaryCall#cancel as
* module:src/client~ClientDuplexStream#cancel
* @borrows module:src/client~ClientUnaryCall#getPeer as
* module:src/client~ClientDuplexStream#getPeer
* @param {grpc.Call} call Call object to proxy * @param {grpc.Call} call Call object to proxy
* @param {function(*):Buffer=} serialize Serialization function for requests * @param {module:src/common~serialize=} [serialize=identity] Serialization
* @param {function(Buffer):*=} deserialize Deserialization function for * function for requests
* responses * @param {module:src/common~deserialize=} [deserialize=identity]
* Deserialization function for responses
*/ */
function ClientDuplexStream(call, serialize, deserialize) { function ClientDuplexStream(call, serialize, deserialize) {
Duplex.call(this, {objectMode: true}); Duplex.call(this, {objectMode: true});
...@@ -300,12 +334,14 @@ ClientDuplexStream.prototype._write = _write; ...@@ -300,12 +334,14 @@ ClientDuplexStream.prototype._write = _write;
/** /**
* Cancel the ongoing call * Cancel the ongoing call
* @alias module:src/client~ClientUnaryCall#cancel
*/ */
function cancel() { function cancel() {
/* jshint validthis: true */ /* jshint validthis: true */
this.call.cancel(); this.call.cancel();
} }
ClientUnaryCall.prototype.cancel = cancel;
ClientReadableStream.prototype.cancel = cancel; ClientReadableStream.prototype.cancel = cancel;
ClientWritableStream.prototype.cancel = cancel; ClientWritableStream.prototype.cancel = cancel;
ClientDuplexStream.prototype.cancel = cancel; ClientDuplexStream.prototype.cancel = cancel;
...@@ -313,21 +349,49 @@ ClientDuplexStream.prototype.cancel = cancel; ...@@ -313,21 +349,49 @@ ClientDuplexStream.prototype.cancel = cancel;
/** /**
* Get the endpoint this call/stream is connected to. * Get the endpoint this call/stream is connected to.
* @return {string} The URI of the endpoint * @return {string} The URI of the endpoint
* @alias module:src/client~ClientUnaryCall#getPeer
*/ */
function getPeer() { function getPeer() {
/* jshint validthis: true */ /* jshint validthis: true */
return this.call.getPeer(); return this.call.getPeer();
} }
ClientUnaryCall.prototype.getPeer = getPeer;
ClientReadableStream.prototype.getPeer = getPeer; ClientReadableStream.prototype.getPeer = getPeer;
ClientWritableStream.prototype.getPeer = getPeer; ClientWritableStream.prototype.getPeer = getPeer;
ClientDuplexStream.prototype.getPeer = getPeer; ClientDuplexStream.prototype.getPeer = getPeer;
/**
* Any client call type
* @typedef {(ClientUnaryCall|ClientReadableStream|
* ClientWritableStream|ClientDuplexStream)}
* module:src/client~Call
*/
/**
* Options that can be set on a call.
* @typedef {Object} module:src/client~CallOptions
* @property {(date|number)} deadline The deadline for the entire call to
* complete. A value of Infinity indicates that no deadline should be set.
* @property {(string)} host Server hostname to set on the call. Only meaningful
* if different from the server address used to construct the client.
* @property {module:src/client~Call} parent Parent call. Used in servers when
* making a call as part of the process of handling a call. Used to
* propagate some information automatically, as specified by
* propagate_flags.
* @property {number} propagate_flags Indicates which properties of a parent
* call should propagate to this call. Bitwise combination of flags in
* [grpc.propagate]{@link module:index.propagate}.
* @property {module:src/credentials~CallCredentials} credentials The
* credentials that should be used to make this particular call.
*/
/** /**
* Get a call object built with the provided options. Keys for options are * Get a call object built with the provided options. Keys for options are
* 'deadline', which takes a date or number, and 'host', which takes a string * 'deadline', which takes a date or number, and 'host', which takes a string
* and overrides the hostname to connect to. * and overrides the hostname to connect to.
* @param {Object} options Options map. * @access private
* @param {module:src/client~CallOptions=} options Options object.
*/ */
function getCall(channel, method, options) { function getCall(channel, method, options) {
var deadline; var deadline;
...@@ -354,315 +418,380 @@ function getCall(channel, method, options) { ...@@ -354,315 +418,380 @@ function getCall(channel, method, options) {
} }
/** /**
* Get a function that can make unary requests to the specified method. * A generic gRPC client. Primarily useful as a base class for generated clients
* @param {string} method The name of the method to request * @alias module:src/client.Client
* @param {function(*):Buffer} serialize The serialization function for inputs * @constructor
* @param {function(Buffer)} deserialize The deserialization function for * @param {string} address Server address to connect to
* outputs * @param {module:src/credentials~ChannelCredentials} credentials Credentials to
* @return {Function} makeUnaryRequest * use to connect to the server
* @param {Object} options Options to apply to channel creation
*/ */
function makeUnaryRequestFunction(method, serialize, deserialize) { var Client = exports.Client = function Client(address, credentials, options) {
/** if (!options) {
* Make a unary request with this method on the given channel with the given options = {};
* argument, callback, etc. }
* @this {Client} Client object. Must have a channel member. /* Append the grpc-node user agent string after the application user agent
* @param {*} argument The argument to the call. Should be serializable with * string, and put the combination at the beginning of the user agent string
* serialize
* @param {Metadata=} metadata Metadata to add to the call
* @param {Object=} options Options map
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @return {EventEmitter} An event emitter for stream related events
*/ */
function makeUnaryRequest(argument, metadata, options, callback) { if (options['grpc.primary_user_agent']) {
/* jshint validthis: true */ options['grpc.primary_user_agent'] += ' ';
/* While the arguments are listed in the function signature, those variables } else {
* are not used directly. Instead, ArgueJS processes the arguments options['grpc.primary_user_agent'] = '';
* object. This allows for simple handling of optional arguments in the
* middle of the argument list, and also provides type checking. */
var args = arguejs({argument: null, metadata: [Metadata, new Metadata()],
options: [Object], callback: Function}, arguments);
var emitter = new EventEmitter();
var call = getCall(this.$channel, method, args.options);
metadata = args.metadata.clone();
emitter.cancel = function cancel() {
call.cancel();
};
emitter.getPeer = function getPeer() {
return call.getPeer();
};
var client_batch = {};
var message = serialize(args.argument);
if (args.options) {
message.grpcWriteFlags = args.options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
if (status.code === grpc.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
args.callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
}
if (status.code !== grpc.status.OK) {
error = new Error(status.details);
error.code = status.code;
error.metadata = status.metadata;
args.callback(error);
} else {
args.callback(null, deserialized);
}
emitter.emit('status', status);
});
return emitter;
} }
return makeUnaryRequest; options['grpc.primary_user_agent'] += 'grpc-node/' + version;
} /* Private fields use $ as a prefix instead of _ because it is an invalid
* prefix of a method name */
this.$channel = new grpc.Channel(address, credentials, options);
};
/** /**
* Get a function that can make client stream requests to the specified method. * @typedef {Error} module:src/client.Client~ServiceError
* @property {number} code The error code, a key of
* [grpc.status]{@link module:src/client.status}
* @property {module:metadata.Metadata} metadata Metadata sent with the status
* by the server, if any
*/
/**
* @callback module:src/client.Client~requestCallback
* @param {?module:src/client.Client~ServiceError} error The error, if the call
* failed
* @param {*} value The response value, if the call succeeded
*/
/**
* Make a unary request to the given method, using the given serialize
* and deserialize functions, with the given argument.
* @param {string} method The name of the method to request * @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs * @param {module:src/common~serialize} serialize The serialization function for
* @param {function(Buffer)} deserialize The deserialization function for * inputs
* outputs * @param {module:src/common~deserialize} deserialize The deserialization
* @return {Function} makeClientStreamRequest * function for outputs
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {module:src/metadata.Metadata=} metadata Metadata to add to the call
* @param {module:src/client~CallOptions=} options Options map
* @param {module:src/client.Client~requestCallback} callback The callback to
* for when the response is received
* @return {EventEmitter} An event emitter for stream related events
*/ */
function makeClientStreamRequestFunction(method, serialize, deserialize) { Client.prototype.makeUnaryRequest = function(method, serialize, deserialize,
/** argument, metadata, options,
* Make a client stream request with this method on the given channel with the callback) {
* given callback, etc. /* While the arguments are listed in the function signature, those variables
* @this {Client} Client object. Must have a channel member. * are not used directly. Instead, ArgueJS processes the arguments
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the * object. This allows for simple handling of optional arguments in the
* call * middle of the argument list, and also provides type checking. */
* @param {Object=} options Options map var args = arguejs({method: String, serialize: Function,
* @param {function(?Error, value=)} callback The callback to for when the deserialize: Function,
* response is received argument: null, metadata: [Metadata, new Metadata()],
* @return {EventEmitter} An event emitter for stream related events options: [Object], callback: Function}, arguments);
*/ var call = getCall(this.$channel, method, args.options);
function makeClientStreamRequest(metadata, options, callback) { var emitter = new ClientUnaryCall(call);
/* jshint validthis: true */ metadata = args.metadata.clone();
/* While the arguments are listed in the function signature, those variables var client_batch = {};
* are not used directly. Instead, ArgueJS processes the arguments var message = serialize(args.argument);
* object. This allows for simple handling of optional arguments in the if (args.options) {
* middle of the argument list, and also provides type checking. */ message.grpcWriteFlags = args.options.flags;
var args = arguejs({metadata: [Metadata, new Metadata()], }
options: [Object], callback: Function}, arguments);
var call = getCall(this.$channel, method, args.options); client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata = args.metadata.clone(); metadata._getCoreRepresentation();
var stream = new ClientWritableStream(call, serialize); client_batch[grpc.opType.SEND_MESSAGE] = message;
var metadata_batch = {}; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
metadata._getCoreRepresentation(); client_batch[grpc.opType.RECV_MESSAGE] = true;
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(metadata_batch, function(err, response) { call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
if (status.code === grpc.status.OK) {
if (err) { if (err) {
// The call has stopped for some reason. A non-OK status will arrive // Got a batch error, but OK status. Something went wrong
// in the other batch. args.callback(err);
return; return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var error;
var deserialized;
if (status.code === grpc.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
args.callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
}
if (status.code !== grpc.status.OK) {
error = new Error(response.status.details);
error.code = status.code;
error.metadata = status.metadata;
args.callback(error);
} else { } else {
args.callback(null, deserialized); try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
} }
stream.emit('status', status); }
}); if (status.code !== grpc.status.OK) {
return stream; error = new Error(status.details);
} error.code = status.code;
return makeClientStreamRequest; error.metadata = status.metadata;
} args.callback(error);
} else {
args.callback(null, deserialized);
}
emitter.emit('status', status);
});
return emitter;
};
/** /**
* Get a function that can make server stream requests to the specified method. * Make a client stream request to the given method, using the given serialize
* and deserialize functions, with the given argument.
* @param {string} method The name of the method to request * @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs * @param {module:src/common~serialize} serialize The serialization function for
* @param {function(Buffer)} deserialize The deserialization function for * inputs
* outputs * @param {module:src/common~deserialize} deserialize The deserialization
* @return {Function} makeServerStreamRequest * function for outputs
* @param {module:src/metadata.Metadata=} metadata Array of metadata key/value
* pairs to add to the call
* @param {module:src/client~CallOptions=} options Options map
* @param {Client~requestCallback} callback The callback to for when the
* response is received
* @return {module:src/client~ClientWritableStream} An event emitter for stream
* related events
*/ */
function makeServerStreamRequestFunction(method, serialize, deserialize) { Client.prototype.makeClientStreamRequest = function(method, serialize,
/** deserialize, metadata,
* Make a server stream request with this method on the given channel with the options, callback) {
* given argument, etc. /* While the arguments are listed in the function signature, those variables
* @this {SurfaceClient} Client object. Must have a channel member. * are not used directly. Instead, ArgueJS processes the arguments
* @param {*} argument The argument to the call. Should be serializable with * object. This allows for simple handling of optional arguments in the
* serialize * middle of the argument list, and also provides type checking. */
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the var args = arguejs({method:String, serialize: Function,
* call deserialize: Function,
* @param {Object} options Options map metadata: [Metadata, new Metadata()],
* @return {EventEmitter} An event emitter for stream related events options: [Object], callback: Function}, arguments);
*/ var call = getCall(this.$channel, method, args.options);
function makeServerStreamRequest(argument, metadata, options) { metadata = args.metadata.clone();
/* jshint validthis: true */ var stream = new ClientWritableStream(call, serialize);
/* While the arguments are listed in the function signature, those variables var metadata_batch = {};
* are not used directly. Instead, ArgueJS processes the arguments metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
* object. */ metadata._getCoreRepresentation();
var args = arguejs({argument: null, metadata: [Metadata, new Metadata()], metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
options: [Object]}, arguments); call.startBatch(metadata_batch, function(err, response) {
var call = getCall(this.$channel, method, args.options); if (err) {
metadata = args.metadata.clone(); // The call has stopped for some reason. A non-OK status will arrive
var stream = new ClientReadableStream(call, deserialize); // in the other batch.
var start_batch = {}; return;
var message = serialize(args.argument);
if (args.options) {
message.grpcWriteFlags = args.options.flags;
} }
start_batch[grpc.opType.SEND_INITIAL_METADATA] = stream.emit('metadata', Metadata._fromCoreRepresentation(
metadata._getCoreRepresentation(); response.metadata));
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; });
start_batch[grpc.opType.SEND_MESSAGE] = message; var client_batch = {};
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(start_batch, function(err, response) { client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
if (err) { call.startBatch(client_batch, function(err, response) {
// The call has stopped for some reason. A non-OK status will arrive response.status.metadata = Metadata._fromCoreRepresentation(
// in the other batch. response.status.metadata);
return; var status = response.status;
} var error;
stream.emit('metadata', Metadata._fromCoreRepresentation( var deserialized;
response.metadata)); if (status.code === grpc.status.OK) {
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) { if (err) {
stream.emit('error', err); // Got a batch error, but OK status. Something went wrong
args.callback(err);
return; return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This will result
* in passing an error to the callback */
status = {
code: grpc.status.INTERNAL,
details: 'Failed to parse server response'
};
}
} }
response.status.metadata = Metadata._fromCoreRepresentation( }
response.status.metadata); if (status.code !== grpc.status.OK) {
stream._receiveStatus(response.status); error = new Error(response.status.details);
}); error.code = status.code;
return stream; error.metadata = status.metadata;
} args.callback(error);
return makeServerStreamRequest; } else {
} args.callback(null, deserialized);
}
stream.emit('status', status);
});
return stream;
};
/** /**
* Get a function that can make bidirectional stream requests to the specified * Make a server stream request to the given method, with the given serialize
* method. * and deserialize function, using the given argument
* @param {string} method The name of the method to request * @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs * @param {module:src/common~serialize} serialize The serialization function for
* @param {function(Buffer)} deserialize The deserialization function for * inputs
* outputs * @param {module:src/common~deserialize} deserialize The deserialization
* @return {Function} makeBidiStreamRequest * function for outputs
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {module:src/metadata.Metadata=} metadata Array of metadata key/value
* pairs to add to the call
* @param {module:src/client~CallOptions=} options Options map
* @return {module:src/client~ClientReadableStream} An event emitter for stream
* related events
*/ */
function makeBidiStreamRequestFunction(method, serialize, deserialize) { Client.prototype.makeServerStreamRequest = function(method, serialize,
/** deserialize, argument,
* Make a bidirectional stream request with this method on the given channel. metadata, options) {
* @this {SurfaceClient} Client object. Must have a channel member. /* While the arguments are listed in the function signature, those variables
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the * are not used directly. Instead, ArgueJS processes the arguments
* call * object. */
* @param {Options} options Options map var args = arguejs({method:String, serialize: Function,
* @return {EventEmitter} An event emitter for stream related events deserialize: Function,
*/ argument: null, metadata: [Metadata, new Metadata()],
function makeBidiStreamRequest(metadata, options) { options: [Object]}, arguments);
/* jshint validthis: true */ var call = getCall(this.$channel, method, args.options);
/* While the arguments are listed in the function signature, those variables metadata = args.metadata.clone();
* are not used directly. Instead, ArgueJS processes the arguments var stream = new ClientReadableStream(call, deserialize);
* object. */ var start_batch = {};
var args = arguejs({metadata: [Metadata, new Metadata()], var message = serialize(args.argument);
options: [Object]}, arguments); if (args.options) {
var call = getCall(this.$channel, method, args.options); message.grpcWriteFlags = args.options.flags;
metadata = args.metadata.clone();
var stream = new ClientDuplexStream(call, serialize, deserialize);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
stream.emit('error', err);
return;
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
} }
return makeBidiStreamRequest; start_batch[grpc.opType.SEND_INITIAL_METADATA] =
} metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
stream.emit('error', err);
return;
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
};
/**
* Make a bidirectional stream request with this method on the given channel.
* @param {string} method The name of the method to request
* @param {module:src/common~serialize} serialize The serialization function for
* inputs
* @param {module:src/common~deserialize} deserialize The deserialization
* function for outputs
* @param {module:src/metadata.Metadata=} metadata Array of metadata key/value
* pairs to add to the call
* @param {module:src/client~CallOptions=} options Options map
* @return {module:src/client~ClientDuplexStream} An event emitter for stream
* related events
*/
Client.prototype.makeBidiStreamRequest = function(method, serialize,
deserialize, metadata,
options) {
/* While the arguments are listed in the function signature, those variables
* are not used directly. Instead, ArgueJS processes the arguments
* object. */
var args = arguejs({method:String, serialize: Function,
deserialize: Function,
metadata: [Metadata, new Metadata()],
options: [Object]}, arguments);
var call = getCall(this.$channel, method, args.options);
metadata = args.metadata.clone();
var stream = new ClientDuplexStream(call, serialize, deserialize);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
stream.emit('error', err);
return;
}
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream._receiveStatus(response.status);
});
return stream;
};
Client.prototype.close = function() {
this.$channel.close();
};
/**
* Return the underlying channel object for the specified client
* @return {Channel} The channel
*/
Client.prototype.getChannel = function() {
return this.$channel;
};
/**
* Wait for the client to be ready. The callback will be called when the
* client has successfully connected to the server, and it will be called
* with an error if the attempt to connect to the server has unrecoverablly
* failed or if the deadline expires. This function will make the channel
* start connecting if it has not already done so.
* @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
* Infinity to wait forever.
* @param {function(Error)} callback The callback to call when done attempting
* to connect.
*/
Client.prototype.waitForReady = function(deadline, callback) {
var self = this;
var checkState = function(err) {
if (err) {
callback(new Error('Failed to connect before the deadline'));
return;
}
var new_state = self.$channel.getConnectivityState(true);
if (new_state === grpc.connectivityState.READY) {
callback();
} else if (new_state === grpc.connectivityState.FATAL_FAILURE) {
callback(new Error('Failed to connect to server'));
} else {
self.$channel.watchConnectivityState(new_state, deadline, checkState);
}
};
checkState();
};
/** /**
* Map with short names for each of the requester maker functions. Used in * Map with short names for each of the requester maker functions. Used in
* makeClientConstructor * makeClientConstructor
* @access private
*/ */
var requester_makers = { var requester_funcs = {
unary: makeUnaryRequestFunction, unary: Client.prototype.makeUnaryRequest,
server_stream: makeServerStreamRequestFunction, server_stream: Client.prototype.makeServerStreamRequest,
client_stream: makeClientStreamRequestFunction, client_stream: Client.prototype.makeClientStreamRequest,
bidi: makeBidiStreamRequestFunction bidi: Client.prototype.makeBidiStreamRequest
}; };
function getDefaultValues(metadata, options) { function getDefaultValues(metadata, options) {
...@@ -675,6 +804,7 @@ function getDefaultValues(metadata, options) { ...@@ -675,6 +804,7 @@ function getDefaultValues(metadata, options) {
/** /**
* Map with wrappers for each type of requester function to make it use the old * Map with wrappers for each type of requester function to make it use the old
* argument order with optional arguments after the callback. * argument order with optional arguments after the callback.
* @access private
*/ */
var deprecated_request_wrap = { var deprecated_request_wrap = {
unary: function(makeUnaryRequest) { unary: function(makeUnaryRequest) {
...@@ -700,55 +830,33 @@ var deprecated_request_wrap = { ...@@ -700,55 +830,33 @@ var deprecated_request_wrap = {
}; };
/** /**
* Creates a constructor for a client with the given methods. The methods object * Creates a constructor for a client with the given methods, as specified in
* maps method name to an object with the following keys: * the methods argument.
* path: The path on the server for accessing the method. For example, for * @param {module:src/common~ServiceDefinition} methods An object mapping
* protocol buffers, we use "/service_name/method_name" * method names to method attributes
* requestStream: bool indicating whether the client sends a stream
* resonseStream: bool indicating whether the server sends a stream
* requestSerialize: function to serialize request objects
* responseDeserialize: function to deserialize response objects
* @param {Object} methods An object mapping method names to method attributes
* @param {string} serviceName The fully qualified name of the service * @param {string} serviceName The fully qualified name of the service
* @param {Object} class_options An options object. Currently only uses the key * @param {Object} class_options An options object.
* deprecatedArgumentOrder, a boolean that Indicates that the old argument * @param {boolean=} [class_options.deprecatedArgumentOrder=false] Indicates
* order should be used for methods, with optional arguments at the end * that the old argument order should be used for methods, with optional
* instead of the callback at the end. Defaults to false. This option is * arguments at the end instead of the callback at the end. This option
* only a temporary stopgap measure to smooth an API breakage. * is only a temporary stopgap measure to smooth an API breakage.
* It is deprecated, and new code should not use it. * It is deprecated, and new code should not use it.
* @return {function(string, Object)} New client constructor * @return {function(string, Object)} New client constructor, which is a
* subclass of [grpc.Client]{@link module:src/client.Client}, and has the
* same arguments as that constructor.
*/ */
exports.makeClientConstructor = function(methods, serviceName, exports.makeClientConstructor = function(methods, serviceName,
class_options) { class_options) {
if (!class_options) { if (!class_options) {
class_options = {}; class_options = {};
} }
/**
* Create a client with the given methods function ServiceClient(address, credentials, options) {
* @constructor Client.call(this, address, credentials, options);
* @param {string} address The address of the server to connect to
* @param {grpc.Credentials} credentials Credentials to use to connect
* to the server
* @param {Object} options Options to pass to the underlying channel
*/
function Client(address, credentials, options) {
if (!options) {
options = {};
}
/* Append the grpc-node user agent string after the application user agent
* string, and put the combination at the beginning of the user agent string
*/
if (options['grpc.primary_user_agent']) {
options['grpc.primary_user_agent'] += ' ';
} else {
options['grpc.primary_user_agent'] = '';
}
options['grpc.primary_user_agent'] += 'grpc-node/' + version;
/* Private fields use $ as a prefix instead of _ because it is an invalid
* prefix of a method name */
this.$channel = new grpc.Channel(address, credentials, options);
} }
util.inherits(ServiceClient, Client);
_.each(methods, function(attrs, name) { _.each(methods, function(attrs, name) {
var method_type; var method_type;
if (_.startsWith(name, '$')) { if (_.startsWith(name, '$')) {
...@@ -769,20 +877,20 @@ exports.makeClientConstructor = function(methods, serviceName, ...@@ -769,20 +877,20 @@ exports.makeClientConstructor = function(methods, serviceName,
} }
var serialize = attrs.requestSerialize; var serialize = attrs.requestSerialize;
var deserialize = attrs.responseDeserialize; var deserialize = attrs.responseDeserialize;
var method_func = requester_makers[method_type]( var method_func = _.partial(requester_funcs[method_type], attrs.path,
attrs.path, serialize, deserialize); serialize, deserialize);
if (class_options.deprecatedArgumentOrder) { if (class_options.deprecatedArgumentOrder) {
Client.prototype[name] = deprecated_request_wrap(method_func); ServiceClient.prototype[name] = deprecated_request_wrap(method_func);
} else { } else {
Client.prototype[name] = method_func; ServiceClient.prototype[name] = method_func;
} }
// Associate all provided attributes with the method // Associate all provided attributes with the method
_.assign(Client.prototype[name], attrs); _.assign(ServiceClient.prototype[name], attrs);
}); });
Client.service = methods; ServiceClient.service = methods;
return Client; return ServiceClient;
}; };
/** /**
...@@ -791,7 +899,7 @@ exports.makeClientConstructor = function(methods, serviceName, ...@@ -791,7 +899,7 @@ exports.makeClientConstructor = function(methods, serviceName,
* @return {Channel} The channel * @return {Channel} The channel
*/ */
exports.getClientChannel = function(client) { exports.getClientChannel = function(client) {
return client.$channel; return Client.prototype.getChannel.apply(client);
}; };
/** /**
...@@ -807,21 +915,7 @@ exports.getClientChannel = function(client) { ...@@ -807,21 +915,7 @@ exports.getClientChannel = function(client) {
* to connect. * to connect.
*/ */
exports.waitForClientReady = function(client, deadline, callback) { exports.waitForClientReady = function(client, deadline, callback) {
var checkState = function(err) { Client.prototype.waitForReady.apply(client, deadline, callback);
if (err) {
callback(new Error('Failed to connect before the deadline'));
return;
}
var new_state = client.$channel.getConnectivityState(true);
if (new_state === grpc.connectivityState.READY) {
callback();
} else if (new_state === grpc.connectivityState.FATAL_FAILURE) {
callback(new Error('Failed to connect to server'));
} else {
client.$channel.watchConnectivityState(new_state, deadline, checkState);
}
};
checkState();
}; };
/** /**
......
...@@ -43,7 +43,8 @@ var _ = require('lodash'); ...@@ -43,7 +43,8 @@ var _ = require('lodash');
/** /**
* Wrap a function to pass null-like values through without calling it. If no * Wrap a function to pass null-like values through without calling it. If no
* function is given, just uses the identity; * function is given, just uses the identity.
* @private
* @param {?function} func The function to wrap * @param {?function} func The function to wrap
* @return {function} The wrapped function * @return {function} The wrapped function
*/ */
...@@ -90,3 +91,67 @@ exports.defaultGrpcOptions = { ...@@ -90,3 +91,67 @@ exports.defaultGrpcOptions = {
enumsAsStrings: true, enumsAsStrings: true,
deprecatedArgumentOrder: false deprecatedArgumentOrder: false
}; };
// JSDoc definitions that are used in multiple other modules
/**
* The EventEmitter class in the event standard module
* @external EventEmitter
* @see https://nodejs.org/api/events.html#events_class_eventemitter
*/
/**
* The Readable class in the stream standard module
* @external Readable
* @see https://nodejs.org/api/stream.html#stream_readable_streams
*/
/**
* The Writable class in the stream standard module
* @external Writable
* @see https://nodejs.org/api/stream.html#stream_writable_streams
*/
/**
* The Duplex class in the stream standard module
* @external Duplex
* @see https://nodejs.org/api/stream.html#stream_class_stream_duplex
*/
/**
* A serialization function
* @callback module:src/common~serialize
* @param {*} value The value to serialize
* @return {Buffer} The value serialized as a byte sequence
*/
/**
* A deserialization function
* @callback module:src/common~deserialize
* @param {Buffer} data The byte sequence to deserialize
* @return {*} The data deserialized as a value
*/
/**
* An object that completely defines a service method signature.
* @typedef {Object} module:src/common~MethodDefinition
* @property {string} path The method's URL path
* @property {boolean} requestStream Indicates whether the method accepts
* a stream of requests
* @property {boolean} responseStream Indicates whether the method returns
* a stream of responses
* @property {module:src/common~serialize} requestSerialize Serialization
* function for request values
* @property {module:src/common~serialize} responseSerialize Serialization
* function for response values
* @property {module:src/common~deserialize} requestDeserialize Deserialization
* function for request data
* @property {module:src/common~deserialize} responseDeserialize Deserialization
* function for repsonse data
*/
/**
* An object that completely defines a service.
* @typedef {Object.<string, module:src/common~MethodDefinition>}
* module:src/common~ServiceDefinition
*/
...@@ -35,12 +35,7 @@ ...@@ -35,12 +35,7 @@
* Metadata module * Metadata module
* *
* This module defines the Metadata class, which represents header and trailer * This module defines the Metadata class, which represents header and trailer
* metadata for gRPC calls. Here is an example of how to use it: * metadata for gRPC calls.
*
* var metadata = new metadata_module.Metadata();
* metadata.set('key1', 'value1');
* metadata.add('key1', 'value2');
* metadata.get('key1') // returns ['value1', 'value2']
* *
* @module * @module
*/ */
...@@ -54,6 +49,12 @@ var grpc = require('./grpc_extension'); ...@@ -54,6 +49,12 @@ var grpc = require('./grpc_extension');
/** /**
* Class for storing metadata. Keys are normalized to lowercase ASCII. * Class for storing metadata. Keys are normalized to lowercase ASCII.
* @constructor * @constructor
* @alias module:src/metadata.Metadata
* @example
* var metadata = new metadata_module.Metadata();
* metadata.set('key1', 'value1');
* metadata.add('key1', 'value2');
* metadata.get('key1') // returns ['value1', 'value2']
*/ */
function Metadata() { function Metadata() {
this._internal_repr = {}; this._internal_repr = {};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment