diff --git a/Makefile b/Makefile index e390a3efe636fa3087c8834d65900ff14a606145..3601a4dd1c947caaecb3b552ec9228f37e6b374e 100644 --- a/Makefile +++ b/Makefile @@ -8076,13 +8076,24 @@ $(BINDIR)/$(CONFIG)/qps_client_async: openssl_dep_error else -$(BINDIR)/$(CONFIG)/qps_client_async: $(QPS_CLIENT_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/qps_client_async: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/qps_client_async: $(PROTOBUF_DEP) $(QPS_CLIENT_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(E) "[LD] Linking $@" $(Q) mkdir -p `dirname $@` $(Q) $(LDXX) $(LDFLAGS) $(QPS_CLIENT_ASYNC_OBJS) $(GTEST_LIB) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/qps_client_async endif +endif + $(OBJDIR)/$(CONFIG)/test/cpp/qps/qpstest.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a @@ -8153,13 +8164,24 @@ $(BINDIR)/$(CONFIG)/qps_server_async: openssl_dep_error else -$(BINDIR)/$(CONFIG)/qps_server_async: $(QPS_SERVER_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/qps_server_async: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/qps_server_async: $(PROTOBUF_DEP) $(QPS_SERVER_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(E) "[LD] Linking $@" $(Q) mkdir -p `dirname $@` $(Q) $(LDXX) $(LDFLAGS) $(QPS_SERVER_ASYNC_OBJS) $(GTEST_LIB) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/qps_server_async endif +endif + $(OBJDIR)/$(CONFIG)/test/cpp/qps/qpstest.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a diff --git a/build.json b/build.json index 6be1d537ffa8142ea80b68dd180ade650c8d598c..2bbb012fbdbd2e78c8a0ea8fb2573862fb0b7b86 100644 --- a/build.json +++ b/build.json @@ -572,6 +572,7 @@ }, { "name": "census_statistics_multiple_writers_circular_buffer_test", + "flaky": true, "build": "test", "language": "c", "src": [ @@ -582,8 +583,7 @@ "grpc", "gpr_test_util", "gpr" - ], - "flaky": true + ] }, { "name": "census_statistics_multiple_writers_test", @@ -629,6 +629,7 @@ }, { "name": "census_statistics_small_log_test", + "flaky": true, "build": "test", "language": "c", "src": [ @@ -639,8 +640,7 @@ "grpc", "gpr_test_util", "gpr" - ], - "flaky": true + ] }, { "name": "census_stats_store_test", @@ -868,8 +868,7 @@ "grpc", "gpr_test_util", "gpr" - ], - "flaky": true + ] }, { "name": "fling_test", @@ -883,8 +882,7 @@ "grpc", "gpr_test_util", "gpr" - ], - "flaky": true + ] }, { "name": "gen_hpack_tables", diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index fa186551648e3a1839504b43d929ae1c7dc8bf48..62611e08f318f733162bcb7714cc40607bdef816 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -48,12 +48,12 @@ /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) -typedef struct { +typedef struct connected_channel_channel_data { grpc_transport *transport; gpr_uint32 max_message_length; } channel_data; -typedef struct { +typedef struct connected_channel_call_data { grpc_call_element *elem; grpc_stream_op_buffer outgoing_sopb; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c4b8d60782f613cac41041d9fb9cdda9ca7cc848..6a1d83ce5d4ffee0f82d204a9d3653cbe5d1cd5d 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -71,6 +71,7 @@ struct grpc_completion_queue { grpc_pollset pollset; /* 0 initially, 1 once we've begun shutting down */ int shutdown; + int shutdown_called; /* Head of a linked list of queued events (prev points to the last element) */ event *queue; /* Fixed size chained hash table of events for pluck() */ @@ -107,7 +108,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, grpc_event_finish_func on_finish, void *user_data) { event *ev = gpr_malloc(sizeof(event)); gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; - GPR_ASSERT(!cc->shutdown); ev->base.type = type; ev->base.tag = tag; ev->base.call = call; @@ -150,6 +150,7 @@ static void end_op_locked(grpc_completion_queue *cc, #endif if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); } @@ -380,6 +381,10 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + cc->shutdown_called = 1; + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_unref(&cc->refs)) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 476cc4b226edeef967596a13844a050b0d096a53..6b7273e3e48b3a81e833f4b3d217c046ac34eef2 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -309,6 +309,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, static int prepare_callbacks(transport *t); static void run_callbacks(transport *t, const grpc_transport_callbacks *cb); +static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb); static int prepare_write(transport *t); static void perform_write(transport *t, grpc_endpoint *ep); @@ -516,13 +517,29 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, static void destroy_transport(grpc_transport *gt) { transport *t = (transport *)gt; - gpr_mu_lock(&t->mu); + lock(t); t->destroying = 1; - while (t->calling_back) { + /* Wait for pending stuff to finish. + We need to be not calling back to ensure that closed() gets a chance to + trigger if needed during unlock() before we die. + We need to be not writing as cancellation finalization may produce some + callbacks that NEED to be made to close out some streams when t->writing + becomes 0. */ + while (t->calling_back || t->writing) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } - t->cb = NULL; - gpr_mu_unlock(&t->mu); + drop_connection(t); + unlock(t); + + /* The drop_connection() above puts the transport into an error state, and + the follow-up unlock should then (as part of the cleanup work it does) + ensure that cb is NULL, and therefore not call back anything further. + This check validates this very subtle behavior. + It's shutdown path, so I don't believe an extra lock pair is going to be + problematic for performance. */ + lock(t); + GPR_ASSERT(!t->cb); + unlock(t); unref_transport(t); } @@ -680,6 +697,7 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { } static void stream_list_join(transport *t, stream *s, stream_list_id id) { + if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); if (s->included[id]) { return; } @@ -738,7 +756,7 @@ static void unlock(transport *t) { if (perform_callbacks) { t->calling_back = 1; } - if (t->error_state == ERROR_STATE_SEEN) { + if (t->error_state == ERROR_STATE_SEEN && !t->writing) { call_closed = 1; t->calling_back = 1; t->cb = NULL; /* no more callbacks */ @@ -772,7 +790,7 @@ static void unlock(transport *t) { } if (call_closed) { - cb->closed(t->cb_user_data, &t->base); + call_cb_closed(t, cb); } /* write some bytes if necessary */ @@ -903,13 +921,16 @@ static void finish_write_common(transport *t, int success) { } while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { s->sent_write_closed = 1; - stream_list_join(t, s, PENDING_CALLBACKS); + if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS); } t->outbuf.count = 0; t->outbuf.length = 0; /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing = 0; + if (t->destroying) { + gpr_cv_signal(&t->cv); + } if (!t->reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; @@ -979,7 +1000,8 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, } else { grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count); } - if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) { + if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed && + !s->published_close) { stream_list_join(t, s, PENDING_CALLBACKS); } @@ -1765,6 +1787,10 @@ static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { } } +static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { + cb->closed(t->cb_user_data, &t->base); +} + static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { transport *t = (transport *)gt; lock(t); diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index d953552c0a268cbb6dc53f92a5e721b51bd31621..ffc651ab05886afb74bef2a3aa0f72655c218cdd 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -41,7 +41,7 @@ static void *tag(gpr_intptr i) { return (void *)i; } int main(int argc, char **argv) { grpc_channel *chan; grpc_call *call; - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); + gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2); grpc_completion_queue *cq; cq_verifier *cqv; grpc_event *ev; diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 59c303015a3935413407afc0460bd7dc51c8aede..5c1ab14d03477cced84afb85cd07cc1d2408e73b 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -275,7 +275,7 @@ int main(int argc, char **argv) { case FLING_SERVER_SEND_STATUS_FOR_STREAMING: /* Send status and close completed at server */ grpc_call_destroy(call); - request_call(); + if (!shutdown_started) request_call(); break; case FLING_SERVER_READ_FOR_UNARY: /* Finished payload read for unary. Start all reamaining @@ -288,7 +288,7 @@ int main(int argc, char **argv) { grpc_byte_buffer_destroy(payload_buffer); payload_buffer = NULL; grpc_call_destroy(call); - request_call(); + if (!shutdown_started) request_call(); break; } break; diff --git a/tools/buildgen/build-cleaner.py b/tools/buildgen/build-cleaner.py index 880f3e26a48c95e6da10383761d5fc2670c3a9d0..1d9157aad7df67df66b7aa3bf6f3720f29d72f26 100755 --- a/tools/buildgen/build-cleaner.py +++ b/tools/buildgen/build-cleaner.py @@ -41,6 +41,7 @@ _TOP_LEVEL_KEYS = ['settings', 'filegroups', 'libs', 'targets'] _VERSION_KEYS = ['major', 'minor', 'micro', 'build'] _ELEM_KEYS = [ 'name', + 'flaky', 'build', 'run', 'language', diff --git a/tools/run_tests/run_lcov.sh b/tools/run_tests/run_lcov.sh index 292aec454825d24ceb63eea355d9eff21bbc4ebd..69b1de6b8973b7ca81712571dc695aaa9699824d 100755 --- a/tools/run_tests/run_lcov.sh +++ b/tools/run_tests/run_lcov.sh @@ -35,7 +35,7 @@ out=`realpath ${1:-coverage}` root=`realpath $(dirname $0)/../..` tmp=`mktemp` cd $root -tools/run_tests/run_tests.py -c gcov -l c c++ +tools/run_tests/run_tests.py -c gcov -l c c++ || true lcov --capture --directory . --output-file $tmp genhtml $tmp --output-directory $out rm $tmp diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 772856b9c7fff8ee8b0ee9f963642fa1b9b26213..06bf58f93e8f8655bb23b50b9711c861eada67d9 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -102,12 +102,12 @@ "name": "fd_posix_test" }, { - "flaky": true, + "flaky": false, "language": "c", "name": "fling_stream_test" }, { - "flaky": true, + "flaky": false, "language": "c", "name": "fling_test" },