diff --git a/BUILD b/BUILD index 0325d3c08a7b13a1dd8111a74c218e346812b77b..30c2b9ff6bfca89d2bcf41e8419d53fdb03c3263 100644 --- a/BUILD +++ b/BUILD @@ -168,6 +168,7 @@ cc_library( "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", @@ -326,6 +327,7 @@ cc_library( "src/core/lib/channel/channel_stack_builder.c", "src/core/lib/channel/compress_filter.c", "src/core/lib/channel/connected_channel.c", + "src/core/lib/channel/deadline_filter.c", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", @@ -565,6 +567,7 @@ cc_library( "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", @@ -708,6 +711,7 @@ cc_library( "src/core/lib/channel/channel_stack_builder.c", "src/core/lib/channel/compress_filter.c", "src/core/lib/channel/connected_channel.c", + "src/core/lib/channel/deadline_filter.c", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", @@ -917,6 +921,7 @@ cc_library( "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", @@ -1052,6 +1057,7 @@ cc_library( "src/core/lib/channel/channel_stack_builder.c", "src/core/lib/channel/compress_filter.c", "src/core/lib/channel/connected_channel.c", + "src/core/lib/channel/deadline_filter.c", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", @@ -1266,6 +1272,7 @@ cc_library( "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", @@ -1379,6 +1386,7 @@ cc_library( "src/core/lib/channel/channel_stack_builder.c", "src/core/lib/channel/compress_filter.c", "src/core/lib/channel/connected_channel.c", + "src/core/lib/channel/deadline_filter.c", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", @@ -1672,6 +1680,7 @@ cc_library( "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", @@ -1780,6 +1789,7 @@ cc_library( "src/core/lib/channel/channel_stack_builder.c", "src/core/lib/channel/compress_filter.c", "src/core/lib/channel/connected_channel.c", + "src/core/lib/channel/deadline_filter.c", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", @@ -2167,6 +2177,7 @@ objc_library( "src/core/lib/channel/channel_stack_builder.c", "src/core/lib/channel/compress_filter.c", "src/core/lib/channel/connected_channel.c", + "src/core/lib/channel/deadline_filter.c", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", @@ -2385,6 +2396,7 @@ objc_library( "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 8fd0d39b74f6c7f50b5a24482575c1cca9404297..88fcf5752b3e1bf73efb0980c62d7c2eb34c5df3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -295,6 +295,7 @@ add_library(grpc src/core/lib/channel/channel_stack_builder.c src/core/lib/channel/compress_filter.c src/core/lib/channel/connected_channel.c + src/core/lib/channel/deadline_filter.c src/core/lib/channel/handshaker.c src/core/lib/channel/http_client_filter.c src/core/lib/channel/http_server_filter.c @@ -552,6 +553,7 @@ add_library(grpc_cronet src/core/lib/channel/channel_stack_builder.c src/core/lib/channel/compress_filter.c src/core/lib/channel/connected_channel.c + src/core/lib/channel/deadline_filter.c src/core/lib/channel/handshaker.c src/core/lib/channel/http_client_filter.c src/core/lib/channel/http_server_filter.c @@ -781,6 +783,7 @@ add_library(grpc_unsecure src/core/lib/channel/channel_stack_builder.c src/core/lib/channel/compress_filter.c src/core/lib/channel/connected_channel.c + src/core/lib/channel/deadline_filter.c src/core/lib/channel/handshaker.c src/core/lib/channel/http_client_filter.c src/core/lib/channel/http_server_filter.c @@ -1037,6 +1040,7 @@ add_library(grpc++ src/core/lib/channel/channel_stack_builder.c src/core/lib/channel/compress_filter.c src/core/lib/channel/connected_channel.c + src/core/lib/channel/deadline_filter.c src/core/lib/channel/handshaker.c src/core/lib/channel/http_client_filter.c src/core/lib/channel/http_server_filter.c @@ -1389,6 +1393,7 @@ add_library(grpc++_unsecure src/core/lib/channel/channel_stack_builder.c src/core/lib/channel/compress_filter.c src/core/lib/channel/connected_channel.c + src/core/lib/channel/deadline_filter.c src/core/lib/channel/handshaker.c src/core/lib/channel/http_client_filter.c src/core/lib/channel/http_server_filter.c diff --git a/Makefile b/Makefile index 978d99177c1a7b9bb4344175c3f3f578fbfcc42e..639e7f5ed8dd554bdc65cd2596b66a9502041971 100644 --- a/Makefile +++ b/Makefile @@ -2532,6 +2532,7 @@ LIBGRPC_SRC = \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ @@ -2807,6 +2808,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ @@ -3071,6 +3073,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ @@ -3262,6 +3265,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ @@ -3601,6 +3605,7 @@ LIBGRPC++_SRC = \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ @@ -4228,6 +4233,7 @@ LIBGRPC++_UNSECURE_SRC = \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ diff --git a/binding.gyp b/binding.gyp index 45d04c8e671e7b18be0a0785f4ee5a280c586110..dc215cf5ffa5a5e2e9ed53737cb6774ee504c502 100644 --- a/binding.gyp +++ b/binding.gyp @@ -570,6 +570,7 @@ 'src/core/lib/channel/channel_stack_builder.c', 'src/core/lib/channel/compress_filter.c', 'src/core/lib/channel/connected_channel.c', + 'src/core/lib/channel/deadline_filter.c', 'src/core/lib/channel/handshaker.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', diff --git a/build.yaml b/build.yaml index a0a3dc53ac420f40e3f2da9548e5837347d7ae9a..c95bec4169f1ffca0f8abc865e1268dc8ce49916 100644 --- a/build.yaml +++ b/build.yaml @@ -172,6 +172,7 @@ filegroups: - src/core/lib/channel/compress_filter.h - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h + - src/core/lib/channel/deadline_filter.h - src/core/lib/channel/handshaker.h - src/core/lib/channel/http_client_filter.h - src/core/lib/channel/http_server_filter.h @@ -253,6 +254,7 @@ filegroups: - src/core/lib/channel/channel_stack_builder.c - src/core/lib/channel/compress_filter.c - src/core/lib/channel/connected_channel.c + - src/core/lib/channel/deadline_filter.c - src/core/lib/channel/handshaker.c - src/core/lib/channel/http_client_filter.c - src/core/lib/channel/http_server_filter.c diff --git a/config.m4 b/config.m4 index 6f8b0ce48c954d30c4642b404665e12a3fd1e18f..1c52caae4853079456d72a7b2bb3cc9421f0197b 100644 --- a/config.m4 +++ b/config.m4 @@ -89,6 +89,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ + src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ diff --git a/doc/core/pending_api_cleanups.md b/doc/core/pending_api_cleanups.md index 6d30e0b0e6394af9fddc56f06ee41c254f964a9b..a12e8a9dfbdc278548eda32a10ab304d1335a19f 100644 --- a/doc/core/pending_api_cleanups.md +++ b/doc/core/pending_api_cleanups.md @@ -13,3 +13,5 @@ number: - remove `GRPC_ARG_MAX_MESSAGE_LENGTH` channel arg from `include/grpc/impl/codegen/grpc_types.h` (commit `af00d8b`) +- remove `ServerBuilder::SetMaxMessageSize()` method from + `include/grpc++/server_builder.h` (commit `6980362`) diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 0214880ff9293be34e385cc7acf2ffd1f9d2cd3c..4d8186190ab9cd784c52ddd041faa4822f5edb67 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -255,6 +255,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/compress_filter.h', 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', + 'src/core/lib/channel/deadline_filter.h', 'src/core/lib/channel/handshaker.h', 'src/core/lib/channel/http_client_filter.h', 'src/core/lib/channel/http_server_filter.h', @@ -417,6 +418,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/channel_stack_builder.c', 'src/core/lib/channel/compress_filter.c', 'src/core/lib/channel/connected_channel.c', + 'src/core/lib/channel/deadline_filter.c', 'src/core/lib/channel/handshaker.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', @@ -624,6 +626,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/compress_filter.h', 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', + 'src/core/lib/channel/deadline_filter.h', 'src/core/lib/channel/handshaker.h', 'src/core/lib/channel/http_client_filter.h', 'src/core/lib/channel/http_server_filter.h', diff --git a/grpc.gemspec b/grpc.gemspec index 71763aa6fe4045da8f3753f6214eebed4d3d3ed1..bc28444a0d562f4384c51a80557a6003e898ead8 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -175,6 +175,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/channel/compress_filter.h ) s.files += %w( src/core/lib/channel/connected_channel.h ) s.files += %w( src/core/lib/channel/context.h ) + s.files += %w( src/core/lib/channel/deadline_filter.h ) s.files += %w( src/core/lib/channel/handshaker.h ) s.files += %w( src/core/lib/channel/http_client_filter.h ) s.files += %w( src/core/lib/channel/http_server_filter.h ) @@ -337,6 +338,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/channel/channel_stack_builder.c ) s.files += %w( src/core/lib/channel/compress_filter.c ) s.files += %w( src/core/lib/channel/connected_channel.c ) + s.files += %w( src/core/lib/channel/deadline_filter.c ) s.files += %w( src/core/lib/channel/handshaker.c ) s.files += %w( src/core/lib/channel/http_client_filter.c ) s.files += %w( src/core/lib/channel/http_server_filter.c ) diff --git a/include/grpc++/ext/reflection.grpc.pb.h b/include/grpc++/ext/reflection.grpc.pb.h index 064117e30305251ebd6003308aa28be1d97d3465..6e56088497ac537c73783accc037ef381ffcb00e 100644 --- a/include/grpc++/ext/reflection.grpc.pb.h +++ b/include/grpc++/ext/reflection.grpc.pb.h @@ -32,7 +32,7 @@ */ -// Generated by the gRPC protobuf plugin. +// Generated by tools/codegen/extensions/gen_reflection_proto.sh // If you make any local change, they will be lost. // source: reflection.proto // Original file comments: diff --git a/include/grpc++/ext/reflection.pb.h b/include/grpc++/ext/reflection.pb.h index bdb86197d036f2702181c79edbe3cf6a9386c324..caa1592424a89d139e405dca4b5cfd4a5c362950 100644 --- a/include/grpc++/ext/reflection.pb.h +++ b/include/grpc++/ext/reflection.pb.h @@ -32,7 +32,7 @@ */ -// Generated by the protocol buffer compiler. DO NOT EDIT! +// Generated by tools/codegen/extensions/gen_reflection_proto.sh // source: reflection.proto #ifndef PROTOBUF_reflection_2eproto__INCLUDED diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index a8e8ebe6ea92875441187a5ea7782bd902f19fb5..191cdd0e5fb02b82b5bd6a5d738fc5f1f4b896f5 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -148,11 +148,13 @@ typedef struct { /** Maximum number of concurrent incoming streams to allow on a http2 connection. Int valued. */ #define GRPC_ARG_MAX_CONCURRENT_STREAMS "grpc.max_concurrent_streams" -/** Maximum message length that the channel can receive. Int valued, bytes. */ +/** Maximum message length that the channel can receive. Int valued, bytes. + -1 means unlimited. */ #define GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH "grpc.max_receive_message_length" /** \deprecated For backward compatibility. */ #define GRPC_ARG_MAX_MESSAGE_LENGTH GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH -/** Maximum message length that the channel can send. Int valued, bytes. */ +/** Maximum message length that the channel can send. Int valued, bytes. + -1 means unlimited. */ #define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length" /** Initial sequence number for http2 transports. Int valued. */ #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ diff --git a/package.xml b/package.xml index 140d1161758830be6eb791ea9477c2cebfb0af4f..3e54e1a59927aeb1732468b220f012c9a4416412 100644 --- a/package.xml +++ b/package.xml @@ -182,6 +182,7 @@ <file baseinstalldir="/" name="src/core/lib/channel/compress_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/connected_channel.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/context.h" role="src" /> + <file baseinstalldir="/" name="src/core/lib/channel/deadline_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/handshaker.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.h" role="src" /> @@ -344,6 +345,7 @@ <file baseinstalldir="/" name="src/core/lib/channel/channel_stack_builder.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/compress_filter.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/connected_channel.c" role="src" /> + <file baseinstalldir="/" name="src/core/lib/channel/deadline_filter.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/handshaker.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.c" role="src" /> diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index b2b4fea83cd7dba2c14438c0aed3b342a75050ea..feb4cbde7b539b941e00bba2de6a75ac621524be 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -46,6 +46,7 @@ #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -114,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_cancel_picks( exec_ctx, chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, - /* check= */ 0); + /* check= */ 0, GRPC_ERROR_REF(error)); } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); @@ -391,6 +392,17 @@ typedef enum { for initial metadata before trying to create a call object, and handling cancellation gracefully. */ typedef struct client_channel_call_data { + // State for handling deadlines. + // The code in deadline_filter.c requires this to be the first field. + // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state + // and this struct both independently store a pointer to the call + // stack and each has its own mutex. If/when we have time, find a way + // to avoid this without breaking the grpc_deadline_state abstraction. + grpc_deadline_state deadline_state; + gpr_timespec deadline; + + grpc_error *cancel_error; + /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; @@ -485,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, gpr_atm_no_barrier_store(&calld->subchannel_call, 1); fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( "Failed to create subchannel", &error, 1)); - } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) { + } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( @@ -493,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, } else { grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); @@ -531,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready); + grpc_closure *on_ready, grpc_error *error); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -542,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); @@ -552,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready) { + grpc_closure *on_ready, grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -566,21 +579,24 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, - connected_subchannel); + connected_subchannel, GRPC_ERROR_REF(error)); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = closure->next_data.next) { cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); } } gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); + GRPC_ERROR_UNREF(error); return true; } + GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; int r; @@ -631,12 +647,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -652,8 +669,8 @@ retry: call = GET_CALL(calld); if (call == CANCELLED_CALL) { gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -669,18 +686,24 @@ retry: (gpr_atm)(uintptr_t)CANCELLED_CALL)) { goto retry; } else { + // Stash a copy of cancel_error in our call data, so that we can use + // it for subsequent operations. This ensures that if the call is + // cancelled before any ops are passed down (e.g., if the deadline + // is in the past when the call starts), we can return the right + // error to the caller when the first op does get passed down. + calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL); + NULL, GRPC_ERROR_REF(op->cancel_error)); break; } gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -694,7 +717,8 @@ retry: GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step)) { + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -704,7 +728,7 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; @@ -727,6 +751,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; + grpc_deadline_state_init(exec_ctx, elem, args); + calld->deadline = args->deadline; + calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; @@ -745,6 +772,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_final_info *final_info, void *and_free_memory) { call_data *calld = elem->call_data; + grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 903563ef6b58d8cc13a2f32851f6c4fd3eea29c4..46391272a69088ff6546183c1564ee455d53dacd 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -108,16 +108,18 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target) { - policy->vtable->cancel_pick(exec_ctx, policy, target); + grpc_connected_subchannel **target, + grpc_error *error) { + policy->vtable->cancel_pick(exec_ctx, policy, target, error); } void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, - initial_metadata_flags_eq); + initial_metadata_flags_eq, error); } void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 37c93d707c17906459fa52c2e20ebc6eaceed2be..7fb3e08cb3ecc90361688d32838ae180b89d7568 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -77,12 +77,12 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target); + grpc_connected_subchannel **target, grpc_error *error); /** \see grpc_lb_policy_cancel_picks */ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq); + uint32_t initial_metadata_flags_eq, grpc_error *error); /** \see grpc_lb_policy_ping_one */ void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, @@ -161,7 +161,8 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target); + grpc_connected_subchannel **target, + grpc_error *error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq @@ -169,7 +170,8 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq); + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** Try to enter a READY connectivity state */ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 96595a38dd4a28ffad232d98837f23ca4c53f417..f26f9d6ab79c4660536b73f3b2b23faeb0453fc5 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -629,7 +629,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, c->have_alarm = 1; grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); const char *errmsg = grpc_error_string(error); @@ -698,14 +700,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_subchannel_call **call) { + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, callstk); + NULL, NULL, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index 218bb43e0a8eada1a56d7ba3ad8a7bf6dd66643e..3330621071286a6538c67157f326e6b3b078b6e9 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( diff --git a/src/core/ext/client_config/subchannel_factory.c b/src/core/ext/client_config/subchannel_factory.c deleted file mode 100644 index d1e4d75a023052c771ce4242f1b2a29b13c3dff6..0000000000000000000000000000000000000000 --- a/src/core/ext/client_config/subchannel_factory.c +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/ext/client_config/subchannel_factory.h" - -void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) { - factory->vtable->ref(factory); -} - -void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx, - grpc_subchannel_factory* factory) { - factory->vtable->unref(exec_ctx, factory); -} - -grpc_subchannel* grpc_subchannel_factory_create_subchannel( - grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory, - grpc_subchannel_args* args) { - return factory->vtable->create_subchannel(exec_ctx, factory, args); -} diff --git a/src/core/ext/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h deleted file mode 100644 index 0fb806d08197bbe9b4e77eb97346be6d211257b7..0000000000000000000000000000000000000000 --- a/src/core/ext/client_config/subchannel_factory.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H - -#include "src/core/ext/client_config/subchannel.h" -#include "src/core/lib/channel/channel_stack.h" - -typedef struct grpc_subchannel_factory grpc_subchannel_factory; -typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable; - -/** Constructor for new configured channels. - Creating decorators around this type is encouraged to adapt behavior. */ -struct grpc_subchannel_factory { - const grpc_subchannel_factory_vtable *vtable; -}; - -struct grpc_subchannel_factory_vtable { - void (*ref)(grpc_subchannel_factory *factory); - void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory); - grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx, - grpc_subchannel_factory *factory, - grpc_subchannel_args *args); -}; - -void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); -void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel_factory *factory); - -/** Create a new grpc_subchannel */ -grpc_subchannel *grpc_subchannel_factory_create_subchannel( - grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory, - grpc_subchannel_args *args); - -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index f45cff1a178eab77aa092fc5ce69e2657e8adffd..0050489425ac24954d0adce14736f49a6b647a1d 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -688,7 +688,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; @@ -699,8 +700,10 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set( exec_ctx, pp->pollent, glb_policy->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, - GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched( + exec_ctx, &pp->wrapped_on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); + gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -708,12 +711,14 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_client != NULL) { @@ -728,8 +733,10 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set( exec_ctx, pp->pollent, glb_policy->base.interested_parties); - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, - GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched( + exec_ctx, &pp->wrapped_on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); + gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -737,6 +744,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static void query_for_backends(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 466a0fdede8ef2afac46263af3094874a15ac0ca..961a0c9b1964352e4891fbf66c077e0dc2694116 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { @@ -466,6 +472,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + /* server_name will be copied as part of the subchannel creation. This makes + * the copying of args->server_name (a borrowed pointer) OK. */ sc_args.server_name = args->server_name; sc_args.addr = (struct sockaddr *)(&args->addresses->addresses[i].address.addr); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 037f180a9e7fe7e144a1b863270696566f9a69d5..930fa86aca1148d1670df7b501a1519ee6d2ea82 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -308,7 +308,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -320,8 +321,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, - NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -330,11 +332,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -347,8 +351,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, - NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -357,6 +362,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { @@ -629,6 +635,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (args->addresses->addresses[i].is_balancer) continue; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + /* server_name will be copied as part of the subchannel creation. This makes + * the copying of args->server_name (a borrowed pointer) OK. */ sc_args.server_name = args->server_name; sc_args.addr = (struct sockaddr *)(&args->addresses->addresses[i].address.addr); diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 0655b9353f6fea4c737daa768ef0b4538f2c14c8..57d34d9e9abd91512b59ce9bc0d89f253de931b5 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -158,13 +158,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, } } -grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, - void *destroy_arg, - grpc_call_context_element *context, - const void *transport_server_data, - grpc_call_stack *call_stack) { +grpc_error *grpc_call_stack_init( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, const void *transport_server_data, + gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -185,6 +183,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; + args.deadline = deadline; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; @@ -276,16 +275,16 @@ static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) { } void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, - grpc_call_element *cur_elem) { + grpc_call_element *elem) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->cancel_error = GRPC_ERROR_CANCELLED; op->on_complete = grpc_closure_create(destroy_op, op); - grpc_call_next_op(exec_ctx, cur_elem, op); + elem->filter->start_transport_stream_op(exec_ctx, elem, op); } void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *cur_elem, + grpc_call_element *elem, grpc_status_code status, gpr_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); @@ -293,5 +292,16 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, op->on_complete = grpc_closure_create(destroy_op, op); grpc_transport_stream_op_add_cancellation_with_message(op, status, optional_message); - grpc_call_next_op(exec_ctx, cur_elem, op); + elem->filter->start_transport_stream_op(exec_ctx, elem, op); +} + +void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_status_code status, + gpr_slice *optional_message) { + grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); + memset(op, 0, sizeof(*op)); + op->on_complete = grpc_closure_create(destroy_op, op); + grpc_transport_stream_op_add_close(op, status, optional_message); + elem->filter->start_transport_stream_op(exec_ctx, elem, op); } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 6b73cce380f4555453068ce0b42cdef8c570969d..1cfe2885d819adefac776989a3c0ae1dfd051fa9 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -74,6 +74,7 @@ typedef struct { grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; + gpr_timespec deadline; } grpc_call_element_args; typedef struct { @@ -220,13 +221,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, - void *destroy_arg, - grpc_call_context_element *context, - const void *transport_server_data, - grpc_call_stack *call_stack); +grpc_error *grpc_call_stack_init( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, const void *transport_server_data, + gpr_timespec deadline, grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -290,6 +289,11 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_status_code status, gpr_slice *optional_message); +void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem, + grpc_status_code status, + gpr_slice *optional_message); + extern int grpc_trace_channel; #define GRPC_CALL_LOG_OP(sev, elem, op) \ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c new file mode 100644 index 0000000000000000000000000000000000000000..079b98a2f85c0fee45816de2be7f12603bfd57d4 --- /dev/null +++ b/src/core/lib/channel/deadline_filter.c @@ -0,0 +1,302 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/lib/channel/deadline_filter.h" + +#include <stdbool.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/timer.h" + +// +// grpc_deadline_state +// + +// Timer callback. +static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_call_element* elem = arg; + grpc_deadline_state* deadline_state = elem->call_data; + gpr_mu_lock(&deadline_state->timer_mu); + deadline_state->timer_pending = false; + gpr_mu_unlock(&deadline_state->timer_mu); + if (error != GRPC_ERROR_CANCELLED) { + gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded"); + grpc_call_element_send_cancel_with_message( + exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg); + gpr_slice_unref(msg); + } + GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); +} + +// Starts the deadline timer. +static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { + grpc_deadline_state* deadline_state = elem->call_data; + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // Take a reference to the call stack, to be owned by the timer. + GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); + gpr_mu_lock(&deadline_state->timer_mu); + deadline_state->timer_pending = true; + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback, + elem, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_mu_unlock(&deadline_state->timer_mu); + } +} + +// Cancels the deadline timer. +static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { + gpr_mu_lock(&deadline_state->timer_mu); + if (deadline_state->timer_pending) { + grpc_timer_cancel(exec_ctx, &deadline_state->timer); + deadline_state->timer_pending = false; + } + gpr_mu_unlock(&deadline_state->timer_mu); +} + +// Callback run when the call is complete. +static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_deadline_state* deadline_state = arg; + cancel_timer_if_needed(exec_ctx, deadline_state); + // Invoke the next callback. + deadline_state->next_on_complete->cb( + exec_ctx, deadline_state->next_on_complete->cb_arg, error); +} + +// Inject our own on_complete callback into op. +static void inject_on_complete_cb(grpc_deadline_state* deadline_state, + grpc_transport_stream_op* op) { + deadline_state->next_on_complete = op->on_complete; + grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state); + op->on_complete = &deadline_state->on_complete; +} + +// Callback and associated state for starting the timer after call stack +// initialization has been completed. +struct start_timer_after_init_state { + grpc_call_element* elem; + gpr_timespec deadline; + grpc_closure closure; +}; +static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + struct start_timer_after_init_state* state = arg; + start_timer_if_needed(exec_ctx, state->elem, state->deadline); + gpr_free(state); +} + +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_element_args* args) { + grpc_deadline_state* deadline_state = elem->call_data; + memset(deadline_state, 0, sizeof(*deadline_state)); + deadline_state->call_stack = args->call_stack; + gpr_mu_init(&deadline_state->timer_mu); + // Deadline will always be infinite on servers, so the timer will only be + // set on clients with a finite deadline. + const gpr_timespec deadline = + gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // When the deadline passes, we indicate the failure by sending down + // an op with cancel_error set. However, we can't send down any ops + // until after the call stack is fully initialized. If we start the + // timer here, we have no guarantee that the timer won't pop before + // call stack initialization is finished. To avoid that problem, we + // create a closure to start the timer, and we schedule that closure + // to be run after call stack initialization is done. + struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); + state->elem = elem; + state->deadline = deadline; + grpc_closure_init(&state->closure, start_timer_after_init, state); + grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL); + } +} + +void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + grpc_deadline_state* deadline_state = elem->call_data; + cancel_timer_if_needed(exec_ctx, deadline_state); + gpr_mu_destroy(&deadline_state->timer_mu); +} + +void grpc_deadline_state_client_start_transport_stream_op( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op* op) { + grpc_deadline_state* deadline_state = elem->call_data; + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { + cancel_timer_if_needed(exec_ctx, deadline_state); + } else { + // Make sure we know when the call is complete, so that we can cancel + // the timer. + if (op->recv_trailing_metadata != NULL) { + inject_on_complete_cb(deadline_state, op); + } + } +} + +// +// filter code +// + +// Constructor for channel_data. Used for both client and server filters. +static void init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + GPR_ASSERT(!args->is_last); +} + +// Destructor for channel_data. Used for both client and server filters. +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} + +// Call data used for both client and server filter. +typedef struct base_call_data { + grpc_deadline_state deadline_state; +} base_call_data; + +// Additional call data used only for the server filter. +typedef struct server_call_data { + base_call_data base; // Must be first. + // The closure for receiving initial metadata. + grpc_closure recv_initial_metadata_ready; + // Received initial metadata batch. + grpc_metadata_batch* recv_initial_metadata; + // The original recv_initial_metadata_ready closure, which we chain to + // after our own closure is invoked. + grpc_closure* next_recv_initial_metadata_ready; +} server_call_data; + +// Constructor for call_data. Used for both client and server filters. +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_call_element_args* args) { + // Note: size of call data is different between client and server. + memset(elem->call_data, 0, elem->filter->sizeof_call_data); + grpc_deadline_state_init(exec_ctx, elem, args); + return GRPC_ERROR_NONE; +} + +// Destructor for call_data. Used for both client and server filters. +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + void* and_free_memory) { + grpc_deadline_state_destroy(exec_ctx, elem); +} + +// Method for starting a call op for client filter. +static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_transport_stream_op* op) { + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + // Chain to next filter. + grpc_call_next_op(exec_ctx, elem, op); +} + +// Callback for receiving initial metadata on the server. +static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_call_element* elem = arg; + server_call_data* calld = elem->call_data; + // Get deadline from metadata and start the timer if needed. + start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline); + // Invoke the next callback. + calld->next_recv_initial_metadata_ready->cb( + exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error); +} + +// Method for starting a call op for server filter. +static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_transport_stream_op* op) { + server_call_data* calld = elem->call_data; + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { + cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); + } else { + // If we're receiving initial metadata, we need to get the deadline + // from the recv_initial_metadata_ready callback. So we inject our + // own callback into that hook. + if (op->recv_initial_metadata_ready != NULL) { + calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; + calld->recv_initial_metadata = op->recv_initial_metadata; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem); + op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; + } + // Make sure we know when the call is complete, so that we can cancel + // the timer. + // Note that we trigger this on recv_trailing_metadata, even though + // the client never sends trailing metadata, because this is the + // hook that tells us when the call is complete on the server side. + if (op->recv_trailing_metadata != NULL) { + inject_on_complete_cb(&calld->base.deadline_state, op); + } + } + // Chain to next filter. + grpc_call_next_op(exec_ctx, elem, op); +} + +const grpc_channel_filter grpc_client_deadline_filter = { + client_start_transport_stream_op, + grpc_channel_next_op, + sizeof(base_call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "deadline", +}; + +const grpc_channel_filter grpc_server_deadline_filter = { + server_start_transport_stream_op, + grpc_channel_next_op, + sizeof(server_call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "deadline", +}; diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h new file mode 100644 index 0000000000000000000000000000000000000000..685df877617e0475c88d335ba6011f26b8b25e7c --- /dev/null +++ b/src/core/lib/channel/deadline_filter.h @@ -0,0 +1,79 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/iomgr/timer.h" + +// State used for filters that enforce call deadlines. +// Must be the first field in the filter's call_data. +typedef struct grpc_deadline_state { + // We take a reference to the call stack for the timer callback. + grpc_call_stack* call_stack; + // Guards access to timer_pending and timer. + gpr_mu timer_mu; + // True if the timer callback is currently pending. + bool timer_pending; + // The deadline timer. + grpc_timer timer; + // Closure to invoke when the call is complete. + // We use this to cancel the timer. + grpc_closure on_complete; + // The original on_complete closure, which we chain to after our own + // closure is invoked. + grpc_closure* next_on_complete; +} grpc_deadline_state; + +// To be used in a filter's init_call_elem(), destroy_call_elem(), and +// start_transport_stream_op() methods to enforce call deadlines. +// +// REQUIRES: The first field in elem->call_data is a grpc_deadline_state. +// +// For grpc_deadline_state_client_start_transport_stream_op(), it is the +// caller's responsibility to chain to the next filter if necessary +// after the function returns. +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_element_args* args); +void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem); +void grpc_deadline_state_client_start_transport_stream_op( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op* op); + +// Deadline filters for direct client channels and server channels. +// Note: Deadlines for non-direct client channels are handled by the +// client_channel filter. +extern const grpc_channel_filter grpc_client_deadline_filter; +extern const grpc_channel_filter grpc_server_deadline_filter; + +#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */ diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index edcc741ff6f86c1e00747ba15a22cdd4d823398a..1dc05fb20d1a03b4f144b3f69db5c05136142fab 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -103,8 +103,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { grpc_mdstr_as_c_string(md->value)); gpr_slice message = gpr_slice_from_copied_string(message_string); gpr_free(message_string); - grpc_call_element_send_cancel_with_message(a->exec_ctx, a->elem, - GRPC_STATUS_CANCELLED, &message); + grpc_call_element_send_close_with_message(a->exec_ctx, a->elem, + GRPC_STATUS_CANCELLED, &message); return NULL; } else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { return NULL; diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 6613785deaaa2450e58f2a100a450995d1850271..02fc68fc3aec6ba25f746701fe50300f99648b17 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -40,8 +40,9 @@ #include "src/core/lib/channel/channel_args.h" +#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited. // The protobuf library will (by default) start warning at 100 megs. -#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024) +#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) typedef struct call_data { // Receive closures are chained: we inject this closure as the @@ -55,8 +56,8 @@ typedef struct call_data { } call_data; typedef struct channel_data { - size_t max_send_size; - size_t max_recv_size; + int max_send_size; + int max_recv_size; } channel_data; // Callback invoked when we receive a message. Here we check the max @@ -66,15 +67,15 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_call_element* elem = user_data; call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; - if (*calld->recv_message != NULL && - (*calld->recv_message)->length > chand->max_recv_size) { + if (*calld->recv_message != NULL && chand->max_recv_size >= 0 && + (*calld->recv_message)->length > (size_t)chand->max_recv_size) { char* message_string; - gpr_asprintf( - &message_string, "Received message larger than max (%u vs. %lu)", - (*calld->recv_message)->length, (unsigned long)chand->max_recv_size); + gpr_asprintf(&message_string, + "Received message larger than max (%u vs. %d)", + (*calld->recv_message)->length, chand->max_recv_size); gpr_slice message = gpr_slice_from_copied_string(message_string); gpr_free(message_string); - grpc_call_element_send_cancel_with_message( + grpc_call_element_send_close_with_message( exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); } // Invoke the next callback. @@ -88,14 +89,14 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; // Check max send message size. - if (op->send_message != NULL && - op->send_message->length > chand->max_send_size) { + if (op->send_message != NULL && chand->max_send_size >= 0 && + op->send_message->length > (size_t)chand->max_send_size) { char* message_string; - gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)", - op->send_message->length, (unsigned long)chand->max_send_size); + gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", + op->send_message->length, chand->max_send_size); gpr_slice message = gpr_slice_from_copied_string(message_string); gpr_free(message_string); - grpc_call_element_send_cancel_with_message( + grpc_call_element_send_close_with_message( exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); } // Inject callback for receiving a message. @@ -130,19 +131,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, GPR_ASSERT(!args->is_last); channel_data* chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); - chand->max_send_size = DEFAULT_MAX_MESSAGE_LENGTH; - chand->max_recv_size = DEFAULT_MAX_MESSAGE_LENGTH; - const grpc_integer_options options = {DEFAULT_MAX_MESSAGE_LENGTH, 0, INT_MAX}; + chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH; + chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH; for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { - chand->max_send_size = (size_t)grpc_channel_arg_get_integer( - &args->channel_args->args[i], options); + const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, + INT_MAX}; + chand->max_send_size = + grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - chand->max_recv_size = (size_t)grpc_channel_arg_get_integer( - &args->channel_args->args[i], options); + const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, + INT_MAX}; + chand->max_recv_size = + grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } } } diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 45ef75e04df50e2722d1b365dec2df570dfe839b..38fd1e096057cab609bf8cfb0ec05af6c2955a06 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -324,6 +324,64 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { return gpr_avl_get(err->strs, (void *)(uintptr_t)which); } +typedef struct { + grpc_error *error; + grpc_status_code code; + const char *msg; +} special_error_status_map; +static special_error_status_map error_status_map[] = { + {GRPC_ERROR_NONE, GRPC_STATUS_OK, ""}, + {GRPC_ERROR_CANCELLED, GRPC_STATUS_CANCELLED, "RPC cancelled"}, + {GRPC_ERROR_OOM, GRPC_STATUS_RESOURCE_EXHAUSTED, "Out of memory"}, +}; + +static grpc_error *recursively_find_error_with_status(grpc_error *error, + intptr_t *status) { + // If the error itself has a status code, return it. + if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) { + return error; + } + // Otherwise, search through its children. + intptr_t key = 0; + while (true) { + grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++); + if (child_error == NULL) break; + grpc_error *result = + recursively_find_error_with_status(child_error, status); + if (result != NULL) return result; + } + return NULL; +} + +void grpc_error_get_status(grpc_error *error, grpc_status_code *code, + const char **msg) { + // Handle special errors via the static map. + for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); ++i) { + if (error == error_status_map[i].error) { + *code = error_status_map[i].code; + *msg = error_status_map[i].msg; + return; + } + } + // Populate code. + // Start with the parent error and recurse through the tree of children + // until we find the first one that has a status code. + intptr_t status = GRPC_STATUS_UNKNOWN; // Default in case we don't find one. + grpc_error *found_error = recursively_find_error_with_status(error, &status); + *code = (grpc_status_code)status; + // Now populate msg. + // If we found an error with a status code above, use that; otherwise, + // fall back to using the parent error. + if (found_error == NULL) found_error = error; + // If the error has a status message, use it. Otherwise, fall back to + // the error description. + *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE); + if (*msg == NULL) { + *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION); + if (*msg == NULL) *msg = "uknown error"; // Just in case. + } +} + grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { GPR_TIMER_BEGIN("grpc_error_add_child", 0); grpc_error *new = copy_error_and_unref(src); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 2ab3ef9f4025daa26e85e6267eddf8ca04bcf303..2d715500d1c6c30641cdbeb0d9e0dcd6d54eccc5 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -37,6 +37,7 @@ #include <stdbool.h> #include <stdint.h> +#include <grpc/status.h> #include <grpc/support/time.h> /// Opaque representation of an error. @@ -179,6 +180,13 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, /// Returns NULL if the specified string is not set. /// Caller does NOT own return value. const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which); + +/// A utility function to get the status code and message to be returned +/// to the application. If not set in the top-level message, looks +/// through child errors until it finds the first one with these attributes. +void grpc_error_get_status(grpc_error *error, grpc_status_code *code, + const char **msg); + /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level /// errors that contributed to them. diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 80c7a3f12873d195158d7238172f316ebfa0cbc5..3496b6094f256a4fab4b8db1f50c17b3143073b3 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -146,61 +146,57 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { grpc_timer_cancel(exec_ctx, &ac->alarm); gpr_mu_lock(&ac->mu); - if (error == GRPC_ERROR_NONE) { - do { - so_error_size = sizeof(so_error); - err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error, - &so_error_size); - } while (err < 0 && errno == EINTR); - if (err < 0) { - error = GRPC_OS_ERROR(errno, "getsockopt"); - goto finish; - } else if (so_error != 0) { - if (so_error == ENOBUFS) { - /* We will get one of these errors if we have run out of - memory in the kernel for the data structures allocated - when you connect a socket. If this happens it is very - likely that if we wait a little bit then try again the - connection will work (since other programs or this - program will close their network connections and free up - memory). This does _not_ indicate that there is anything - wrong with the server we are connecting to, this is a - local problem. - - If you are looking at this code, then chances are that - your program or another program on the same computer - opened too many network connections. The "easy" fix: - don't do that! */ - gpr_log(GPR_ERROR, "kernel out of buffers"); - gpr_mu_unlock(&ac->mu); - grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure); - return; - } else { - switch (so_error) { - case ECONNREFUSED: - error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno); - error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - "Connection refused"); - break; - default: - error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)"); - break; - } - goto finish; - } - } else { - grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); - fd = NULL; - goto finish; - } - } else { + if (error != GRPC_ERROR_NONE) { error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred"); goto finish; } - GPR_UNREACHABLE_CODE(return ); + do { + so_error_size = sizeof(so_error); + err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error, + &so_error_size); + } while (err < 0 && errno == EINTR); + if (err < 0) { + error = GRPC_OS_ERROR(errno, "getsockopt"); + goto finish; + } + + switch (so_error) { + case 0: + grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); + *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + fd = NULL; + break; + case ENOBUFS: + /* We will get one of these errors if we have run out of + memory in the kernel for the data structures allocated + when you connect a socket. If this happens it is very + likely that if we wait a little bit then try again the + connection will work (since other programs or this + program will close their network connections and free up + memory). This does _not_ indicate that there is anything + wrong with the server we are connecting to, this is a + local problem. + + If you are looking at this code, then chances are that + your program or another program on the same computer + opened too many network connections. The "easy" fix: + don't do that! */ + gpr_log(GPR_ERROR, "kernel out of buffers"); + gpr_mu_unlock(&ac->mu); + grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure); + return; + case ECONNREFUSED: + /* This error shouldn't happen for anything other than connect(). */ + error = GRPC_OS_ERROR(so_error, "connect"); + break; + default: + /* We don't really know which syscall triggered the problem here, + so punt by reporting getsockopt(). */ + error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)"); + break; + } finish: if (fd != NULL) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6b2badf71bd2f7b9ce22bd9e2a0c7dd59ada248a..601b0c8580955b38bbad97744db6b482cf32d120 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -126,8 +126,6 @@ struct grpc_call { /* client or server call */ bool is_client; - /* is the alarm set */ - bool have_alarm; /** has grpc_call_destroy been called */ bool destroy_called; /** flag indicating that cancellation is inherited */ @@ -170,9 +168,6 @@ struct grpc_call { /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; - /* Deadline alarm - if have_alarm is non-zero */ - grpc_timer alarm; - /* for the client, extra metadata is initial metadata; for the server, it's trailing metadata */ grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; @@ -215,8 +210,6 @@ struct grpc_call { #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, - gpr_timespec deadline); static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op); static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, @@ -264,40 +257,9 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); } } - call->send_deadline = + gpr_timespec send_deadline = gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); - GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); - /* initial refcount dropped by grpc_call_destroy */ - grpc_error *error = grpc_call_stack_init( - &exec_ctx, channel_stack, 1, destroy_call, call, call->context, - args->server_transport_data, CALL_STACK_FROM_CALL(call)); - if (error != GRPC_ERROR_NONE) { - intptr_t status; - if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { - status = GRPC_STATUS_UNKNOWN; - } - const char *error_str = - grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION); - close_with_status(&exec_ctx, call, (grpc_status_code)status, - error_str == NULL ? "unknown error" : error_str); - } - if (args->cq != NULL) { - GPR_ASSERT( - args->pollset_set_alternative == NULL && - "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); - GRPC_CQ_INTERNAL_REF(args->cq, "bind"); - call->pollent = - grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)); - } - if (args->pollset_set_alternative != NULL) { - call->pollent = grpc_polling_entity_create_from_pollset_set( - args->pollset_set_alternative); - } - if (!grpc_polling_entity_is_empty(&call->pollent)) { - grpc_call_stack_set_pollset_or_pollset_set( - &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); - } - gpr_timespec send_deadline = args->send_deadline; + if (args->parent_call != NULL) { GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); GPR_ASSERT(call->is_client); @@ -339,10 +301,38 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, gpr_mu_unlock(&args->parent_call->mu); } - if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != - 0) { - set_deadline_alarm(&exec_ctx, call, send_deadline); + + call->send_deadline = send_deadline; + + GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); + /* initial refcount dropped by grpc_call_destroy */ + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + args->server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call)); + if (error != GRPC_ERROR_NONE) { + grpc_status_code status; + const char *error_str; + grpc_error_get_status(error, &status, &error_str); + close_with_status(&exec_ctx, call, status, error_str); + GRPC_ERROR_UNREF(error); } + if (args->cq != NULL) { + GPR_ASSERT( + args->pollset_set_alternative == NULL && + "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); + GRPC_CQ_INTERNAL_REF(args->cq, "bind"); + call->pollent = + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)); + } + if (args->pollset_set_alternative != NULL) { + call->pollent = grpc_polling_entity_create_from_pollset_set( + args->pollset_set_alternative); + } + if (!grpc_polling_entity_is_empty(&call->pollent)) { + grpc_call_stack_set_pollset_or_pollset_set( + &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); + } + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); return error; @@ -455,20 +445,11 @@ static void set_status_details(grpc_call *call, status_source source, static void set_status_from_error(grpc_call *call, status_source source, grpc_error *error) { - intptr_t status; - if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { - set_status_code(call, source, (uint32_t)status); - } else { - set_status_code(call, source, GRPC_STATUS_INTERNAL); - } - const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); - bool free_msg = false; - if (msg == NULL) { - free_msg = true; - msg = grpc_error_string(error); - } + grpc_status_code status; + const char *msg; + grpc_error_get_status(error, &status, &msg); + set_status_code(call, source, (uint32_t)status); set_status_details(call, source, grpc_mdstr_from_string(msg)); - if (free_msg) grpc_error_free_string(msg); } static void set_incoming_compression_algorithm( @@ -741,9 +722,6 @@ void grpc_call_destroy(grpc_call *c) { gpr_mu_lock(&c->mu); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - if (c->have_alarm) { - grpc_timer_cancel(&exec_ctx, &c->alarm); - } cancel = !c->received_final_op; gpr_mu_unlock(&c->mu); if (cancel) grpc_call_cancel(c, NULL); @@ -781,7 +759,6 @@ typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_error *error; - grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; grpc_transport_stream_op op; } termination_closure; @@ -798,7 +775,6 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, break; } GRPC_ERROR_UNREF(tc->error); - grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL); gpr_free(tc); } @@ -818,7 +794,6 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { tc->op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op_closure = tc->op.on_complete; tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } @@ -901,32 +876,6 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call *call = arg; - gpr_mu_lock(&call->mu); - call->have_alarm = 0; - if (error != GRPC_ERROR_CANCELLED) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED, - "Deadline Exceeded"); - } - gpr_mu_unlock(&call->mu); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm"); -} - -static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, - gpr_timespec deadline) { - if (call->have_alarm) { - gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); - assert(0); - return; - } - GRPC_CALL_INTERNAL_REF(call, "alarm"); - call->have_alarm = 1; - call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call, - gpr_now(GPR_CLOCK_MONOTONIC)); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -1280,9 +1229,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); + call->send_deadline = + gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC); } } @@ -1311,9 +1259,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, GRPC_ERROR_REF(error); gpr_mu_lock(&call->mu); + + // If the error has an associated status code, set the call's status. + intptr_t status; + if (error != GRPC_ERROR_NONE && + grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_from_error(call, STATUS_FROM_CORE, error); + } + if (bctl->send_initial_metadata) { if (error != GRPC_ERROR_NONE) { - set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE); + set_status_from_error(call, STATUS_FROM_CORE, error); } grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); @@ -1331,9 +1287,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_metadata_batch_filter(md, recv_trailing_filter, call); call->received_final_op = true; - if (call->have_alarm) { - grpc_timer_cancel(exec_ctx, &call->alarm); - } /* propagate cancellation to any interested children */ child_call = call->first_child; if (child_call != NULL) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index e9d840df777eb05485457c32dcbfe8ea735b5684..c1cafba5f2cb791cb1c8c1d8e465c72b6aad48b6 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -60,6 +60,8 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; +//#define GRPC_CQ_REF_COUNT_DEBUG + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 3cbbaa7b0cee424bc2652e672ae7b7e5cf37f118..8ca0643ba7ef0ced0270d9f02b6e67a9c002eb55 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -43,6 +43,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/channel/message_size_filter.h" @@ -99,6 +100,12 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder, } static void register_builtin_channel_init() { + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + prepend_filter, (void *)&grpc_client_deadline_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, + (void *)&grpc_server_deadline_filter); grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, (void *)&grpc_message_size_filter); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index b95121813093147191868d65c93aa1a153d0ed61..75aec7a5b44a426fa6a64f0bde93f52ac6a3e85c 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -226,7 +226,7 @@ void grpc_transport_stream_op_add_cancellation_with_message( error = GRPC_ERROR_CREATE("Call cancelled"); } error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); - add_error(op, &op->close_error, error); + add_error(op, &op->cancel_error, error); } void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, diff --git a/src/cpp/ext/reflection.grpc.pb.cc b/src/cpp/ext/reflection.grpc.pb.cc index b046dfc1b8bf4ca6794bcfb2fe46ce3cec86ee7d..8139c8ea16276f5db937c4e90ecb6130649f8618 100644 --- a/src/cpp/ext/reflection.grpc.pb.cc +++ b/src/cpp/ext/reflection.grpc.pb.cc @@ -32,7 +32,7 @@ */ -// Generated by the gRPC protobuf plugin. +// Generated by tools/codegen/extensions/gen_reflection_proto.sh // If you make any local change, they will be lost. // source: reflection.proto diff --git a/src/cpp/ext/reflection.pb.cc b/src/cpp/ext/reflection.pb.cc index a84494f9a98ed58860c21b80329845cbc25f57ec..b50465b9b59314c1e685ddcc21b272ea25c1b135 100644 --- a/src/cpp/ext/reflection.pb.cc +++ b/src/cpp/ext/reflection.pb.cc @@ -32,7 +32,7 @@ */ -// Generated by the protocol buffer compiler. DO NOT EDIT! +// Generated by tools/codegen/extensions/gen_reflection_proto.sh // source: reflection.proto #define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index dae28e15f2cb9bea0766915fc0da6ad6cf781825..ad20c94de9954bf932fb2be8135e1e2cc35610ca 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -136,6 +136,7 @@ cdef extern from "grpc/grpc.h": const char *GRPC_ARG_ENABLE_CENSUS const char *GRPC_ARG_MAX_CONCURRENT_STREAMS const char *GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH + const char *GRPC_ARG_MAX_SEND_MESSAGE_LENGTH const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER const char *GRPC_ARG_DEFAULT_AUTHORITY const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index f444e33cf078dd09f5e27fd704cf23b0c3dcb14a..5a11a08f541b29d0f0a31ab0b2e05fbe4555ce9a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -39,7 +39,8 @@ class ConnectivityState: class ChannelArgKey: enable_census = GRPC_ARG_ENABLE_CENSUS max_concurrent_streams = GRPC_ARG_MAX_CONCURRENT_STREAMS - max_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH + max_receive_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH + max_send_message_length = GRPC_ARG_MAX_SEND_MESSAGE_LENGTH http2_initial_sequence_number = GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER default_authority = GRPC_ARG_DEFAULT_AUTHORITY primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 6b635819325b09069300a9055ae749cdd0eaeafa..0c6a66e9c819f11e3a5be466e3f66e597129d919 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -83,6 +83,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channel_stack_builder.c', 'src/core/lib/channel/compress_filter.c', 'src/core/lib/channel/connected_channel.c', + 'src/core/lib/channel/deadline_filter.c', 'src/core/lib/channel/handshaker.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 569b3f7cd23babb473fcf8c2e4df9015f88b1820..b1c1ed9039e73303400c5626ce9568dd79c18319 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -135,9 +135,9 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_error *error = - grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, - NULL, NULL, call_stack); + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, free_call, call_stack, NULL, NULL, + gpr_inf_future(GPR_CLOCK_MONOTONIC), call_stack); GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(call_stack->count == 1); call_elem = grpc_call_stack_element(call_stack, 0); diff --git a/test/core/nanopb/fuzzer_response.c b/test/core/nanopb/fuzzer_response.c index 75a99faf3f34bff2dd786d224794f74c7379db40..a82f20df831f1af7993f19efb874b8298aaf5f97 100644 --- a/test/core/nanopb/fuzzer_response.c +++ b/test/core/nanopb/fuzzer_response.c @@ -41,7 +41,10 @@ bool squelch = true; bool leak_check = true; +static void dont_log(gpr_log_func_args *args) {} + int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { + if (squelch) gpr_set_log_function(dont_log); gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size); grpc_grpclb_initial_response *response; if ((response = grpc_grpclb_initial_response_parse(slice))) { diff --git a/test/core/nanopb/fuzzer_serverlist.c b/test/core/nanopb/fuzzer_serverlist.c index df2044d9077d18ac06aa1da26bc551638c8fa475..9700bf1cdafdc3872b6dcb1ac5a1371130d340b0 100644 --- a/test/core/nanopb/fuzzer_serverlist.c +++ b/test/core/nanopb/fuzzer_serverlist.c @@ -41,7 +41,10 @@ bool squelch = true; bool leak_check = true; +static void dont_log(gpr_log_func_args *args) {} + int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { + if (squelch) gpr_set_log_function(dont_log); gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size); grpc_grpclb_serverlist *serverlist; if ((serverlist = grpc_grpclb_response_parse_serverlist(slice))) { diff --git a/tools/codegen/extensions/gen_reflection_proto.sh b/tools/codegen/extensions/gen_reflection_proto.sh index bd8aac6a7b15b4689c06e2cc0d2100ab22a6b8f1..ea7689f7e88f5a77286c5b3552ea3dee9bdae463 100755 --- a/tools/codegen/extensions/gen_reflection_proto.sh +++ b/tools/codegen/extensions/gen_reflection_proto.sh @@ -29,20 +29,39 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +set -e +cd $(dirname $0)/../../.. + PROTO_DIR="src/proto/grpc/reflection/v1alpha" PROTO_FILE="reflection" HEADER_DIR="include/grpc++/ext" SRC_DIR="src/cpp/ext" INCLUDE_DIR="grpc++/ext" TMP_DIR="tmp" -GRPC_PLUGIN="bins/opt/grpc_cpp_plugin" -PROTOC="bins/opt/protobuf/protoc" -set -e +if hash grpc_cpp_plugin 2>/dev/null; then + GRPC_PLUGIN=$(which grpc_cpp_plugin) +else + if [ -f bins/opt/grpc_cpp_plugin ]; then + GRPC_PLUGIN="bins/opt/grpc_cpp_plugin" + else + echo "gRPC protoc plugin not found" + exit 1 + fi +fi -TMP_DIR=${TMP_DIR}_${PROTO_FILE} +if hash protoc 2>/dev/null; then + PROTOC=$(which protoc) +else + if [ -f bins/opt/protobuf/protoc ]; then + PROTOC="bins/opt/protobuf/protoc" + else + echo "protoc not found" + exit 1 + fi +fi -cd $(dirname $0)/../../.. +TMP_DIR=${TMP_DIR}_${PROTO_FILE} [ ! -d $HEADER_DIR ] && mkdir -p $HEADER_DIR || : [ ! -d $SRC_DIR ] && mkdir -p $SRC_DIR || : @@ -56,6 +75,9 @@ sed -i "s/\"${PROTO_FILE}.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.pb.h>/g" sed -i "s/\"${PROTO_FILE}.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.pb.h>/g" ${TMP_DIR}/${PROTO_FILE}.grpc.pb.cc sed -i "s/\"${PROTO_FILE}.grpc.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.grpc.pb.h>/g" ${TMP_DIR}/${PROTO_FILE}.grpc.pb.cc +sed -i "1s/.*/\/\/ Generated by tools\/codegen\/extensions\/gen_reflection_proto.sh/g" ${TMP_DIR}/*.pb.h +sed -i "1s/.*/\/\/ Generated by tools\/codegen\/extensions\/gen_reflection_proto.sh/g" ${TMP_DIR}/*.pb.cc + /bin/cp LICENSE ${TMP_DIR}/TMP_LICENSE sed -i -e "s/./ &/" -e "s/.*/ \*&/" ${TMP_DIR}/TMP_LICENSE sed -i -r "\$a\ *\n *\/\n\n" ${TMP_DIR}/TMP_LICENSE diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index b34989937b5d84a5722a762f1db0b76553870967..239dad51fd1ef632cfb383166c9ea4bb229e567e 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -876,6 +876,7 @@ src/core/lib/channel/channel_stack_builder.h \ src/core/lib/channel/compress_filter.h \ src/core/lib/channel/connected_channel.h \ src/core/lib/channel/context.h \ +src/core/lib/channel/deadline_filter.h \ src/core/lib/channel/handshaker.h \ src/core/lib/channel/http_client_filter.h \ src/core/lib/channel/http_server_filter.h \ @@ -989,6 +990,7 @@ src/core/lib/channel/channel_stack.c \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ +src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index d8e73c5860b1dddf310cb0aaa14257e5a430a7e6..2c63f36729cec538f16dc8bedb4887aeb3fbfe00 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -792,6 +792,7 @@ src/core/lib/channel/channel_stack_builder.h \ src/core/lib/channel/compress_filter.h \ src/core/lib/channel/connected_channel.h \ src/core/lib/channel/context.h \ +src/core/lib/channel/deadline_filter.h \ src/core/lib/channel/handshaker.h \ src/core/lib/channel/http_client_filter.h \ src/core/lib/channel/http_server_filter.h \ @@ -954,6 +955,7 @@ src/core/lib/channel/channel_stack.c \ src/core/lib/channel/channel_stack_builder.c \ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ +src/core/lib/channel/deadline_filter.c \ src/core/lib/channel/handshaker.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 94615a81307f56bf1be2735f6bb2649e5d488ade..761ba5ac682b00065e8063eb479216ade6fc94fb 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -5978,6 +5978,7 @@ "src/core/lib/channel/compress_filter.h", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.h", "src/core/lib/channel/http_server_filter.h", @@ -6075,6 +6076,8 @@ "src/core/lib/channel/connected_channel.c", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/deadline_filter.c", + "src/core/lib/channel/deadline_filter.h", "src/core/lib/channel/handshaker.c", "src/core/lib/channel/handshaker.h", "src/core/lib/channel/http_client_filter.c", diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index 0b1784f1f632755b5fb7e6c3d0aad001ff5c6c3c..7f9fdb234651cf99ec272865bc592d97692a0d22 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -376,6 +376,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" /> @@ -529,6 +530,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c"> diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index 31930fc62f19cca871a923a70d6ccf0d16165dcb..9b6cfb4000b64887fb7b752babc7ccf2fe703b64 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -115,6 +115,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + <Filter>src\core\lib\channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> @@ -725,6 +728,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h"> + <Filter>src\core\lib\channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index 6ccb7a79025452aa3d8b3a48352006f53947d76e..b0bbccabb3f0f37fabcce5f42e35e49060e1a7b3 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -372,6 +372,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" /> @@ -515,6 +516,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c"> diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index 54acbe2e472f9c16a398fafb87ffbf5c5a0de4f4..9a48ebf78ae479c98bcb48aebfb0cbb616ce1bd7 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -100,6 +100,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + <Filter>src\core\lib\channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> @@ -698,6 +701,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h"> + <Filter>src\core\lib\channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 1b5891d6c9842a4f8f843c8f989a382d5337803a..21561f0057360eb725f6140468b1ab3d3a71a7f8 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -301,6 +301,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" /> @@ -471,6 +472,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 1499618240260112467604b7b52fa3c6cf761512..ea61c438593bbe610d3f3ef90e80228d62d35edc 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -19,6 +19,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + <Filter>src\core\lib\channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> @@ -680,6 +683,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h"> + <Filter>src\core\lib\channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 0abce838025e12f4c74bf4394536dafdb21f071b..bb5a56271f4cb2793c8a11aa6d85404528bd4b48 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -193,6 +193,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" /> @@ -316,6 +317,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c"> diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 582fb1447ea82577fe7b25cf852618113759ede1..39188a409b647b78eb79e68562fcf9d078696585 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -70,6 +70,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + <Filter>src\core\lib\channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> @@ -461,6 +464,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h"> + <Filter>src\core\lib\channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 66824be53031b25f55cb4f5fc1ffc421a6fb4d71..efe285640a54a9a4e6f9caf5cfffa70e50eda41f 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -291,6 +291,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" /> @@ -439,6 +440,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index db77774d00786b1f8f6067bdf50b6d43533957c3..911cf112cf6762c0882ec53b0855c037a560aec8 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -22,6 +22,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c"> + <Filter>src\core\lib\channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> @@ -590,6 +593,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h"> <Filter>src\core\lib\channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h"> + <Filter>src\core\lib\channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h"> <Filter>src\core\lib\channel</Filter> </ClInclude>