From feb77ef996b946371c4abb77fdf86f53ee968809 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 12 Apr 2017 07:10:22 -0700
Subject: [PATCH] Get cancel error right

---
 .../message_compress_filter.c                 | 46 +++++++++++--------
 1 file changed, 28 insertions(+), 18 deletions(-)

diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 61ab3d26d8..978dbd61d4 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -52,9 +52,10 @@
 int grpc_compression_trace = 0;
 
 #define INITIAL_METADATA_UNSEEN 0
-#define HAS_COMPRESSION_ALGORITHM 1
-#define NO_COMPRESSION_ALGORITHM 2
-#define CANCELLED 3
+#define HAS_COMPRESSION_ALGORITHM 2
+#define NO_COMPRESSION_ALGORITHM 4
+
+#define CANCELLED_BIT ((gpr_atm)1)
 
 typedef struct call_data {
   grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
@@ -71,9 +72,9 @@ typedef struct call_data {
                                  set
      NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm
                                 set
-     CANCELLED - request was cancelled
      pointer - a stalled op containing a send_message that's waiting on initial
-               metadata */
+               metadata
+     pointer | CANCELLED_BIT - request was cancelled with error pointed to */
   gpr_atm send_initial_metadata_state;
 
   grpc_transport_stream_op_batch *send_op;
@@ -269,20 +270,23 @@ static void compress_start_transport_stream_op_batch(
 
   if (op->cancel_stream) {
     gpr_atm cur;
+    GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
     do {
       cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
-    } while (
-        !gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, CANCELLED));
+    } while (!gpr_atm_rel_cas(
+        &calld->send_initial_metadata_state, cur,
+        CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error));
     switch (cur) {
       case HAS_COMPRESSION_ALGORITHM:
       case NO_COMPRESSION_ALGORITHM:
       case INITIAL_METADATA_UNSEEN:
-      case CANCELLED:
         break;
       default:
-        grpc_transport_stream_op_batch_finish_with_failure(
-            exec_ctx, (grpc_transport_stream_op_batch *)cur,
-            GRPC_ERROR_CANCELLED);
+        if ((cur & CANCELLED_BIT) == 0) {
+          grpc_transport_stream_op_batch_finish_with_failure(
+              exec_ctx, (grpc_transport_stream_op_batch *)cur,
+              op->payload->cancel_stream.cancel_error);
+        }
         break;
     }
   }
@@ -300,7 +304,7 @@ static void compress_start_transport_stream_op_batch(
     gpr_atm cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
     GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM &&
                cur != NO_COMPRESSION_ALGORITHM);
-    if (cur != CANCELLED) {
+    if ((cur & CANCELLED_BIT) == 0) {
       gpr_atm_rel_store(&calld->send_initial_metadata_state,
                         has_compression_algorithm ? HAS_COMPRESSION_ALGORITHM
                                                   : NO_COMPRESSION_ALGORITHM);
@@ -321,10 +325,6 @@ static void compress_start_transport_stream_op_batch(
           goto retry_send;
         }
         break;
-      case CANCELLED:
-        grpc_transport_stream_op_batch_finish_with_failure(
-            exec_ctx, op, GRPC_ERROR_CANCELLED);
-        break;
       case HAS_COMPRESSION_ALGORITHM:
       case NO_COMPRESSION_ALGORITHM:
         if (!skip_compression(elem,
@@ -340,8 +340,13 @@ static void compress_start_transport_stream_op_batch(
         }
         break;
       default:
-        /* >1 send_message concurrently */
-        GPR_UNREACHABLE_CODE(break);
+        if (cur & CANCELLED_BIT) {
+          grpc_transport_stream_op_batch_finish_with_failure(
+              exec_ctx, op, (grpc_error *)(cur & ~CANCELLED_BIT));
+        } else {
+          /* >1 send_message concurrently */
+          GPR_UNREACHABLE_CODE(break);
+        }
     }
   } else {
     /* pass control down the stack */
@@ -375,6 +380,11 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
+  gpr_atm imstate =
+      gpr_atm_no_barrier_load(&calld->send_initial_metadata_state);
+  if (imstate & CANCELLED_BIT) {
+    GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT));
+  }
 }
 
 /* Constructor for channel_data */
-- 
GitLab