Skip to content
Snippets Groups Projects
Commit 3548ed87 authored by murgatroid99's avatar murgatroid99
Browse files

Updated to new call.invoke API

parent ea36ba32
No related branches found
No related tags found
No related merge requests found
...@@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) { ...@@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
tpl->InstanceTemplate()->SetInternalFieldCount(1); tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata", NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction()); FunctionTemplate::New(AddMetadata)->GetFunction());
NanSetPrototypeTemplate(tpl, "startInvoke", NanSetPrototypeTemplate(tpl, "invoke",
FunctionTemplate::New(StartInvoke)->GetFunction()); FunctionTemplate::New(Invoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept", NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction()); FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate( NanSetPrototypeTemplate(
...@@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) { ...@@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
NanReturnUndefined(); NanReturnUndefined();
} }
NAN_METHOD(Call::StartInvoke) { NAN_METHOD(Call::Invoke) {
NanScope(); NanScope();
if (!HasInstance(args.This())) { if (!HasInstance(args.This())) {
return NanThrowTypeError("startInvoke can only be called on Call objects"); return NanThrowTypeError("invoke can only be called on Call objects");
} }
if (!args[0]->IsFunction()) { if (!args[0]->IsFunction()) {
return NanThrowTypeError("StartInvoke's first argument must be a function"); return NanThrowTypeError("invoke's first argument must be a function");
} }
if (!args[1]->IsFunction()) { if (!args[1]->IsFunction()) {
return NanThrowTypeError( return NanThrowTypeError("invoke's second argument must be a function");
"StartInvoke's second argument must be a function");
}
if (!args[2]->IsFunction()) {
return NanThrowTypeError("StartInvoke's third argument must be a function");
} }
if (!args[3]->IsUint32()) { if (!args[2]->IsUint32()) {
return NanThrowTypeError( return NanThrowTypeError("invoke's third argument must be integer flags");
"StartInvoke's fourth argument must be integer flags");
} }
Call *call = ObjectWrap::Unwrap<Call>(args.This()); Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value(); unsigned int flags = args[3]->Uint32Value();
grpc_call_error error = grpc_call_start_invoke( grpc_call_error error = grpc_call_invoke(
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
CreateTag(args[2], args.This()), flags);
if (error == GRPC_CALL_OK) { if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next(); CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next(); CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
} else { } else {
return NanThrowError("startInvoke failed", error); return NanThrowError("invoke failed", error);
} }
NanReturnUndefined(); NanReturnUndefined();
} }
...@@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) { ...@@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
NAN_METHOD(Call::Cancel) { NAN_METHOD(Call::Cancel) {
NanScope(); NanScope();
if (!HasInstance(args.This())) { if (!HasInstance(args.This())) {
return NanThrowTypeError("startInvoke can only be called on Call objects"); return NanThrowTypeError("cancel can only be called on Call objects");
} }
Call *call = ObjectWrap::Unwrap<Call>(args.This()); Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call); grpc_call_error error = grpc_call_cancel(call->wrapped_call);
......
...@@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap { ...@@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
static NAN_METHOD(New); static NAN_METHOD(New);
static NAN_METHOD(AddMetadata); static NAN_METHOD(AddMetadata);
static NAN_METHOD(StartInvoke); static NAN_METHOD(Invoke);
static NAN_METHOD(ServerAccept); static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata); static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel); static NAN_METHOD(Cancel);
......
...@@ -50,99 +50,51 @@ util.inherits(GrpcClientStream, Duplex); ...@@ -50,99 +50,51 @@ util.inherits(GrpcClientStream, Duplex);
function GrpcClientStream(call, options) { function GrpcClientStream(call, options) {
Duplex.call(this, options); Duplex.call(this, options);
var self = this; var self = this;
// Indicates that we can start reading and have not received a null read var finished = false;
var can_read = false;
// Indicates that a read is currently pending // Indicates that a read is currently pending
var reading = false; var reading = false;
// Indicates that we can call startWrite
var can_write = false;
// Indicates that a write is currently pending // Indicates that a write is currently pending
var writing = false; var writing = false;
this._call = call; this._call = call;
/** /**
* Callback to handle receiving a READ event. Pushes the data from that event * Callback to be called when a READ event is received. Pushes the data onto
* onto the read queue and starts reading again if applicable. * the read queue and starts reading again if applicable
* @param {grpc.Event} event The READ event object * @param {grpc.Event} event READ event object
*/ */
function readCallback(event) { function readCallback(event) {
var data = event.data; if (finished) {
if (self.push(data)) { self.push(null);
if (data == null) { return;
// Disable starting to read after null read was received
can_read = false;
reading = false;
} else {
call.startRead(readCallback);
} }
var data = event.data;
if (self.push(data) && data != null) {
self._call.startRead(readCallback);
} else { } else {
// Indicate that reading can be resumed by calling startReading
reading = false; reading = false;
} }
};
/**
* Initiate a read, which continues until self.push returns false (indicating
* that reading should be paused) or data is null (indicating that there is no
* more data to read).
*/
function startReading() {
call.startRead(readCallback);
} }
// TODO(mlumish): possibly change queue implementation due to shift slowness call.invoke(function(event) {
var write_queue = [];
/**
* Write the next chunk of data in the write queue if there is one. Otherwise
* indicate that there is no pending write. When the write succeeds, this
* function is called again.
*/
function writeNext() {
if (write_queue.length > 0) {
writing = true;
var next = write_queue.shift();
var writeCallback = function(event) {
next.callback();
writeNext();
};
call.startWrite(next.chunk, writeCallback, 0);
} else {
writing = false;
}
}
call.startInvoke(function(event) {
can_read = true;
can_write = true;
startReading();
writeNext();
}, function(event) {
self.emit('metadata', event.data); self.emit('metadata', event.data);
}, function(event) { }, function(event) {
finished = true;
self.emit('status', event.data); self.emit('status', event.data);
}, 0); }, 0);
this.on('finish', function() { this.on('finish', function() {
call.writesDone(function() {}); call.writesDone(function() {});
}); });
/** /**
* Indicate that reads should start, and start them if the INVOKE_ACCEPTED * Start reading if there is not already a pending read. Reading will
* event has been received. * continue until self.push returns false (indicating reads should slow
* down) or the read data is null (indicating that there is no more data).
*/ */
this._enableRead = function() { this.startReading = function() {
if (finished) {
self.push(null);
} else {
if (!reading) { if (!reading) {
reading = true; reading = true;
if (can_read) { self._call.startRead(readCallback);
startReading();
}
} }
};
/**
* Push the chunk onto the write queue, and write from the write queue if
* there is not a pending write
* @param {Buffer} chunk The chunk of data to write
* @param {function(Error=)} callback The callback to call when the write
* completes
*/
this._tryWrite = function(chunk, callback) {
write_queue.push({chunk: chunk, callback: callback});
if (can_write && !writing) {
writeNext();
} }
}; };
} }
...@@ -153,7 +105,7 @@ function GrpcClientStream(call, options) { ...@@ -153,7 +105,7 @@ function GrpcClientStream(call, options) {
* @param {number} size Ignored * @param {number} size Ignored
*/ */
GrpcClientStream.prototype._read = function(size) { GrpcClientStream.prototype._read = function(size) {
this._enableRead(); this.startReading();
}; };
/** /**
...@@ -164,7 +116,10 @@ GrpcClientStream.prototype._read = function(size) { ...@@ -164,7 +116,10 @@ GrpcClientStream.prototype._read = function(size) {
* @param {function(Error=)} callback Ignored * @param {function(Error=)} callback Ignored
*/ */
GrpcClientStream.prototype._write = function(chunk, encoding, callback) { GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
this._tryWrite(chunk, callback); var self = this;
self._call.startWrite(chunk, function(event) {
callback();
}, 0);
}; };
/** /**
......
...@@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) { ...@@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN); completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ)); Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ); completion_type->Set(NanNew("READ"), READ);
Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED)); Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED); completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED)); Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
......
...@@ -118,10 +118,9 @@ describe('call', function() { ...@@ -118,10 +118,9 @@ describe('call', function() {
call.addMetadata(5); call.addMetadata(5);
}, TypeError); }, TypeError);
}); });
it('should fail if startInvoke was already called', function(done) { it('should fail if invoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1)); var call = new grpc.Call(channel, 'method', getDeadline(1));
call.startInvoke(function() {}, call.invoke(function() {},
function() {},
function() {done();}, function() {done();},
0); 0);
assert.throws(function() { assert.throws(function() {
...@@ -133,30 +132,24 @@ describe('call', function() { ...@@ -133,30 +132,24 @@ describe('call', function() {
call.cancel(); call.cancel();
}); });
}); });
describe('startInvoke', function() { describe('invoke', function() {
it('should fail with fewer than 4 arguments', function() { it('should fail with fewer than 3 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1)); var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() { assert.throws(function() {
call.startInvoke(); call.invoke();
}, TypeError); }, TypeError);
assert.throws(function() { assert.throws(function() {
call.startInvoke(function() {}); call.invoke(function() {});
}, TypeError); }, TypeError);
assert.throws(function() { assert.throws(function() {
call.startInvoke(function() {}, call.invoke(function() {},
function() {});
}, TypeError);
assert.throws(function() {
call.startInvoke(function() {},
function() {},
function() {}); function() {});
}, TypeError); }, TypeError);
}); });
it('should work with 3 args and an int', function(done) { it('should work with 2 args and an int', function(done) {
assert.doesNotThrow(function() { assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1)); var call = new grpc.Call(channel, 'method', getDeadline(1));
call.startInvoke(function() {}, call.invoke(function() {},
function() {},
function() {done();}, function() {done();},
0); 0);
// Cancel to speed up the test // Cancel to speed up the test
...@@ -166,11 +159,10 @@ describe('call', function() { ...@@ -166,11 +159,10 @@ describe('call', function() {
it('should reject incorrectly typed arguments', function() { it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1)); var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() { assert.throws(function() {
call.startInvoke(0, 0, 0, 0); call.invoke(0, 0, 0);
}, TypeError); }, TypeError);
assert.throws(function() { assert.throws(function() {
call.startInvoke(function() {}, call.invoke(function() {},
function() {},
function() {}, 'test'); function() {}, 'test');
}); });
}); });
......
...@@ -94,7 +94,6 @@ var opErrorNames = [ ...@@ -94,7 +94,6 @@ var opErrorNames = [
var completionTypeNames = [ var completionTypeNames = [
'QUEUE_SHUTDOWN', 'QUEUE_SHUTDOWN',
'READ', 'READ',
'INVOKE_ACCEPTED',
'WRITE_ACCEPTED', 'WRITE_ACCEPTED',
'FINISH_ACCEPTED', 'FINISH_ACCEPTED',
'CLIENT_METADATA_READ', 'CLIENT_METADATA_READ',
......
...@@ -72,16 +72,7 @@ describe('end-to-end', function() { ...@@ -72,16 +72,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel, var call = new grpc.Call(channel,
'dummy_method', 'dummy_method',
deadline); deadline);
call.startInvoke(function(event) { call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
});
},function(event) {
assert.strictEqual(event.type, assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ); grpc.completionType.CLIENT_METADATA_READ);
},function(event) { },function(event) {
...@@ -91,6 +82,11 @@ describe('end-to-end', function() { ...@@ -91,6 +82,11 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text); assert.strictEqual(status.details, status_text);
done(); done();
}, 0); }, 0);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
});
server.start(); server.start();
server.requestCall(function(event) { server.requestCall(function(event) {
...@@ -131,9 +127,17 @@ describe('end-to-end', function() { ...@@ -131,9 +127,17 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel, var call = new grpc.Call(channel,
'dummy_method', 'dummy_method',
deadline); deadline);
call.startInvoke(function(event) { call.invoke(function(event) {
assert.strictEqual(event.type, assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED); grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
done();
}, 0);
call.startWrite( call.startWrite(
new Buffer(req_text), new Buffer(req_text),
function(event) { function(event) {
...@@ -152,17 +156,6 @@ describe('end-to-end', function() { ...@@ -152,17 +156,6 @@ describe('end-to-end', function() {
assert.strictEqual(event.data.toString(), reply_text); assert.strictEqual(event.data.toString(), reply_text);
done(); done();
}); });
},function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
done();
}, 0);
server.start(); server.start();
server.requestCall(function(event) { server.requestCall(function(event) {
......
...@@ -83,9 +83,18 @@ describe('echo server', function() { ...@@ -83,9 +83,18 @@ describe('echo server', function() {
var call = new grpc.Call(channel, var call = new grpc.Call(channel,
'echo', 'echo',
deadline); deadline);
call.startInvoke(function(event) { call.invoke(function(event) {
assert.strictEqual(event.type, assert.strictEqual(event.type,
grpc.completionType.INVOKE_ACCEPTED); grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
server.shutdown();
done();
}, 0);
call.startWrite( call.startWrite(
new Buffer(req_text), new Buffer(req_text),
function(event) { function(event) {
...@@ -104,18 +113,6 @@ describe('echo server', function() { ...@@ -104,18 +113,6 @@ describe('echo server', function() {
assert.strictEqual(event.data.toString(), req_text); assert.strictEqual(event.data.toString(), req_text);
done(); done();
}); });
},function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
server.shutdown();
done();
}, 0);
}); });
}); });
}); });
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment