Skip to content
Snippets Groups Projects
Commit cca5ffa9 authored by murgatroid99's avatar murgatroid99
Browse files

Added code generation for clients and servers

parent 82fdc983
No related branches found
No related tags found
No related merge requests found
......@@ -31,32 +31,68 @@
*
*/
var _ = require('highland');
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/**
* When the given stream finishes without error, call the callback once. This
* will not be called until something begins to consume the stream.
* @param {function} callback The callback to call at stream end
* @param {stream} source The stream to watch
* @return {stream} The stream with the callback attached
* Get the fully qualified (dotted) name of a ProtoBuf.Reflect value.
* @param {ProtoBuf.Reflect.Namespace} value The value to get the name of
* @return {string} The fully qualified name of the value
*/
function onSuccessfulStreamEnd(callback, source) {
var error = false;
return source.consume(function(err, x, push, next) {
if (x === _.nil) {
if (!error) {
callback();
}
push(null, x);
} else if (err) {
error = true;
push(err);
next();
} else {
push(err, x);
next();
function fullyQualifiedName(value) {
if (value === null || value === undefined) {
return '';
}
var name = value.name;
if (value.hasOwnProperty('parent')) {
var parent_name = fullyQualifiedName(value.parent);
if (parent_name !== '') {
name = parent_name + '.' + name;
}
});
}
return name;
}
exports.onSuccessfulStreamEnd = onSuccessfulStreamEnd;
/**
* See docs for deserializeCls
*/
exports.deserializeCls = deserializeCls;
/**
* See docs for serializeCls
*/
exports.serializeCls = serializeCls;
/**
* See docs for fullyQualifiedName
*/
exports.fullyQualifiedName = fullyQualifiedName;
syntax = "proto2";
syntax = "proto3";
package math;
message DivArgs {
required int64 dividend = 1;
required int64 divisor = 2;
optional int64 dividend = 1;
optional int64 divisor = 2;
}
message DivReply {
required int64 quotient = 1;
required int64 remainder = 2;
optional int64 quotient = 1;
optional int64 remainder = 2;
}
message FibArgs {
......@@ -17,9 +17,34 @@ message FibArgs {
}
message Num {
required int64 num = 1;
optional int64 num = 1;
}
message FibReply {
required int64 count = 1;
}
\ No newline at end of file
optional int64 count = 1;
}
service Math {
// Div divides args.dividend by args.divisor and returns the quotient and
// remainder.
rpc Div (DivArgs) returns (DivReply) {
}
// DivMany accepts an arbitrary number of division args from the client stream
// and sends back the results in the reply stream. The stream continues until
// the client closes its end; the server does the same after sending all the
// replies. The stream ends immediately if either end aborts.
rpc DivMany (stream DivArgs) returns (stream DivReply) {
}
// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
// generates up to limit numbers; otherwise it continues until the call is
// canceled. Unlike Fib above, Fib has no final FibReply.
rpc Fib (FibArgs) returns (stream Num) {
}
// Sum sums a stream of numbers, returning the final result once the stream
// is closed.
rpc Sum (stream Num) returns (Num) {
}
}
......@@ -38,77 +38,10 @@ var util = require('util');
var Transform = require('stream').Transform;
var builder = ProtoBuf.loadProtoFile(__dirname + '/math.proto');
var math = builder.build('math');
var grpc = require('..');
var math = grpc.load(__dirname + '/math.proto').math;
var makeConstructor = require('../surface_server.js').makeServerConstructor;
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/* This function call creates a server constructor for servers that that expose
* the four specified methods. This specifies how to serialize messages that the
* server sends and deserialize messages that the client sends, and whether the
* client or the server will send a stream of messages, for each method. This
* also specifies a prefix that will be added to method names when sending them
* on the wire. This function call and all of the preceding code in this file
* are intended to approximate what the generated code will look like for the
* math service */
var Server = makeConstructor({
Div: {
serialize: serializeCls(math.DivReply),
deserialize: deserializeCls(math.DivArgs),
client_stream: false,
server_stream: false
},
Fib: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.FibArgs),
client_stream: false,
server_stream: true
},
Sum: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.Num),
client_stream: true,
server_stream: false
},
DivMany: {
serialize: serializeCls(math.DivReply),
deserialize: deserializeCls(math.DivArgs),
client_stream: true,
server_stream: true
}
}, '/Math/');
var Server = grpc.buildServer([math.Math.service]);
/**
* Server function for division. Provides the /Math/DivMany and /Math/Div
......@@ -185,10 +118,12 @@ function mathDivMany(stream) {
}
var server = new Server({
Div: mathDiv,
Fib: mathFib,
Sum: mathSum,
DivMany: mathDivMany
'math.Math' : {
Div: mathDiv,
Fib: mathFib,
Sum: mathSum,
DivMany: mathDivMany
}
});
if (require.main === module) {
......
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var _ = require('underscore');
var ProtoBuf = require('protobufjs');
var surface_client = require('./surface_client.js');
var surface_server = require('./surface_server.js');
var grpc = require('bindings')('grpc');
/**
* Load a gRPC object from an existing ProtoBuf.Reflect object.
* @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load.
* @return {Object<string, *>} The resulting gRPC object
*/
function loadObject(value) {
var result = {};
if (value.className === 'Namespace') {
_.each(value.children, function(child) {
result[child.name] = loadObject(child);
});
return result;
} else if (value.className === 'Service') {
return surface_client.makeClientConstructor(value);
} else if (value.className === 'Service.Message') {
return value.build();
} else {
return value;
}
}
/**
* Load a gRPC object from a .proto file.
* @param {string} filename The file to load
* @return {Object<string, *>} The resulting gRPC object
*/
function load(filename) {
var builder = ProtoBuf.loadProtoFile(filename);
return loadObject(builder.ns);
}
/**
* See docs for loadObject
*/
exports.loadObject = loadObject;
/**
* See docs for load
*/
exports.load = load;
/**
* See docs for surface_server.makeServerConstructor
*/
exports.buildServer = surface_server.makeServerConstructor;
/**
* Status name to code number mapping
*/
exports.status = grpc.status;
/**
* Call error name to code number mapping
*/
exports.callError = grpc.callError;
......@@ -8,11 +8,12 @@
"dependencies": {
"bindings": "^1.2.1",
"nan": "~1.3.0",
"underscore": "^1.7.0"
"underscore": "^1.7.0",
"protobufjs": "murgatroid99/ProtoBuf.js"
},
"devDependencies": {
"mocha": "~1.21.0",
"highland": "~2.0.0",
"protobufjs": "~3.8.0"
}
"highland": "~2.0.0"
},
"main": "main.js"
}
......@@ -31,6 +31,8 @@
*
*/
var _ = require('underscore');
var grpc = require('bindings')('grpc.node');
var common = require('./common');
......@@ -176,6 +178,10 @@ function Server(options) {
* @this Server
*/
this.start = function() {
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
});
if (this.started) {
throw 'Server is already running';
}
......
......@@ -35,6 +35,8 @@ var _ = require('underscore');
var client = require('./client.js');
var common = require('./common.js');
var EventEmitter = require('events').EventEmitter;
var stream = require('stream');
......@@ -44,6 +46,7 @@ var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
function forwardEvent(fromEmitter, toEmitter, event) {
fromEmitter.on(event, function forward() {
_.partial(toEmitter.emit, event).apply(toEmitter, arguments);
......@@ -317,16 +320,13 @@ var requester_makers = {
}
/**
* Creates a constructor for clients with a service defined by the methods
* object. The methods object has string keys and values of this form:
* {serialize: function, deserialize: function, client_stream: bool,
* server_stream: bool}
* @param {!Object<string, Object>} methods Method descriptor for each method
* the client should expose
* @param {string} prefix The prefix to prepend to each method name
* Creates a constructor for clients for the given service
* @param {ProtoBuf.Reflect.Service} service The service to generate a client
* for
* @return {function(string, Object)} New client constructor
*/
function makeClientConstructor(methods, prefix) {
function makeClientConstructor(service) {
var prefix = '/' + common.fullyQualifiedName(service) + '/';
/**
* Create a client with the given methods
* @constructor
......@@ -337,27 +337,29 @@ function makeClientConstructor(methods, prefix) {
this.channel = new client.Channel(address, options);
}
_.each(methods, function(method, name) {
_.each(service.children, function(method) {
var method_type;
if (method.client_stream) {
if (method.server_stream) {
if (method.requestStream) {
if (method.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.server_stream) {
if (method.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
SurfaceClient.prototype[name] = requester_makers[method_type](
prefix + name,
method.serialize,
method.deserialize);
SurfaceClient.prototype[method.name] = requester_makers[method_type](
prefix + method.name,
common.serializeCls(method.resolvedRequestType.build()),
common.deserializeCls(method.resolvedResponseType.build()));
});
SurfaceClient.service = service;
return SurfaceClient;
}
......
......@@ -42,6 +42,8 @@ var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
var common = require('./common.js');
util.inherits(ServerReadableObjectStream, Readable);
/**
......@@ -287,36 +289,59 @@ var handler_makers = {
* @param {string} prefix The prefex to prepend to each method name
* @return {function(Object, Object)} New server constructor
*/
function makeServerConstructor(methods, prefix) {
function makeServerConstructor(services) {
var qual_names = [];
_.each(services, function(service) {
_.each(service.children, function(method) {
var name = common.fullyQualifiedName(method);
if (_.indexOf(qual_names, name) !== -1) {
throw new Error('Method ' + name + ' exposed by more than one service');
}
qual_names.push(name);
});
});
/**
* Create a server with the given handlers for all of the methods.
* @constructor
* @param {Object} handlers Map from method names to method handlers.
* @param {Object} service_handlers Map from service names to map from method
* names to handlers
* @param {Object} options Options to pass to the underlying server
*/
function SurfaceServer(handlers, options) {
function SurfaceServer(service_handlers, options) {
var server = new Server(options);
this.inner_server = server;
_.each(handlers, function(handler, name) {
var method = methods[name];
var method_type;
if (method.client_stream) {
if (method.server_stream) {
method_type = 'bidi';
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
if (service_handlers[service_name] === undefined) {
throw new Error('Handlers for service ' +
service_name + ' not provided.');
}
var prefix = '/' + common.fullyQualifiedName(service) + '/';
_.each(service.children, function(method) {
var method_type;
if (method.requestStream) {
if (method.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
method_type = 'client_stream';
if (method.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
} else {
if (method.server_stream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
if (service_handlers[service_name][method.name] === undefined) {
throw new Error('Method handler for ' +
common.fullyQualifiedName(method) + ' not provided.');
}
}
var binary_handler = handler_makers[method_type](handler,
method.serialize,
method.deserialize);
server.register('' + prefix + name, binary_handler);
var binary_handler = handler_makers[method_type](
service_handlers[service_name][method.name],
common.serializeCls(method.resolvedResponseType.build()),
common.deserializeCls(method.resolvedRequestType.build()));
server.register(prefix + method.name, binary_handler);
});
}, this);
}
......
......@@ -32,83 +32,13 @@
*/
var assert = require('assert');
var ProtoBuf = require('protobufjs');
var port_picker = require('../port_picker');
var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var math = builder.build('math');
var grpc = require('..');
var math = grpc.load(__dirname + '/../examples/math.proto').math;
var client = require('../surface_client.js');
var makeConstructor = client.makeClientConstructor;
/**
* Get a function that deserializes a specific type of protobuf.
* @param {function()} cls The constructor of the message type to deserialize
* @return {function(Buffer):cls} The deserialization function
*/
function deserializeCls(cls) {
/**
* Deserialize a buffer to a message object
* @param {Buffer} arg_buf The buffer to deserialize
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf);
};
}
/**
* Get a function that serializes objects to a buffer by protobuf class.
* @param {function()} Cls The constructor of the message type to serialize
* @return {function(Cls):Buffer} The serialization function
*/
function serializeCls(Cls) {
/**
* Serialize an object to a Buffer
* @param {Object} arg The object to serialize
* @return {Buffer} The serialized object
*/
return function serialize(arg) {
return new Buffer(new Cls(arg).encode().toBuffer());
};
}
/* This function call creates a client constructor for clients that expose the
* four specified methods. This specifies how to serialize messages that the
* client sends and deserialize messages that the server sends, and whether the
* client or the server will send a stream of messages, for each method. This
* also specifies a prefix that will be added to method names when sending them
* on the wire. This function call and all of the preceding code in this file
* are intended to approximate what the generated code will look like for the
* math client */
var MathClient = makeConstructor({
Div: {
serialize: serializeCls(math.DivArgs),
deserialize: deserializeCls(math.DivReply),
client_stream: false,
server_stream: false
},
Fib: {
serialize: serializeCls(math.FibArgs),
deserialize: deserializeCls(math.Num),
client_stream: false,
server_stream: true
},
Sum: {
serialize: serializeCls(math.Num),
deserialize: deserializeCls(math.Num),
client_stream: true,
server_stream: false
},
DivMany: {
serialize: serializeCls(math.DivArgs),
deserialize: deserializeCls(math.DivReply),
client_stream: true,
server_stream: true
}
}, '/Math/');
/**
* Channel to use to make requests to a running server.
* Client to use to make requests to a running server.
*/
var math_client;
......@@ -122,7 +52,7 @@ describe('Math client', function() {
before(function(done) {
port_picker.nextAvailablePort(function(port) {
server.bind(port).listen();
math_client = new MathClient(port);
math_client = new math.Math(port);
done();
});
});
......@@ -137,7 +67,7 @@ describe('Math client', function() {
assert.equal(value.remainder, 3);
});
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
......@@ -150,7 +80,7 @@ describe('Math client', function() {
next_expected += 1;
});
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
......@@ -164,7 +94,7 @@ describe('Math client', function() {
}
call.end();
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
......@@ -184,7 +114,7 @@ describe('Math client', function() {
}
call.end();
call.on('status', function checkStatus(status) {
assert.strictEqual(status.code, client.status.OK);
assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
......
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var assert = require('assert');
var surface_server = require('../surface_server.js');
var ProtoBuf = require('protobufjs');
var grpc = require('..');
var math_proto = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var mathService = math_proto.lookup('math.Math');
describe('Surface server constructor', function() {
it('Should fail with conflicting method names', function() {
assert.throws(function() {
grpc.buildServer([mathService, mathService]);
});
});
it('Should succeed with a single service', function() {
assert.doesNotThrow(function() {
grpc.buildServer([mathService]);
});
});
it('Should fail with missing handlers', function() {
var Server = grpc.buildServer([mathService]);
assert.throws(function() {
new Server({
'math.Math': {
'Div': function() {},
'DivMany': function() {},
'Fib': function() {}
}
});
}, /math.Math.Sum/);
});
it('Should fail with no handlers for the service', function() {
var Server = grpc.buildServer([mathService]);
assert.throws(function() {
new Server({});
}, /math.Math/);
});
});
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment