Skip to content
Snippets Groups Projects
Commit fe6b328a authored by Michael Lumish's avatar Michael Lumish
Browse files

Node: refactor non-uv completion queue wrapping code

parent 2b3e12ce
No related branches found
No related tags found
No related merge requests found
...@@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker { ...@@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
void HandleErrorCallback(); void HandleErrorCallback();
private: private:
static void TryAddWorker();
grpc_event result; grpc_event result;
static grpc_completion_queue *queue; static grpc_completion_queue *queue;
...@@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() { ...@@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() {
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
void CompletionQueueAsyncWorker::Next() { void CompletionQueueAsyncWorker::TryAddWorker() {
#ifndef GRPC_UV if (current_threads < max_queue_threads && waiting_next_calls > 0) {
Nan::HandleScope scope;
if (current_threads < max_queue_threads) {
current_threads += 1; current_threads += 1;
waiting_next_calls -= 1;
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
Nan::AsyncQueueWorker(worker); Nan::AsyncQueueWorker(worker);
} else {
waiting_next_calls += 1;
} }
GPR_ASSERT(current_threads <= max_queue_threads); GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) || GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0)); (waiting_next_calls == 0));
#endif }
void CompletionQueueAsyncWorker::Next() {
waiting_next_calls += 1;
TryAddWorker();
} }
void CompletionQueueAsyncWorker::Init(Local<Object> exports) { void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
...@@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) { ...@@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
void CompletionQueueAsyncWorker::HandleOKCallback() { void CompletionQueueAsyncWorker::HandleOKCallback() {
Nan::HandleScope scope; Nan::HandleScope scope;
if (waiting_next_calls > 0) { current_threads -= 1;
waiting_next_calls -= 1; TryAddWorker();
// Old worker removed, new worker added. current_threads += 0
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
Nan::AsyncQueueWorker(worker);
} else {
current_threads -= 1;
}
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
Nan::Callback *callback = GetTagCallback(result.tag); Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
callback->Call(2, argv); callback->Call(2, argv);
...@@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { ...@@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
} }
void CompletionQueueAsyncWorker::HandleErrorCallback() { void CompletionQueueAsyncWorker::HandleErrorCallback() {
if (waiting_next_calls > 0) {
waiting_next_calls -= 1;
// Old worker removed, new worker added. current_threads += 0
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
Nan::AsyncQueueWorker(worker);
} else {
current_threads -= 1;
}
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
Nan::HandleScope scope; Nan::HandleScope scope;
current_threads -= 1;
TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag); Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Error(ErrorMessage())}; Local<Value> argv[] = {Nan::Error(ErrorMessage())};
...@@ -189,6 +174,7 @@ grpc_completion_queue *GetCompletionQueue() { ...@@ -189,6 +174,7 @@ grpc_completion_queue *GetCompletionQueue() {
} }
void CompletionQueueNext() { void CompletionQueueNext() {
gpr_log(GPR_DEBUG, "Called CompletionQueueNext");
CompletionQueueAsyncWorker::Next(); CompletionQueueAsyncWorker::Next();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment