diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 1bf6dd4c285c47edcb53473ed381635855871cd1..858cb41ec88d24709765a46e9f2d2d7710c42ab6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1018,10 +1018,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_cancel_stream(exec_ctx, t, s, GRPC_ERROR_REF(op->cancel_error)); } - if (op->close_error != GRPC_ERROR_NONE) { - close_from_api(exec_ctx, t, s, GRPC_ERROR_REF(op->close_error)); - } - if (op->send_initial_metadata != NULL) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1461,6 +1457,12 @@ static void status_codes_from_error(grpc_error *error, gpr_timespec deadline, void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *due_to_error) { + if (!t->is_client && !s->sent_trailing_metadata && + grpc_error_get_int(due_to_error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { + close_from_api(exec_ctx, t, s, due_to_error); + return; + } + if (!s->read_closed || !s->write_closed) { grpc_status_code grpc_status; grpc_chttp2_error_code http_error; diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 4e140c10f1960ae5cd6cd1cd0615bfd6ee5c96d1..bc9a2effc28c6fc87ba1c048216c95605e5f9fc1 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -197,8 +197,7 @@ void grpc_deadline_state_client_start_transport_stream_op( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { grpc_deadline_state* deadline_state = elem->call_data; - if (op->cancel_error != GRPC_ERROR_NONE || - op->close_error != GRPC_ERROR_NONE) { + if (op->cancel_error != GRPC_ERROR_NONE) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { // Make sure we know when the call is complete, so that we can cancel @@ -286,8 +285,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { server_call_data* calld = elem->call_data; - if (op->cancel_error != GRPC_ERROR_NONE || - op->close_error != GRPC_ERROR_NONE) { + if (op->cancel_error != GRPC_ERROR_NONE) { cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); } else { // If we're receiving initial metadata, we need to get the deadline diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index e4bfa3221451c890ecbff886e2e89974e5eb8f9b..227c94020fe0bd5f0e052a6f6afb1a2557180b89 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -261,7 +261,8 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { grpc_error *out; if (is_special(in)) { if (in == GRPC_ERROR_NONE) - out = GRPC_ERROR_CREATE("no error"); + out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); else if (in == GRPC_ERROR_OOM) out = GRPC_ERROR_CREATE("oom"); else if (in == GRPC_ERROR_CANCELLED) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 1c32515c9334721962e1e7a54338b4521b9f9f3c..57603ca386c72f514ac0b3ce932207afc8445d9e 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -222,9 +222,8 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); -static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description); +static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_error *error); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, @@ -339,7 +338,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_status_code status; const char *error_str; grpc_error_get_status(error, &status, &error_str); - close_with_status(exec_ctx, call, status, error_str); + cancel_with_status(exec_ctx, call, status, error_str); } if (args->cq != NULL) { GPR_ASSERT( @@ -528,13 +527,10 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return r; } -typedef enum { TC_CANCEL, TC_CLOSE } termination_closure_type; - typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_error *error; - termination_closure_type type; grpc_transport_stream_op op; } termination_closure; @@ -550,14 +546,7 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { termination_closure *tc = tcp; memset(&tc->op, 0, sizeof(tc->op)); - switch (tc->type) { - case TC_CANCEL: - tc->op.cancel_error = tc->error; - break; - case TC_CLOSE: - tc->op.close_error = tc->error; - break; - } + tc->op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc, grpc_schedule_on_exec_ctx); @@ -577,17 +566,19 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, } static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx, - grpc_call *c, - termination_closure_type tc_type, - grpc_error *error) { + grpc_call *c, grpc_error *error) { termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(*tc)); - tc->type = tc_type; tc->call = c; tc->error = error; return terminate_with_status(exec_ctx, tc); } +static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_error *error) { + terminate_with_error(exec_ctx, c, error); +} + static grpc_error *error_from_status(grpc_status_code status, const char *description) { return grpc_error_set_int( @@ -599,14 +590,7 @@ static grpc_error *error_from_status(grpc_status_code status, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { - return terminate_with_error(exec_ctx, c, TC_CANCEL, - error_from_status(status, description)); -} - -static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description) { - return terminate_with_error(exec_ctx, c, TC_CLOSE, + return terminate_with_error(exec_ctx, c, error_from_status(status, description)); } @@ -927,7 +911,7 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -/* +/******************************************************************************* * BATCH API IMPLEMENTATION */ @@ -1141,10 +1125,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, batch_control *bctl = bctlp; grpc_call *call = bctl->call; if (error != GRPC_ERROR_NONE) { - grpc_status_code status; - const char *msg; - grpc_error_get_status(error, &status, &msg); - close_with_status(exec_ctx, call, status, msg); + cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); } gpr_mu_lock(&bctl->call->mu); if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE || @@ -1172,7 +1153,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); gpr_log(GPR_ERROR, "%s", error_msg); - close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ @@ -1181,7 +1162,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else { call->incoming_compression_algorithm = algo; } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 5a92226942dae7dda2feae79f62c4ffadf66ab42..2dbb842a53efdf94ac61caa0fb20d96d58717af7 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -150,13 +150,18 @@ typedef struct grpc_transport_stream_op { /** Collect any stats into provided buffer, zero internal stat counters */ grpc_transport_stream_stats *collect_stats; - /** If != GRPC_ERROR_NONE, cancel this stream */ + /** If != GRPC_ERROR_NONE, forcefully close this stream. + The HTTP2 semantics should be: + - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and + trailing metadata has not been sent, send trailing metadata with status + and message from cancel_error (use grpc_error_get_status) followed by + a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close + - at all other times: use grpc_error_get_status to get a status code, and + convert to a HTTP2 error code using + grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this + error. */ grpc_error *cancel_error; - /** If != GRPC_ERROR_NONE, send grpc-status, grpc-message, and close this - stream for both reading and writing */ - grpc_error *close_error; - /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index b36e4f22d0f3860e8725c7de7c389864718dd121..33bd35bd456543370342d972f5790c10a9a28522 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -125,14 +125,6 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec_add(&b, tmp); } - if (op->close_error != GRPC_ERROR_NONE) { - gpr_strvec_add(&b, gpr_strdup(" ")); - const char *msg = grpc_error_string(op->close_error); - gpr_asprintf(&tmp, "CLOSE:%s", msg); - grpc_error_free_string(msg); - gpr_strvec_add(&b, tmp); - } - out = gpr_strvec_flatten(&b, NULL); gpr_strvec_destroy(&b); diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 8afd6815050bf2aa94d20d31be17d83baeb21cd2..114fd71d0581f17fad8869085fb1427744d9145a 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -210,7 +210,9 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, call_data *calld = elem->call_data; grpc_closure_sched( exec_ctx, calld->recv_im_ready, - GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1)); + grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_PERMISSION_DENIED)); } static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,