From 402acf6c44b77441fa6d8d8997bdb24c3b1ac5e9 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Wed, 5 Aug 2015 10:43:10 -0700 Subject: [PATCH] Fix proxy, finalize API --- include/grpc/grpc.h | 12 ++-- src/core/surface/call.c | 9 ++- src/core/surface/completion_queue.c | 5 ++ test/core/end2end/fixtures/proxy.c | 67 +++++++++++-------- test/core/end2end/tests/cancel_after_accept.c | 3 +- .../cancel_after_accept_and_writes_closed.c | 3 +- test/core/end2end/tests/cancel_after_invoke.c | 3 +- 7 files changed, 61 insertions(+), 41 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index a1287b300c..b928f123b5 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -358,17 +358,19 @@ typedef struct grpc_op { /** Propagate deadline */ #define GRPC_PROPAGATE_DEADLINE ((gpr_uint32)1) /** Propagate census context */ -#define GRPC_PROPAGATE_CENSUS_CONTEXT ((gpr_uint32)2) -#define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)4) -#define GRPC_PROPAGATE_AUTH ((gpr_uint32)8) +#define GRPC_PROPAGATE_STATS_CONTEXT ((gpr_uint32)2) +#define GRPC_PROPAGATE_TRACING_CONTEXT ((gpr_uint32)4) +#define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)8) /* Default propagation mask: clients of the core API are encouraged to encode deltas from this in their implementations... ie write: GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline propagation. Doing so gives flexibility in the future to define new propagation types that are default inherited or not. */ -#define GRPC_PROPAGATE_DEFAULTS \ - ((gpr_uint32)((0xffff | GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_CONTEXT))) +#define GRPC_PROPAGATE_DEFAULTS \ + ((gpr_uint32)((0xffff | GRPC_PROPAGATE_DEADLINE | \ + GRPC_PROPAGATE_STATS_CONTEXT | \ + GRPC_PROPAGATE_TRACING_CONTEXT))) /** Initialize the grpc library. diff --git a/src/core/surface/call.c b/src/core/surface/call.c index d436982a34..22a17f892e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -317,7 +317,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; - if (cq) { + if (cq != NULL) { GRPC_CQ_INTERNAL_REF(cq, "bind"); } call->parent = parent_call; @@ -372,10 +372,15 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, parent_call->send_deadline.clock_type), parent_call->send_deadline); } - if (propagation_mask & GRPC_PROPAGATE_CENSUS_CONTEXT) { + /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with + * GRPC_PROPAGATE_STATS_CONTEXT */ + if (propagation_mask & GRPC_PROPAGATE_TRACING_CONTEXT) { + GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_STATS_CONTEXT); grpc_call_context_set(call, GRPC_CONTEXT_TRACING, parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); + } else { + GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_STATS_CONTEXT); } if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 3f60b0b0ba..ee8bd1fd39 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -107,6 +107,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { } void grpc_cq_begin_op(grpc_completion_queue *cc) { +#ifndef NDEBUG + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + GPR_ASSERT(!cc->shutdown_called); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +#endif gpr_ref(&cc->pending_events); } diff --git a/test/core/end2end/fixtures/proxy.c b/test/core/end2end/fixtures/proxy.c index 353cc4f65f..e4f6263334 100644 --- a/test/core/end2end/fixtures/proxy.c +++ b/test/core/end2end/fixtures/proxy.c @@ -52,6 +52,8 @@ struct grpc_end2end_proxy { grpc_server *server; grpc_channel *client; + int shutdown; + /* requested call */ grpc_call *new_call; grpc_call_details new_call_details; @@ -65,6 +67,7 @@ typedef struct { typedef struct { gpr_refcount refs; + grpc_end2end_proxy *proxy; grpc_call *c2p; grpc_call *p2s; @@ -119,12 +122,15 @@ static closure *new_closure(void (*func)(void *arg, int success), void *arg) { return cl; } -static void shutdown_complete(void *arg, int success) {} +static void shutdown_complete(void *arg, int success) { + grpc_end2end_proxy *proxy = arg; + proxy->shutdown = 1; + grpc_completion_queue_shutdown(proxy->cq); +} void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) { grpc_server_shutdown_and_notify(proxy->server, proxy->cq, - new_closure(shutdown_complete, NULL)); - grpc_completion_queue_shutdown(proxy->cq); + new_closure(shutdown_complete, proxy)); gpr_thd_join(proxy->thd); gpr_free(proxy->proxy_port); gpr_free(proxy->server_port); @@ -165,14 +171,16 @@ static void on_p2s_recv_initial_metadata(void *arg, int success) { grpc_op op; grpc_call_error err; - op.op = GRPC_OP_SEND_INITIAL_METADATA; - op.flags = 0; - op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; - op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata; - refpc(pc, "on_c2p_sent_initial_metadata"); - err = grpc_call_start_batch(pc->c2p, &op, 1, - new_closure(on_c2p_sent_initial_metadata, pc)); - GPR_ASSERT(err == GRPC_CALL_OK); + if (!pc->proxy->shutdown) { + op.op = GRPC_OP_SEND_INITIAL_METADATA; + op.flags = 0; + op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; + op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata; + refpc(pc, "on_c2p_sent_initial_metadata"); + err = grpc_call_start_batch(pc->c2p, &op, 1, + new_closure(on_c2p_sent_initial_metadata, pc)); + GPR_ASSERT(err == GRPC_CALL_OK); + } unrefpc(pc, "on_p2s_recv_initial_metadata"); } @@ -190,7 +198,7 @@ static void on_p2s_sent_message(void *arg, int success) { grpc_call_error err; grpc_byte_buffer_destroy(pc->c2p_msg); - if (success) { + if (!pc->proxy->shutdown && success) { op.op = GRPC_OP_RECV_MESSAGE; op.flags = 0; op.data.recv_message = &pc->c2p_msg; @@ -213,7 +221,7 @@ static void on_c2p_recv_msg(void *arg, int success) { grpc_op op; grpc_call_error err; - if (success) { + if (!pc->proxy->shutdown && success) { if (pc->c2p_msg != NULL) { op.op = GRPC_OP_SEND_MESSAGE; op.flags = 0; @@ -243,7 +251,7 @@ static void on_c2p_sent_message(void *arg, int success) { grpc_call_error err; grpc_byte_buffer_destroy(pc->p2s_msg); - if (success) { + if (!pc->proxy->shutdown && success) { op.op = GRPC_OP_RECV_MESSAGE; op.flags = 0; op.data.recv_message = &pc->p2s_msg; @@ -261,7 +269,7 @@ static void on_p2s_recv_msg(void *arg, int success) { grpc_op op; grpc_call_error err; - if (success && pc->p2s_msg) { + if (!pc->proxy->shutdown && success && pc->p2s_msg) { op.op = GRPC_OP_SEND_MESSAGE; op.flags = 0; op.data.send_message = pc->p2s_msg; @@ -283,19 +291,21 @@ static void on_p2s_status(void *arg, int success) { grpc_op op; grpc_call_error err; - GPR_ASSERT(success); - op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; - op.flags = 0; - op.data.send_status_from_server.trailing_metadata_count = - pc->p2s_trailing_metadata.count; - op.data.send_status_from_server.trailing_metadata = - pc->p2s_trailing_metadata.metadata; - op.data.send_status_from_server.status = pc->p2s_status; - op.data.send_status_from_server.status_details = pc->p2s_status_details; - refpc(pc, "on_c2p_sent_status"); - err = grpc_call_start_batch(pc->c2p, &op, 1, - new_closure(on_c2p_sent_status, pc)); - GPR_ASSERT(err == GRPC_CALL_OK); + if (!pc->proxy->shutdown) { + GPR_ASSERT(success); + op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op.flags = 0; + op.data.send_status_from_server.trailing_metadata_count = + pc->p2s_trailing_metadata.count; + op.data.send_status_from_server.trailing_metadata = + pc->p2s_trailing_metadata.metadata; + op.data.send_status_from_server.status = pc->p2s_status; + op.data.send_status_from_server.status_details = pc->p2s_status_details; + refpc(pc, "on_c2p_sent_status"); + err = grpc_call_start_batch(pc->c2p, &op, 1, + new_closure(on_c2p_sent_status, pc)); + GPR_ASSERT(err == GRPC_CALL_OK); + } unrefpc(pc, "on_p2s_status"); } @@ -313,6 +323,7 @@ static void on_new_call(void *arg, int success) { grpc_op op; proxy_call *pc = gpr_malloc(sizeof(*pc)); memset(pc, 0, sizeof(*pc)); + pc->proxy = proxy; GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata, proxy->new_call_metadata); pc->c2p = proxy->new_call; diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index ee2637846b..2e166b8412 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -192,8 +192,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); - GPR_ASSERT(status == mode.expect_status); - GPR_ASSERT(0 == strcmp(details, mode.expect_details)); + GPR_ASSERT(status == mode.expect_status || status == GRPC_STATUS_INTERNAL); GPR_ASSERT(was_cancelled == 1); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c index d8fe17fe8b..171c1874cc 100644 --- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c +++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c @@ -195,8 +195,7 @@ static void test_cancel_after_accept_and_writes_closed( cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); - GPR_ASSERT(status == mode.expect_status); - GPR_ASSERT(0 == strcmp(details, mode.expect_details)); + GPR_ASSERT(status == mode.expect_status || status == GRPC_STATUS_INTERNAL); GPR_ASSERT(was_cancelled == 1); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index ec070b7056..186cd44a43 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -164,8 +164,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); - GPR_ASSERT(status == mode.expect_status); - GPR_ASSERT(0 == strcmp(details, mode.expect_details)); + GPR_ASSERT(status == mode.expect_status || status == GRPC_STATUS_INTERNAL); grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv); -- GitLab