diff --git a/BUILD b/BUILD index 083639c856bf7dade8a465d801c7598467cf38a8..5c2e2f976a357ed74ada1bc99ae76b45e9635095 100644 --- a/BUILD +++ b/BUILD @@ -1543,6 +1543,7 @@ grpc_cc_library( ":grpc++", "//src/proto/grpc/reflection/v1alpha:reflection_proto", ], + alwayslink = 1, ) grpc_cc_library( diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index 8057f27a12305a4249d2331b4b735e39c6bb9ca5..f793cae56d1d901a55271b39c0bbd1a0e0231d4a 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -25,7 +25,8 @@ def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [], external_deps = [], deps = [], standalone = False, - language = "C++", testonly = False, visibility = None): + language = "C++", testonly = False, visibility = None, + alwayslink = 0): copts = [] if language.upper() == "C": copts = ["-std=c99"] @@ -40,7 +41,8 @@ def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [], linkopts = ["-pthread"], includes = [ "include" - ] + ], + alwayslink = alwayslink, ) def grpc_proto_plugin(name, srcs = [], deps = []): diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc index 6d34761fdfd6ed6e0c57de6b0a026afef3d1504b..38ec46e656ef7b37e7bf01433fca5823694b01c6 100644 --- a/src/compiler/php_generator.cc +++ b/src/compiler/php_generator.cc @@ -97,13 +97,14 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) { } // Prints out the service descriptor object -void PrintService(const ServiceDescriptor *service, Printer *out) { +void PrintService(const ServiceDescriptor *service, + const grpc::string &parameter, Printer *out) { map<grpc::string, grpc::string> vars; out->Print("/**\n"); out->Print(GetPHPComments(service, " *").c_str()); out->Print(" */\n"); - vars["name"] = service->name(); - out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n"); + vars["name"] = GetPHPServiceClassname(service, parameter); + out->Print(vars, "class $name$ extends \\Grpc\\BaseStub {\n\n"); out->Indent(); out->Indent(); out->Print( @@ -131,7 +132,8 @@ void PrintService(const ServiceDescriptor *service, Printer *out) { } grpc::string GenerateFile(const FileDescriptor *file, - const ServiceDescriptor *service) { + const ServiceDescriptor *service, + const grpc::string &parameter) { grpc::string output; { StringOutputStream output_stream(&output); @@ -150,7 +152,7 @@ grpc::string GenerateFile(const FileDescriptor *file, vars["package"] = MessageIdentifierName(file->package()); out.Print(vars, "namespace $package$;\n\n"); - PrintService(service, &out); + PrintService(service, parameter, &out); } return output; } diff --git a/src/compiler/php_generator.h b/src/compiler/php_generator.h index 4518bc24f9132b6495de5db3727aea6a1dc7d01b..9a04bd33d7027f9c46521bc200dc1089e5705d90 100644 --- a/src/compiler/php_generator.h +++ b/src/compiler/php_generator.h @@ -24,7 +24,8 @@ namespace grpc_php_generator { grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file, - const grpc::protobuf::ServiceDescriptor *service); + const grpc::protobuf::ServiceDescriptor *service, + const grpc::string &parameter); } // namespace grpc_php_generator diff --git a/src/compiler/php_generator_helpers.h b/src/compiler/php_generator_helpers.h index 3a5c08b3e69946854afbc0f2dd30d7a95a85a7ee..5edebf6290adac9c939b8dda586ea1ddc87de13d 100644 --- a/src/compiler/php_generator_helpers.h +++ b/src/compiler/php_generator_helpers.h @@ -26,9 +26,22 @@ namespace grpc_php_generator { +inline grpc::string GetPHPServiceClassname( + const grpc::protobuf::ServiceDescriptor
*service, + const grpc::string &parameter) { + grpc::string suffix; + if (parameter == "") { + suffix = "Client"; + } else { + suffix = parameter; + } + return service->name() + suffix; +} + inline grpc::string GetPHPServiceFilename( const grpc::protobuf::FileDescriptor *file, - const grpc::protobuf::ServiceDescriptor *service) { + const grpc::protobuf::ServiceDescriptor *service, + const grpc::string &parameter) { std::vector<grpc::string> tokens = grpc_generator::tokenize(file->package(), "."); std::ostringstream oss; @@ -36,7 +49,7 @@ inline grpc::string GetPHPServiceFilename( oss << (i == 0 ? "" : "/") << grpc_generator::CapitalizeFirstLetter(tokens[i]); } - return oss.str() + "/" + service->name() + "Client.php"; + return oss.str() + "/" + GetPHPServiceClassname(service, parameter) + ".php"; } // ReplaceAll replaces all instances of search with replace in s. diff --git a/src/compiler/php_plugin.cc b/src/compiler/php_plugin.cc index 7a581fd7bcb74d6f1b98fcab2777eea57d173579..bbe91656d5e4e4bb7031eb1cf5e27bffaedeb35b 100644 --- a/src/compiler/php_plugin.cc +++ b/src/compiler/php_plugin.cc @@ -41,10 +41,11 @@ class PHPGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { } for (int i = 0; i < file->service_count(); i++) { - grpc::string code = GenerateFile(file, file->service(i)); + grpc::string code = GenerateFile(file, file->service(i), parameter); // Get output file name - grpc::string file_name = GetPHPServiceFilename(file, file->service(i)); + grpc::string file_name = + GetPHPServiceFilename(file, file->service(i), parameter); std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output( context->Open(file_name)); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index cccc3e871a657609fbbee49fba13ee1cb197d3b7..fdb18f687f34156c7537118ae6316234ba49812d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -1705,7 +1705,6 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { glb_lb_policy *glb_policy = (glb_lb_policy *)policy; - if (glb_policy->updating_lb_channel) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, @@ -1813,9 +1812,11 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, // lb_on_server_status_received will pick up the cancel and reinit // lb_call.
if (glb_policy->pending_update_args != NULL) { - const grpc_lb_policy_args *args = glb_policy->pending_update_args; + grpc_lb_policy_args *args = glb_policy->pending_update_args; glb_policy->pending_update_args = NULL; glb_update_locked(exec_ctx, &glb_policy->base, args); + grpc_channel_args_destroy(exec_ctx, args->args); + gpr_free(args); } } else if (glb_policy->started_picking && !glb_policy->shutting_down) { if (glb_policy->retry_timer_active) { diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 4df64d81e2be7c47c672418609b7ce6492a3bbac..14498021ebdd0063ba15841474db961d947a9174 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -190,8 +190,11 @@ typedef struct inproc_stream { static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, size_t max, grpc_closure *on_complete) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - return (stream->le->sb.count != 0); + // Because inproc transport always provides the entire message atomically, + // the byte stream always has data available when this function is called. + // Thus, this function always returns true (unlike other transports) and + // there is never any need to schedule a closure + return true; } static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 99dc2f1c78a355ecbec8d7fda092d1669ae76bea..3f4145d104e13f359cf2f68de86a93e1af96781d 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme( return NULL; } +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + return addr->sa_family; +} + int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 7692b969f2b2d0cc62e6a7c132254a4e9c9a7fe6..a589a1970547d0480a36b78a00f7141853cf039a 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); /* Returns the URI scheme corresponding to \a addr */ const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34d29428097ddb72340390c4d82a5fc7..079c91357960d52c8cf007b2a46736224caeb8de 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -316,6 +316,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, unsigned port_index = 0; int status; grpc_error *error = GRPC_ERROR_NONE; + int family; if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -353,7 +354,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, } handle = gpr_malloc(sizeof(uv_tcp_t)); - status = uv_tcp_init(uv_default_loop(), handle); + + family = grpc_sockaddr_get_family(addr); + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && 
defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t *)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { diff --git a/src/core/lib/support/env.h b/src/core/lib/support/env.h index 18bc08ac62a9f7702ddea6c0e6550c89848cd07c..e2c012a728887f2e6450e8089aafd29d3f591826 100644 --- a/src/core/lib/support/env.h +++ b/src/core/lib/support/env.h @@ -36,6 +36,12 @@ char *gpr_getenv(const char *name); /* Sets the the environment with the specified name to the specified value. */ void gpr_setenv(const char *name, const char *value); +/* This is a version of gpr_getenv that does not produce any output if it has to + use an insecure version of the function. It is ONLY to be used to solve the + problem in which we need to check an env variable to configure the verbosity + level of logging. So DO NOT USE THIS. */ +const char *gpr_getenv_silent(const char *name, char **dst); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/support/env_linux.c b/src/core/lib/support/env_linux.c index 0c79a2c4014c9892f968ddec8d5ad450afd7bd40..4c45a977caa316ea5862acbc7027c2cdcbd27b41 100644 --- a/src/core/lib/support/env_linux.c +++ b/src/core/lib/support/env_linux.c @@ -38,7 +38,9 @@ #include "src/core/lib/support/string.h" -char *gpr_getenv(const char *name) { +const char *gpr_getenv_silent(const char *name, char **dst) { + const char *insecure_func_used = NULL; + char *result = NULL; #if defined(GPR_BACKWARDS_COMPATIBILITY_MODE) typedef char *(*getenv_type)(const char *); static getenv_type getenv_func = NULL; @@ -48,22 +50,28 @@ char *gpr_getenv(const char *name) { for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) { getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]); if (getenv_func != NULL && strstr(names[i], "secure") == NULL) { - gpr_log(GPR_DEBUG, - "Warning: insecure environment read function '%s' used", - names[i]); + insecure_func_used = names[i]; } } - char *result = getenv_func(name); - return result == NULL ? result : gpr_strdup(result); + result = getenv_func(name); #elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17) - char *result = secure_getenv(name); - return result == NULL ? result : gpr_strdup(result); + result = secure_getenv(name); #else - gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", - "getenv"); - char *result = getenv(name); - return result == NULL ? result : gpr_strdup(result); + result = getenv(name); + insecure_func_used = "getenv"; #endif + *dst = result == NULL ? 
result : gpr_strdup(result); + return insecure_func_used; +} + +char *gpr_getenv(const char *name) { + char *result = NULL; + const char *insecure_func_used = gpr_getenv_silent(name, &result); + if (insecure_func_used != NULL) { + gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", + insecure_func_used); + } + return result; } void gpr_setenv(const char *name, const char *value) { diff --git a/src/core/lib/support/env_posix.c b/src/core/lib/support/env_posix.c index bdbc4da95a58f26d76d71b402ee8c0e5c62ac80c..b88822ca025f07c8144efa19a42a338db9489e53 100644 --- a/src/core/lib/support/env_posix.c +++ b/src/core/lib/support/env_posix.c @@ -29,6 +29,11 @@ #include <grpc/support/string_util.h> #include "src/core/lib/support/string.h" +const char *gpr_getenv_silent(const char *name, char **dst) { + *dst = gpr_getenv(name); + return NULL; +} + char *gpr_getenv(const char *name) { char *result = getenv(name); return result == NULL ? result : gpr_strdup(result); diff --git a/src/core/lib/support/env_windows.c b/src/core/lib/support/env_windows.c index c1d557e2199f795a871a3b83387718aa11cb3f9e..652eeb61c6740dd95e4be2647446bf08d5174610 100644 --- a/src/core/lib/support/env_windows.c +++ b/src/core/lib/support/env_windows.c @@ -30,6 +30,11 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +const char *gpr_getenv_silent(const char *name, char **dst) { + *dst = gpr_getenv(name); + return NULL; +} + char *gpr_getenv(const char *name) { char *result = NULL; DWORD size; diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c index bcc336b8ae7a8e5a753369b12673da311c27aa70..fadb4d9a2cf61f92c66fdff6a63f801eb1362e28 100644 --- a/src/core/lib/support/log.c +++ b/src/core/lib/support/log.c @@ -64,7 +64,8 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) { } void gpr_log_verbosity_init() { - char *verbosity = gpr_getenv("GRPC_VERBOSITY"); + char *verbosity = NULL; + const char *insecure_getenv = gpr_getenv_silent("GRPC_VERBOSITY", &verbosity); gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR; if (verbosity != NULL) { @@ -81,6 +82,11 @@ void gpr_log_verbosity_init() { GPR_LOG_VERBOSITY_UNSET) { gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print); } + + if (insecure_getenv != NULL) { + gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", + insecure_getenv); + } } void gpr_set_log_function(gpr_log_func f) { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index c90f96c0b721832d140b4913db4482826a20f2ac..200e477822c40c8ff3dbd6a1723b756e948543c8 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -250,14 +250,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { has_sync_methods && num_frequently_polled_cqs > 0; if (has_sync_methods) { - // This is a Sync server - gpr_log(GPR_INFO, - "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " - "%d, CQ timeout (msec): %d", - sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, - sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec); - grpc_cq_polling_type polling_type = is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; @@ -272,6 +264,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec)); + if (has_sync_methods) { + // This is a Sync server + gpr_log(GPR_INFO, + "Synchronous server. 
Num CQs: %d, Min pollers: %d, Max Pollers: " + "%d, CQ timeout (msec): %d", + sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec); + } + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 67c984ab49bc2dd052672fa1b6acc996755ee303..87b29c26eaed6301e63fa33d6eb79946fa102521 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -40,13 +40,13 @@ end module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call - class ActiveCall + class ActiveCall # rubocop:disable Metrics/ClassLength include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader :deadline, :metadata_sent, :metadata_to_send + attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata + :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -100,6 +100,18 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started @send_initial_md_mutex = Mutex.new + + @output_stream_done = false + @input_stream_done = false + @call_finished = false + @call_finished_mu = Mutex.new + + @client_call_executed = false + @client_call_executed_mu = Mutex.new + + # set the peer now so that the accessor can still function + # after the server closes the call + @peer = call.peer end # Sends the initial metadata that has yet to be sent. @@ -142,11 +154,9 @@ module GRPC Operation.new(self) end - # finished waits until a client call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a status. - def finished + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + set_input_stream_done attach_status_results_and_complete_call(batch_result) end @@ -155,8 +165,6 @@ module GRPC @call.trailing_metadata = recv_status_batch_result.status.metadata end @call.status = recv_status_batch_result.status - @call.close - op_is_done # The RECV_STATUS in run_batch always succeeds # Check the status for a bad status or failed run batch @@ -193,9 +201,19 @@ module GRPC } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(ops) + set_output_stream_done + nil end + # Intended for use on server-side calls when a single request from + # the client is expected (i.e., unary and server-streaming RPC types). + def read_unary_request + req = remote_read + set_input_stream_done + req + end + def server_unary_response(req, trailing_metadata: {}, code: Core::StatusCodes::OK, details: 'OK') ops = {} @@ -211,6 +229,7 @@ module GRPC ops[RECV_CLOSE_ON_SERVER] = nil @call.run_batch(ops) + set_output_stream_done end # remote_read reads a response from the remote endpoint. @@ -241,6 +260,8 @@ module GRPC # each_remote_read passes each response to the given block or returns an # enumerator the responses if no block is given. + # Used to generate the request enumerable for + # server-side client-streaming RPC's. # # == Enumerator == # @@ -258,10 +279,14 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read return enum_for(:each_remote_read) unless block_given? 
- loop do - resp = remote_read - break if resp.nil? # the last response was received - yield resp + begin + loop do + resp = remote_read + break if resp.nil? # the last response was received + yield resp + end + ensure + set_input_stream_done end end @@ -287,13 +312,17 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - if resp.nil? # the last response was received, but not finished yet - finished - break + begin + loop do + resp = remote_read + if resp.nil? # the last response was received + receive_and_check_status + break + end + yield resp end - yield resp + ensure + set_input_stream_done end end @@ -305,6 +334,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil, @@ -319,7 +349,15 @@ module GRPC end @metadata_sent = true end - batch_result = @call.run_batch(ops) + + begin + batch_result = @call.run_batch(ops) + # no need to check for cancellation after a CallError because this + # batch contains a RECV_STATUS op + ensure + set_input_stream_done + set_output_stream_done + end @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) @@ -339,10 +377,20 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - # Metadata might have already been sent if this is an operation view - merge_metadata_and_send_if_not_already_sent(metadata) + raise_error_if_already_executed + begin + merge_metadata_and_send_if_not_already_sent(metadata) + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + rescue GRPC::Core::CallError => e + receive_and_check_status # check for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end - requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } batch_result = @call.run_batch( SEND_CLOSE_FROM_CLIENT => nil, RECV_INITIAL_METADATA => nil, @@ -350,12 +398,11 @@ module GRPC RECV_STATUS_ON_CLIENT => nil ) + set_input_stream_done + @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) get_message_from_batch_result(batch_result) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -373,6 +420,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil @@ -384,13 +432,22 @@ module GRPC end @metadata_sent = true end - @call.run_batch(ops) + + begin + @call.run_batch(ops) + rescue GRPC::Core::CallError => e + receive_and_check_status # checks for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end + replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? 
replies.each { |r| yield r } - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -421,6 +478,7 @@ # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) + raise_error_if_already_executed # Metadata might have already been sent if this is an operation view merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @@ -428,7 +486,10 @@ @unmarshal, metadata_received: @metadata_received) - bd.run_on_client(requests, @op_notifier, &blk) + bd.run_on_client(requests, + proc { set_input_stream_done }, + proc { set_output_stream_done }, + &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -449,7 +510,7 @@ metadata_received: @metadata_received, req_view: MultiReqView.new(self)) - bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) end # Waits till an operation completes @@ -459,7 +520,8 @@ @op_notifier.wait end - # Signals that an operation is done + # Signals that an operation is done. + # Only relevant on the client-side (this is a no-op on the server-side) def op_is_done return if @op_notifier.nil? @op_notifier.notify(self) @@ -484,8 +546,40 @@ end end + def attach_peer_cert(peer_cert) + @peer_cert = peer_cert + end + private + # To be called once the "input stream" has been completely + # read through (i.e., done reading from client or received status) + # note this is idempotent + def set_input_stream_done + @call_finished_mu.synchronize do + @input_stream_done = true + maybe_finish_and_close_call_locked + end + end + + # To be called once the "output stream" has been completely + # sent through (i.e., done sending from client or sent status) + # note this is idempotent + def set_output_stream_done + @call_finished_mu.synchronize do + @output_stream_done = true + maybe_finish_and_close_call_locked + end + end + + def maybe_finish_and_close_call_locked + return unless @output_stream_done && @input_stream_done + return if @call_finished + @call_finished = true + op_is_done + @call.close + end + # Starts the call if not already started # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent @@ -493,6 +587,15 @@ merge_metadata_to_send(metadata) && send_initial_metadata end + def raise_error_if_already_executed + @client_call_executed_mu.synchronize do + if @client_call_executed + fail GRPC::Core::CallError, 'attempting to re-run a call' + end + @client_call_executed = true + end + end + def self.view_class(*visible_methods) Class.new do extend ::Forwardable @@ -518,6 +621,7 @@ # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_remote_read, :metadata, :output_metadata, + :peer, :peer_cert, :send_initial_metadata, :metadata_to_send, :merge_metadata_to_send, diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index e54cf78969b4cd12487335438f3edeb2720bb4aa..9e125cd986b6119479d0e9a2d015736142b70045 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -62,12 +62,19 @@ module GRPC # block that can be invoked with each response.
# # @param requests the Enumerable of requests to send - # @param op_notifier a Notifier used to signal completion + # @param set_input_stream_done [Proc] called back when we're done + # reading the input stream + # @param set_output_stream_done [Proc] called back when we're done + # sending data on the output stream # @return an Enumerator of requests to yield - def run_on_client(requests, op_notifier, &blk) - @op_notifier = op_notifier - @enq_th = Thread.new { write_loop(requests) } - read_loop(&blk) + def run_on_client(requests, + set_input_stream_done, + set_output_stream_done, + &blk) + @enq_th = Thread.new do + write_loop(requests, set_output_stream_done: set_output_stream_done) + end + read_loop(set_input_stream_done, &blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -81,12 +88,17 @@ module GRPC # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. - def run_on_server(gen_each_reply) + # @param set_input_stream_done [Proc] called back when + # the reads have been completely read through. + def run_on_server(gen_each_reply, set_input_stream_done) # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call(read_loop(is_client: false)) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false)) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call(read_loop(is_client: false), @req_view) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false), + @req_view) else fail 'Illegal arity of reply generator' end @@ -99,22 +111,6 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes - # signals that bidi operation is complete - def notify_done - return unless @op_notifier - GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") - @op_notifier.notify(self) - end - - # signals that a bidi operation is complete (read + write) - def finished - @done_mutex.synchronize do - return unless @reads_complete && @writes_complete && !@complete - @call.close - @complete = true - end - end - # performs a read using @call.run_batch, ensures metadata is set up def read_using_run_batch ops = { RECV_MESSAGE => nil } @@ -127,7 +123,8 @@ module GRPC batch_result end - def write_loop(requests, is_client: true) + # set_output_stream_done is relevant on client-side + def write_loop(requests, is_client: true, set_output_stream_done: nil) GRPC.logger.debug('bidi-write-loop: starting') count = 0 requests.each do |req| @@ -151,23 +148,20 @@ module GRPC GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') - notify_done - @writes_complete = true - finished end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - notify_done - @writes_complete = true - finished raise e + ensure + set_output_stream_done.call if is_client end # Provides an enumerator that yields results of remote reads - def read_loop(is_client: true) + def read_loop(set_input_stream_done, is_client: true) return enum_for(:read_loop, + set_input_stream_done, is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting') begin @@ -201,10 +195,10 @@ module GRPC GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) raise e + ensure + set_input_stream_done.call end GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished # Make sure that the write loop is done done before finishing the call. # Note that blocking is ok at this point because we've already received # a status diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index ce0097573a662cd47d4bb1207a1eae3dc3f9ef50..89cf8ff6a0a402b8257abdc3aa2c524be230da70 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -48,7 +48,7 @@ module GRPC end def handle_request_response(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request resp = mth.call(req, active_call.single_req_view) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata) @@ -61,7 +61,7 @@ module GRPC end def handle_server_streamer(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request replys = mth.call(req, active_call.single_req_view) replys.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2cc0ce9180402c7911e0200faef82b8febca3b..33b3cea1fc3b07000162425952fcbdd66300e4bb 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,6 +418,7 @@ module GRPC metadata_received: true, started: false, metadata_to_send: connect_md) + c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..79c9192aa5fe9763e4264f18b9851ea58b277f65 --- /dev/null +++ b/src/ruby/spec/client_auth_spec.rb @@ -0,0 +1,137 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +require 'grpc' + +def create_channel_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + files = ['ca.pem', 'client.key', 'client.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2]) +end + +def client_cert + test_root = File.join(File.dirname(__FILE__), 'testdata') + cert = File.open(File.join(test_root, 'client.pem')).read + fail unless cert.is_a?(String) + cert +end + +def create_server_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + p "test root: #{test_root}" + files = ['ca.pem', 'server1.key', 'server1.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ServerCredentials.new( + creds[0], + [{ private_key: creds[1], cert_chain: creds[2] }], + true) # force client auth +end + +# A test message +class EchoMsg + def self.marshal(_o) + '' + end + + def self.unmarshal(_o) + EchoMsg.new + end +end + +# a test service that checks the cert of its peer +class SslTestService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + + def check_peer_cert(call) + error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}" + fail(error_msg) unless call.peer_cert == client_cert + end + + def an_rpc(req, call) + check_peer_cert(call) + req + end + + def a_client_streaming_rpc(call) + check_peer_cert(call) + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + check_peer_cert(call) + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + check_peer_cert(call) + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +SslTestServiceStub = SslTestService.rpc_stub_class + +describe 'client-server auth' do + RpcServer = GRPC::RpcServer + + before(:all) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + port = @srv.add_http2_port('0.0.0.0:0', create_server_creds) + @srv.handle(SslTestService) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + + client_opts = { + channel_args: { + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' + } + } + @stub = SslTestServiceStub.new("localhost:#{port}", + create_channel_creds, + **client_opts) + end + + after(:all) do + expect(@srv.stopped?).to be(false) + @srv.stop + @srv_thd.join + end + + it 'client-server auth with unary RPCs' do + @stub.an_rpc(EchoMsg.new) + end + + it 'client-server auth with client streaming RPCs' do + @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new]) + end + + it 'client-server auth with server streaming RPCs' do + responses = @stub.a_server_streaming_rpc(EchoMsg.new) + responses.each { |r| p r } + end + + it 'client-server auth with bidi RPCs' do + responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new]) + responses.each { |r| p r } + end +end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 72e55ebcce0090775d385bae9511433ed46c059b..ec0c29417415868591cd7fe3e624fe6b6064ec22 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -473,7 +473,7 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') server_call.send_status(OK, 'status code is OK') - expect { client_call.finished }.to_not raise_error + expect 
{ client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if the server sends an early status response' do @@ -490,7 +490,7 @@ describe GRPC::ActiveCall do expect do call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 09b88c7cef4c0a2ca74a2aff255efe0595c424e4..a8653e73cfd618ef4a60c052c7fe887ffe76c888 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts include GRPC::Core::CallOps +# check that methods on a finished/closed call don't crash +def check_op_view_of_finished_client_call(op_view, + expected_metadata, + expected_trailing_metadata) + # use read_response_stream to try to iterate through + # possible response stream + fail('need something to attempt reads') unless block_given? + expect do + resp = op_view.execute + yield resp + end.to raise_error(GRPC::Core::CallError) + + expect { op_view.start_call }.to raise_error(RuntimeError) + + sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + + expect do + op_view.wait + op_view.cancel + op_view.write_flag = 1 + end.to_not raise_error +end + +def sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expected_status = Struct::Status.new + expected_status.code = 0 + expected_status.details = 'OK' + expected_status.metadata = expected_trailing_metadata + + expect(op_view.status).to eq(expected_status) + expect(op_view.metadata).to eq(expected_metadata) + expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) + + expect(op_view.cancelled?).to be(false) + expect(op_view.write_flag).to be(nil) + + # The deadline attribute of a call can be either + # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. + # TODO: fix so that the accessor always returns the same type.
+ expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || + op_view.deadline.is_a?(Time)).to be(true) +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -45,6 +92,7 @@ describe 'ClientStub' do @method = 'an_rpc_method' @pass = OK @fail = INTERNAL + @metadata = { k1: 'v1', k2: 'v2' } end after(:each) do @@ -107,7 +155,7 @@ describe 'ClientStub' do end end - describe '#request_response' do + describe '#request_response', request_response: true do before(:each) do @sent_msg, @resp = 'a_msg', 'a_reply' end @@ -126,7 +174,7 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect(get_response(stub)).to eq(@resp) th.join @@ -187,13 +235,24 @@ describe 'ClientStub' do # Kill the server thread so tests can complete th.kill end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_response(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end end describe 'without a call operation' do def get_response(stub, credentials: nil) puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }, + metadata: @metadata, credentials: credentials) end @@ -201,40 +260,62 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false, credentials: nil) - op = stub.request_response(@method, @sent_msg, noop, noop, - return_op: true, - metadata: { k1: 'v1', k2: 'v2' }, - deadline: from_relative_time(2), - credentials: credentials) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.request_response(@method, @sent_msg, noop, noop, + return_op: true, + metadata: @metadata, + deadline: from_relative_time(2), + credentials: credentials) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end it_behaves_like 'request response' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_request_response( + @sent_msg, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + expect( + get_response(stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + 
run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end end end - describe '#client_streamer' do + describe '#client_streamer', client_streamer: true do before(:each) do Thread.abort_on_exception = true server_port = create_test_server host = "localhost:#{server_port}" @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - @metadata = { k1: 'v1', k2: 'v2' } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @resp = 'a_reply' end @@ -247,7 +328,8 @@ describe 'ClientStub' do end it 'should send metadata to the server ok' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + th = run_client_streamer(@sent_msgs, @resp, @pass, + expected_metadata: @metadata) expect(get_response(@stub)).to eq(@resp) th.join end @@ -278,27 +360,50 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false) - op = stub.client_streamer(@method, @sent_msgs, noop, noop, - return_op: true, metadata: @metadata) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.client_streamer(@method, @sent_msgs, noop, noop, + return_op: true, metadata: @metadata) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end it_behaves_like 'client streaming' - it 'sends metadata to the server ok when running start_call first' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) - expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) + def run_op_view_metadata_test(run_start_call_first) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_client_streamer( + @sent_msgs, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) + expect( + get_response(@stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end end end - describe '#server_streamer' do + describe '#server_streamer', server_streamer: true do before(:each) do @sent_msg = 'a_msg' @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -328,18 +433,42 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_responses(stub) + end.to raise_error(ArgumentError, 
+ /Header values must be of type string or array/) + end + + it 'the call terminates when there is an unmarshalling error' do + server_port = create_test_server + host = "localhost:#{server_port}" + th = run_server_streamer(@sent_msg, @replys, @pass) + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + + unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') } + expect do + get_responses(stub, unmarshal: unmarshal).collect { |r| r } + end.to raise_error(ArgumentError, 'test unmarshalling error') + th.join + end end describe 'without a call operation' do - def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }) + def get_responses(stub, unmarshal: noop) + e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -351,10 +480,10 @@ describe 'ClientStub' do after(:each) do @op.wait # make sure wait doesn't hang end - def get_responses(stub, run_start_call_first: false) - @op = stub.server_streamer(@method, @sent_msg, noop, noop, + def get_responses(stub, run_start_call_first: false, unmarshal: noop) + @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal, return_op: true, - metadata: { k1: 'v1', k2: 'v2' }) + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -364,20 +493,41 @@ describe 'ClientStub' do it_behaves_like 'server streaming' - it 'should send metadata to the server ok when start_call is run first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_server_streamer( + @sent_msg, @replys, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) - expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + e = get_responses(stub, run_start_call_first: run_start_call_first) + expect(e.collect { |r| r }).to eq(@replys) th.join end + + it 'should send metadata to the server ok when start_call is run first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end end end - describe '#bidi_streamer' do + describe '#bidi_streamer', bidi: true do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -386,7 +536,7 @@ describe 'ClientStub' do end shared_examples 'bidi streaming' do - it 'supports sending all the requests first', bidi: true do + it 'supports sending all the requests first' do th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) @@ -395,7 +545,7 @@ describe 'ClientStub' do th.join end - it 'supports client-initiated ping pong', bidi: true do + it 'supports 
client-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) @@ -403,18 +553,39 @@ describe 'ClientStub' do th.join end - it 'supports a server-initiated ping pong', bidi: true do + it 'supports a server-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'should raise an error if the status is not ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + th.join + end + + # TODO: add test for metadata-related ArgumentError in a bidi call once + # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed + + it 'should send metadata to the server ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, + expected_metadata: @metadata) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end end describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -428,7 +599,8 @@ describe 'ClientStub' do end def get_responses(stub, run_start_call_first: false) @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, - return_op: true) + return_op: true, + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -438,27 +610,53 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' - it 'can run start_call before executing the call' do - th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, - @pass) + def run_op_view_metadata_test(run_start_call_first) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_bidi_streamer_echo_ping_pong( + @sent_msgs, @pass, true, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) - expect(e.collect { |r| r }).to eq(@replys) + e = get_responses(stub, run_start_call_first: run_start_call_first) + expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'can run start_call before executing the call' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end + + it 'doesnt crash when op_view used after call has finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end end end - def run_server_streamer(expected_input, replys, status, **kw) - wanted_metadata = kw.clone + def run_server_streamer(expected_input, replys, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = 
expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -472,9 +670,17 @@ describe 'ClientStub' do end end - def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) + def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) @@ -484,33 +690,44 @@ describe 'ClientStub' do expect(c.remote_read).to eq(i) end end - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_client_streamer(expected_inputs, resp, status, **kw) - wanted_metadata = kw.clone + def run_client_streamer(expected_inputs, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_request_response(expected_input, resp, status, **kw) - wanted_metadata = kw.clone + def run_request_response(expected_input, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 
'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -528,13 +745,13 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end - def expect_server_to_be_invoked(notifier) + def expect_server_to_be_invoked(notifier, metadata_to_send: nil) @server.start notifier.notify(nil) recvd_rpc = @server.request_call recvd_call = recvd_rpc.call recvd_call.metadata = recvd_rpc.metadata - recvd_call.run_batch(SEND_INITIAL_METADATA => nil) + recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true) end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 100e9e848766329ac9220fa2b5fe1ec61c1b994d..be578c40d3f529805e5c4a0c8116fbcfa5e9aebc 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -38,14 +38,14 @@ describe GRPC::RpcDesc do shared_examples 'it handles errors' do it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, metadata: {}) this_desc.run_server_method(@call, method(:bad_status)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg, false, metadata: {}) @@ -53,7 +53,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) + expect(@call).to receive(:read_unary_request).once.and_raise(CallError) blk = proc do this_desc.run_server_method(@call, method(:fake_reqresp)) end @@ -75,7 +75,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:output_metadata).once.and_return(fake_md) expect(@call).to receive(:server_unary_response).once .with(@ok_response, trailing_metadata: fake_md) @@ -133,7 +133,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:send_status).once.with(OK, 'OK', true, diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 9633a828a2a949ab015de2874cd1946dc5fe376a..e0646f4599773354c36f8247e0f13b707c596ea4 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,47 @@ end SlowStub = SlowService.rpc_stub_class +# a test service that hangs onto call objects +# and uses them after the server-side call has been +# finished +class CheckCallAfterFinishedService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), 
stream(EchoMsg) + attr_reader :server_side_call + + def an_rpc(req, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + req + end + + def a_client_streaming_rpc(call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + # iterate through requests so call can complete + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class + describe GRPC::RpcServer do RpcServer = GRPC::RpcServer StatusCodes = GRPC::Core::StatusCodes @@ -505,5 +546,109 @@ describe GRPC::RpcServer do t.join end end + + context 'when call objects are used after calls have completed' do + before(:each) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @alt_host = "0.0.0.0:#{alt_port}" + + @service = CheckCallAfterFinishedService.new + @srv.handle(@service) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + end + + # check that the server-side call is still in a usable state even + # after it has finished + def check_single_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect(call.peer).to be_a(String) + expect(call.peer_cert).to be(nil) + end + + def check_multi_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect do + call.each_remote_read.each { |r| p r } + end.to raise_error(GRPC::Core::CallError) + end + + def common_check_of_finished_server_call(call) + expect do + call.merge_metadata_to_send({}) + end.to raise_error(RuntimeError) + + expect do + call.send_initial_metadata + end.to_not raise_error + + expect(call.cancelled?).to be(false) + expect(call.metadata).to be_a(Hash) + expect(call.metadata['user-agent']).to be_a(String) + + expect(call.metadata_sent).to be(true) + expect(call.output_metadata).to eq({}) + expect(call.metadata_to_send).to eq({}) + expect(call.deadline.is_a?(Time)).to be(true) + end + + it 'should not crash when call used after an unary call is finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.an_rpc(req) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after client streaming finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.a_client_streaming_rpc(requests) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after server streaming finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_server_streaming_rpc(req) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should 
not crash when call used after a bidi call is finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_bidi_rpc(requests) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + end end end diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key new file mode 100644 index 0000000000000000000000000000000000000000..f48d0735d99ad58540ffa46eb003652916619f1c --- /dev/null +++ b/src/ruby/spec/testdata/client.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM +s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM +JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT +NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS +k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH +0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS +W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI +w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5 +0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5 +/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/ +U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP +1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd +9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI +JiqOszq9GWESErAatg== +-----END PRIVATE KEY----- diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem new file mode 100644 index 0000000000000000000000000000000000000000..e332091019bdd97385dcabf17581ae9eb18feed5 --- /dev/null +++ b/src/ruby/spec/testdata/client.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV +BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0 +ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw +MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB +nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX +b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz +W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw +R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/ +T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk +tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C +OO+svdkmqH0KZo320ZUqdl2ooQ== +-----END CERTIFICATE----- diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh index bc29014ab5fe12328fc95098b24f35e085067a64..3997a130878ac2cf06123eaba5916cd4f5525125 100755 --- a/tools/internal_ci/linux/grpc_build_artifacts.sh +++ b/tools/internal_ci/linux/grpc_build_artifacts.sh @@ -20,9 +20,10 @@ cd $(dirname $0)/../../.. source tools/internal_ci/helper_scripts/prepare_build_linux_rc -# TODO(jtattermusch): install ruby on the internal_ci worker -gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 -# TODO(jtattermusch): grep works around https://github.com/rvm/rvm/issues/4068 -curl -sSL https://get.rvm.io | grep -v __rvm_print_headline | bash -s stable --ruby +set +ex +[[ -s /etc/profile.d/rvm.sh ]] && . 
/etc/profile.d/rvm.sh +set -e # rvm commands are very verbose +rvm --default use ruby-2.4.1 +set -ex tools/run_tests/task_runner.py -f artifact linux diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py index 809817a1a8c4e7f0d1887cefa75f0cba8912f929..1ac951f3d861e2f58988f9b7d285a2ff32d4eaf1 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py @@ -144,7 +144,7 @@ def _read_json(filename, badjson_files, nonexistant_files): def fmt_dict(d): return ''.join([" " + k + ": " + str(d[k]) + "\n" for k in d]) -def diff(bms, loops, track, old, new, counters): +def diff(bms, loops, regex, track, old, new, counters): benchmarks = collections.defaultdict(Benchmark) badjson_files = {} @@ -153,7 +153,8 @@ def diff(bms, loops, track, old, new, counters): for loop in range(0, loops): for line in subprocess.check_output( ['bm_diff_%s/opt/%s' % (old, bm), - '--benchmark_list_tests']).splitlines(): + '--benchmark_list_tests', + '--benchmark_filter=%s' % regex]).splitlines(): stripped_line = line.strip().replace("/", "_").replace( "<", "_").replace(">", "_").replace(", ", "_") js_new_opt = _read_json('%s.%s.opt.%s.%d.json' % diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_main.py b/tools/profiling/microbenchmarks/bm_diff/bm_main.py index 8b4e0cb69a4519ba06174d073bc32a81c52b84b8..5aa11ac391ea79185f4d7c3e5184cf63144c5e47 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_main.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_main.py @@ -63,10 +63,10 @@ def _args(): help='Name of baseline run to compare to. Ususally just called "old"') argp.add_argument( '-r', - '--repetitions', - type=int, - default=1, - help='Number of repetitions to pass to the benchmarks') + '--regex', + type=str, + default="", + help='Regex to filter benchmarks run') argp.add_argument( '-l', '--loops', @@ -125,10 +125,10 @@ def main(args): subprocess.check_call(['git', 'checkout', where_am_i]) subprocess.check_call(['git', 'submodule', 'update']) - bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) - bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) + bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.regex, args.counters) + bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.regex, args.counters) - diff, note = bm_diff.diff(args.benchmarks, args.loops, args.track, old, + diff, note = bm_diff.diff(args.benchmarks, args.loops, args.regex, args.track, old, 'new', args.counters) if diff: text = '[%s] Performance differences noted:\n%s' % (args.pr_comment_name, diff) diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_run.py b/tools/profiling/microbenchmarks/bm_diff/bm_run.py index 72b3d3cf10627a5ef7f873ab3e5f7395093346c0..206f7c5845fb5c73cd7d908a98d92b2705e2a142 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_run.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_run.py @@ -56,10 +56,10 @@ def _args(): ) argp.add_argument( '-r', - '--repetitions', - type=int, - default=1, - help='Number of repetitions to pass to the benchmarks') + '--regex', + type=str, + default="", + help='Regex to filter benchmarks run') argp.add_argument( '-l', '--loops', @@ -77,18 +77,17 @@ def _args(): return args -def _collect_bm_data(bm, cfg, name, reps, idx, loops): +def _collect_bm_data(bm, cfg, name, regex, idx, loops): jobs_list = [] for line in subprocess.check_output( ['bm_diff_%s/%s/%s' % (name, cfg, bm), - 
'--benchmark_list_tests']).splitlines(): + '--benchmark_list_tests', '--benchmark_filter=%s' % regex]).splitlines(): stripped_line = line.strip().replace("/", "_").replace( "<", "_").replace(">", "_").replace(", ", "_") cmd = [ 'bm_diff_%s/%s/%s' % (name, cfg, bm), '--benchmark_filter=^%s$' % line, '--benchmark_out=%s.%s.%s.%s.%d.json' % (bm, stripped_line, cfg, name, idx), '--benchmark_out_format=json', - '--benchmark_repetitions=%d' % (reps) ] jobs_list.append( jobset.JobSpec( @@ -100,13 +99,13 @@ def _collect_bm_data(bm, cfg, name, reps, idx, loops): return jobs_list -def run(name, benchmarks, jobs, loops, reps, counters): +def run(name, benchmarks, jobs, loops, regex, counters): jobs_list = [] for loop in range(0, loops): for bm in benchmarks: - jobs_list += _collect_bm_data(bm, 'opt', name, reps, loop, loops) + jobs_list += _collect_bm_data(bm, 'opt', name, regex, loop, loops) if counters: - jobs_list += _collect_bm_data(bm, 'counters', name, reps, loop, + jobs_list += _collect_bm_data(bm, 'counters', name, regex, loop, loops) random.shuffle(jobs_list, random.SystemRandom().random) jobset.run(jobs_list, maxjobs=jobs) @@ -114,4 +113,4 @@ def run(name, benchmarks, jobs, loops, reps, counters): if __name__ == '__main__': args = _args() - run(args.name, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) + run(args.name, args.benchmarks, args.jobs, args.loops, args.regex, args.counters) diff --git a/tools/run_tests/sanity/core_banned_functions.py b/tools/run_tests/sanity/core_banned_functions.py index b394bbbeaf12816016bd3215499dd2532e77723a..1f139054847f6aaa03ccc4b727fb23f4605cd43b 100755 --- a/tools/run_tests/sanity/core_banned_functions.py +++ b/tools/run_tests/sanity/core_banned_functions.py @@ -41,6 +41,8 @@ BANNED_EXCEPT = { 'grpc_closure_sched(' : ['src/core/lib/iomgr/closure.c'], 'grpc_closure_run(' : ['src/core/lib/iomgr/closure.c'], 'grpc_closure_list_sched(' : ['src/core/lib/iomgr/closure.c'], + 'gpr_getenv_silent(' : ['src/core/lib/support/log.c', 'src/core/lib/support/env_linux.c', + 'src/core/lib/support/env_posix.c', 'src/core/lib/support/env_windows.c'], } errors = 0
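
Note (not part of the patch): the bm_diff changes above swap the --repetitions knob for a --regex option that is forwarded to the google-benchmark binaries as --benchmark_filter, both when listing tests and when running them. The sketch below illustrates that flow under stated assumptions; the binary path, output naming, and the example regex are placeholders for illustration only, not names taken from the tooling.

#!/usr/bin/env python
# Minimal sketch of forwarding a user-supplied regex to google-benchmark
# binaries, mirroring the --benchmark_filter usage in the diff above.
import subprocess


def list_benchmarks(binary, regex=''):
  # An empty filter lists every benchmark, matching the tool's default.
  out = subprocess.check_output(
      [binary, '--benchmark_list_tests', '--benchmark_filter=%s' % regex])
  return [line.strip() for line in out.decode().splitlines() if line.strip()]


def benchmark_commands(binary, regex=''):
  # One invocation per matching benchmark; the name is anchored with ^...$ so
  # a benchmark does not also run variants that merely share its prefix.
  # (The real tooling additionally sanitizes '<', '>' and ', ' in names.)
  return [[binary,
           '--benchmark_filter=^%s$' % name,
           '--benchmark_out=%s.json' % name.replace('/', '_'),
           '--benchmark_out_format=json']
          for name in list_benchmarks(binary, regex)]


if __name__ == '__main__':
  # Hypothetical binary path and filter, for illustration only.
  for cmd in benchmark_commands('bm_diff_new/opt/bm_closure', 'BM_Closure.*'):
    print(' '.join(cmd))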