diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dd8eaa943e5f53eabc07724acf9d7855815e6816..549ef8ca0da408f14e011b3e254481ddfc5aaa42 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -156,6 +156,9 @@ struct grpc_call {
   gpr_uint8 reading_message;
   /* have we bound a pollset yet? */
   gpr_uint8 bound_pollset;
+  /* is an error status set */
+  gpr_uint8 error_status_set;
+
   /* flags with bits corresponding to write states allowing us to determine
      what was sent */
   gpr_uint16 last_send_contains;
@@ -214,7 +217,7 @@ struct grpc_call {
   /* Received call statuses from various sources */
   received_status status[STATUS_SOURCE_COUNT];
 
-  /** Compression level for the call */
+  /* Compression level for the call */
   grpc_compression_level compression_level;
 
   /* Contexts for various subsystems (security, tracing, ...). */
@@ -409,6 +412,7 @@ static void set_status_code(grpc_call *call, status_source source,
 
   call->status[source].is_set = 1;
   call->status[source].code = status;
+  call->error_status_set = status != GRPC_STATUS_OK;
 
   if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
     grpc_bbq_flush(&call->incoming_queue);
@@ -686,13 +690,13 @@ static void call_on_done_send(void *pc, int success) {
 }
 
 static void finish_message(grpc_call *call) {
-  /* TODO(ctiller): this could be a lot faster if coded directly */
-  grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
-      call->incoming_message.slices, call->incoming_message.count);
+  if (call->error_status_set == 0) {
+    /* TODO(ctiller): this could be a lot faster if coded directly */
+    grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
+        call->incoming_message.slices, call->incoming_message.count);
+    grpc_bbq_push(&call->incoming_queue, byte_buffer);
+  }
   gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-
-  grpc_bbq_push(&call->incoming_queue, byte_buffer);
-
   GPR_ASSERT(call->incoming_message.count == 0);
   call->reading_message = 0;
 }
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 45ba8b0878b4f64d1bee61a65aaa57dc31a44b59..5e850ea30af5c19a7cfff0c1fc20184c6d20924b 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -68,6 +68,8 @@ namespace testing {
 
 namespace {
 
+const char* kServerCancelAfterReads = "cancel_after_reads";
+
 // When echo_deadline is requested, deadline seen in the ServerContext is set in
 // the response in seconds.
 void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
@@ -131,7 +133,23 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
                        EchoResponse* response) GRPC_OVERRIDE {
     EchoRequest request;
     response->set_message("");
+    int cancel_after_reads = 0;
+    const std::multimap<grpc::string, grpc::string> client_initial_metadata =
+        context->client_metadata();
+    if (client_initial_metadata.find(kServerCancelAfterReads) !=
+        client_initial_metadata.end()) {
+      std::istringstream iss(
+          client_initial_metadata.find(kServerCancelAfterReads)->second);
+      iss >> cancel_after_reads;
+      gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads);
+    }
     while (reader->Read(&request)) {
+      if (cancel_after_reads == 1) {
+        gpr_log(GPR_INFO, "return cancel status");
+        return Status::CANCELLED;
+      } else if (cancel_after_reads > 0) {
+        cancel_after_reads--;
+      }
       response->mutable_message()->append(request.message());
     }
     return Status::OK;
@@ -687,6 +705,27 @@ TEST_F(End2endTest, OverridePerCallCredentials) {
   EXPECT_TRUE(s.ok());
 }
 
+// Client sends 20 requests and the server returns CANCELLED status after
+// reading 10 requests.
+TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) {
+  ResetStub();
+  EchoRequest request;
+  EchoResponse response;
+  ClientContext context;
+
+  context.AddMetadata(kServerCancelAfterReads, "10");
+  auto stream = stub_->RequestStream(&context, &response);
+  request.set_message("hello");
+  int send_messages = 20;
+  while (send_messages > 0) {
+    EXPECT_TRUE(stream->Write(request));
+    send_messages--;
+  }
+  stream->WritesDone();
+  Status s = stream->Finish();
+  EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
+}
+
 }  // namespace testing
 }  // namespace grpc