From fe6b328a1b93df552a4f2b42746c25092cc7bcc2 Mon Sep 17 00:00:00 2001
From: Michael Lumish <mlumish@google.com>
Date: Tue, 7 Feb 2017 11:20:16 -0800
Subject: [PATCH] Node: refactor non-uv completion queue wrapping code

---
 src/node/ext/completion_queue_threadpool.cc | 44 +++++++--------------
 1 file changed, 15 insertions(+), 29 deletions(-)

diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc
index 6302e7a103..4881542f2d 100644
--- a/src/node/ext/completion_queue_threadpool.cc
+++ b/src/node/ext/completion_queue_threadpool.cc
@@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
   void HandleErrorCallback();
 
  private:
+  static void TryAddWorker();
+
   grpc_event result;
 
   static grpc_completion_queue *queue;
@@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() {
 
 grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
 
-void CompletionQueueAsyncWorker::Next() {
-#ifndef GRPC_UV
-  Nan::HandleScope scope;
-  if (current_threads < max_queue_threads) {
+void CompletionQueueAsyncWorker::TryAddWorker() {
+  if (current_threads < max_queue_threads && waiting_next_calls > 0) {
     current_threads += 1;
+    waiting_next_calls -= 1;
     CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
     Nan::AsyncQueueWorker(worker);
-  } else {
-    waiting_next_calls += 1;
   }
   GPR_ASSERT(current_threads <= max_queue_threads);
   GPR_ASSERT((current_threads == max_queue_threads) ||
              (waiting_next_calls == 0));
-#endif
+}
+
+void CompletionQueueAsyncWorker::Next() {
+  waiting_next_calls += 1;
+  TryAddWorker();
 }
 
 void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
@@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
 
 void CompletionQueueAsyncWorker::HandleOKCallback() {
   Nan::HandleScope scope;
-  if (waiting_next_calls > 0) {
-    waiting_next_calls -= 1;
-    // Old worker removed, new worker added. current_threads += 0
-    CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
-    Nan::AsyncQueueWorker(worker);
-  } else {
-    current_threads -= 1;
-  }
-  GPR_ASSERT(current_threads <= max_queue_threads);
-  GPR_ASSERT((current_threads == max_queue_threads) ||
-             (waiting_next_calls == 0));
+  current_threads -= 1;
+  TryAddWorker();
   Nan::Callback *callback = GetTagCallback(result.tag);
   Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
   callback->Call(2, argv);
@@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
 }
 
 void CompletionQueueAsyncWorker::HandleErrorCallback() {
-  if (waiting_next_calls > 0) {
-    waiting_next_calls -= 1;
-    // Old worker removed, new worker added. current_threads += 0
-    CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
-    Nan::AsyncQueueWorker(worker);
-  } else {
-    current_threads -= 1;
-  }
-  GPR_ASSERT(current_threads <= max_queue_threads);
-  GPR_ASSERT((current_threads == max_queue_threads) ||
-             (waiting_next_calls == 0));
   Nan::HandleScope scope;
+  current_threads -= 1;
+  TryAddWorker();
   Nan::Callback *callback = GetTagCallback(result.tag);
   Local<Value> argv[] = {Nan::Error(ErrorMessage())};
 
@@ -189,6 +174,7 @@ grpc_completion_queue *GetCompletionQueue() {
 }
 
 void CompletionQueueNext() {
+  gpr_log(GPR_DEBUG, "Called CompletionQueueNext");
   CompletionQueueAsyncWorker::Next();
 }
 
-- 
GitLab