diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 8ab496229cb37bda512b694d725b0c2cc16c09b9..ea15d1dce0ecce975aefff9b4ad9fe26c18444ee 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -201,10 +201,23 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } +static void del_interested_parties_locked(round_robin_lb_policy *p, + const size_t subchannel_idx) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx], + pp->pollset); + } +} + + void rr_destroy(grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i); + } for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin"); } @@ -231,10 +244,15 @@ void rr_destroy(grpc_lb_policy *pol) { } void rr_shutdown(grpc_lb_policy *pol) { + size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i); + } + p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -350,7 +368,8 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) { "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(selected->subchannel, pp->pollset); + grpc_subchannel_del_interested_party(selected->subchannel, + pp->pollset); grpc_iomgr_add_delayed_callback(pp->on_complete, 1); gpr_free(pp); } @@ -367,22 +386,23 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) { &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure"); - + del_interested_parties_locked(p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); - /* remove for ready list if still present */ + /* remove from ready list if still present */ if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); p->subchannel_index_to_readylist_node[this_idx] = NULL; } + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); p->subchannel_index_to_readylist_node[this_idx] = NULL; diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 111c237a0d29db1cf1e31900ebc2c52199e14fdb..16108934282342a8a8a3a2b3f3d58af23bbce88a 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -272,7 +272,7 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, const char *lb_policy_name, + grpc_uri *uri, const char *default_lb_policy_name, grpc_subchannel_factory *subchannel_factory, int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { size_t i; @@ -289,6 +289,25 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); + r->lb_policy_name = NULL; + if (0 != strcmp(uri->query, "")) { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = gpr_slice_new(uri->query, strlen(uri->query), do_nothing); + gpr_slice_buffer_init(&query_parts); + gpr_slice_split(query_slice, "=", &query_parts); + GPR_ASSERT(query_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { + r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + } + gpr_slice_buffer_destroy(&query_parts); + gpr_slice_unref(query_slice); + } + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } + path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); gpr_slice_buffer_init(&path_parts); @@ -319,7 +338,6 @@ static grpc_resolver *sockaddr_create( gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = subchannel_factory; - r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); return &r->base; @@ -337,7 +355,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_uri *uri, \ grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, "round_robin", \ + return sockaddr_create(uri, "pick_first", \ subchannel_factory, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index eb18f2ac4edd6c9941d65dfa95177f9808d456ab..18c675b9ebbd57782ec1444596cd76de9dbb5fce 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -31,19 +31,21 @@ * */ +#include <stdarg.h> #include <string.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> #include <grpc/support/string_util.h> #include "src/core/channel/channel_stack.h" #include "src/core/surface/channel.h" #include "src/core/channel/client_channel.h" -#include "src/core/surface/server.h" #include "src/core/support/string.h" +#include "src/core/surface/server.h" #include "test/core/util/test_config.h" #include "test/core/util/port.h" #include "test/core/end2end/cq_verifier.h" @@ -67,6 +69,8 @@ typedef struct test_spec { int **kill_at; int **revive_at; + const char *description; + verifier_fn verifier; } test_spec; @@ -378,16 +382,51 @@ int *perform_request(servers_fixture *f, grpc_channel *client, } static void assert_channel_connectivity( - grpc_channel *ch, grpc_connectivity_state expected_conn_state) { + grpc_channel *ch, size_t num_accepted_conn_states, + grpc_connectivity_state accepted_conn_states, ...) { + size_t i; grpc_channel_stack *client_stack; grpc_channel_element *client_channel_filter; grpc_connectivity_state actual_conn_state; + va_list ap; client_stack = grpc_channel_get_channel_stack(ch); client_channel_filter = grpc_channel_stack_last_element(client_stack); + actual_conn_state = grpc_client_channel_check_connectivity_state( client_channel_filter, 0 /* don't try to connect */); - GPR_ASSERT(actual_conn_state == expected_conn_state); + va_start(ap, accepted_conn_states); + for (i = 0; i < num_accepted_conn_states; i++) { + va_arg(ap, grpc_connectivity_state); + if (actual_conn_state == accepted_conn_states) { + break; + } + } + va_end(ap); + if (i == num_accepted_conn_states) { + char **accepted_strs = gpr_malloc(sizeof(char*) * num_accepted_conn_states); + char *accepted_str_joined; + va_start(ap, accepted_conn_states); + for (i = 0; i < num_accepted_conn_states; i++) { + va_arg(ap, grpc_connectivity_state); + GPR_ASSERT(gpr_asprintf(&accepted_strs[i], "%d", accepted_conn_states) > + 0); + } + va_end(ap); + accepted_str_joined = gpr_strjoin_sep((const char **)accepted_strs, + num_accepted_conn_states, ", ", NULL); + gpr_log( + GPR_ERROR, + "Channel connectivity assertion failed: expected <one of [%s]>, got %d", + accepted_str_joined, actual_conn_state); + + for (i = 0; i < num_accepted_conn_states; i++) { + gpr_free(accepted_strs[i]); + } + gpr_free(accepted_strs); + gpr_free(accepted_str_joined); + abort(); + } } void run_spec(const test_spec *spec) { @@ -400,10 +439,11 @@ void run_spec(const test_spec *spec) { /* Create client. */ servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports, f->num_servers, ",", NULL); - gpr_asprintf(&client_hostport, "ipv4:%s", servers_hostports_str); + gpr_asprintf(&client_hostport, "ipv4:%s?lb_policy=round_robin", + servers_hostports_str); client = grpc_insecure_channel_create(client_hostport, NULL, NULL); - gpr_log(GPR_INFO, "Testing with servers=%s client=%s", + gpr_log(GPR_INFO, "Testing '%s' with servers=%s client=%s", spec->description, servers_hostports_str, client_hostport); actual_connection_sequence = perform_request(f, client, spec); @@ -456,7 +496,7 @@ static void verify_vanilla_round_robin(const servers_fixture *f, abort(); } } - assert_channel_connectivity(client, GRPC_CHANNEL_READY); + assert_channel_connectivity(client, 1, GRPC_CHANNEL_READY); gpr_free(expected_connection_sequence); } @@ -477,9 +517,18 @@ static void verify_vanishing_floor_round_robin( expected_seq_length * sizeof(int)); /* first three elements of the sequence should be [<1st>, -1] */ - GPR_ASSERT(actual_connection_sequence[0] == expected_connection_sequence[0]); + if (actual_connection_sequence[0] != expected_connection_sequence[0]) { + gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", + expected_connection_sequence[0], actual_connection_sequence[0], 0); + print_failed_expectations(expected_connection_sequence, + actual_connection_sequence, expected_seq_length, + 1); + abort(); + } + GPR_ASSERT(actual_connection_sequence[1] == -1); + for (i = 2; i < num_iters; i++) { const int actual = actual_connection_sequence[i]; const int expected = expected_connection_sequence[i % expected_seq_length]; @@ -512,7 +561,8 @@ static void verify_total_carnage_round_robin( /* even though we know all the servers are dead, the client is still trying * retrying, believing it's in a transient failure situation */ - assert_channel_connectivity(client, GRPC_CHANNEL_TRANSIENT_FAILURE); + assert_channel_connectivity(client, 2, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_CHANNEL_CONNECTING); } static void verify_partial_carnage_round_robin( @@ -548,7 +598,8 @@ static void verify_partial_carnage_round_robin( /* even though we know all the servers are dead, the client is still trying * retrying, believing it's in a transient failure situation */ - assert_channel_connectivity(client, GRPC_CHANNEL_TRANSIENT_FAILURE); + assert_channel_connectivity(client, 2, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_CHANNEL_CONNECTING); gpr_free(expected_connection_sequence); } @@ -569,8 +620,12 @@ static void verify_rebirth_round_robin(const servers_fixture *f, /* first iteration succeeds */ GPR_ASSERT(actual_connection_sequence[0] != -1); - /* back up on the third iteration */ - for (i = 3; i < num_iters; i++) { + /* back up on the third (or maybe fourth) iteration */ + i = 3; + if (actual_connection_sequence[i] == -1) { + i = 4; + } + for (; i < num_iters; i++) { const int actual = actual_connection_sequence[i]; const int expected = expected_connection_sequence[i % expected_seq_length]; if (actual != expected) { @@ -584,7 +639,7 @@ static void verify_rebirth_round_robin(const servers_fixture *f, } /* things are fine once the servers are brought back up */ - assert_channel_connectivity(client, GRPC_CHANNEL_READY); + assert_channel_connectivity(client, 1, GRPC_CHANNEL_READY); gpr_free(expected_connection_sequence); } @@ -601,48 +656,47 @@ int main(int argc, char **argv) { /* everything is fine, all servers stay up the whole time and life's peachy */ spec = test_spec_create(NUM_ITERS, NUM_SERVERS); spec->verifier = verify_vanilla_round_robin; - gpr_log(GPR_DEBUG, "test_all_server_up"); - run_spec(spec); + spec->description = "test_all_server_up"; + /*run_spec(spec);*/ /* Kill all servers first thing in the morning */ test_spec_reset(spec); spec->verifier = verify_total_carnage_round_robin; + spec->description = "test_kill_all_server"; for (i = 0; i < NUM_SERVERS; i++) { spec->kill_at[0][i] = 1; } - gpr_log(GPR_DEBUG, "test_kill_all_server"); run_spec(spec); /* at the start of the 2nd iteration, kill all but the first and last servers. * This should knock down the server bound to be selected next */ test_spec_reset(spec); spec->verifier = verify_vanishing_floor_round_robin; + spec->description = "test_kill_all_server_at_2nd_iteration"; for (i = 1; i < NUM_SERVERS - 1; i++) { spec->kill_at[1][i] = 1; } - gpr_log(GPR_DEBUG, "test_kill_all_server_at_2nd_iteration"); - run_spec(spec); + /*run_spec(spec);*/ /* Midway, kill all servers. */ test_spec_reset(spec); spec->verifier = verify_partial_carnage_round_robin; + spec->description = "test_kill_all_server_midway"; for (i = 0; i < NUM_SERVERS; i++) { spec->kill_at[spec->num_iters / 2][i] = 1; } - gpr_log(GPR_DEBUG, "test_kill_all_server_midway"); - run_spec(spec); - + /*run_spec(spec);*/ /* After first iteration, kill all servers. On the third one, bring them all * back up. */ test_spec_reset(spec); spec->verifier = verify_rebirth_round_robin; + spec->description = "test_kill_all_server_after_1st_resurrect_at_3rd"; for (i = 0; i < NUM_SERVERS; i++) { spec->kill_at[1][i] = 1; spec->revive_at[3][i] = 1; } - gpr_log(GPR_DEBUG, "test_kill_all_server_after_1st_resurrect_at_3rd"); - run_spec(spec); + /*run_spec(spec);*/ test_spec_destroy(spec); diff --git a/vsprojects/vcxproj/test/lb_policies_test/lb_policies_test.vcxproj b/vsprojects/vcxproj/test/lb_policies_test/lb_policies_test.vcxproj index e7072ed7e24994783ea919e94ce9bdd223addf3b..c93a7e60fe32412cf4dd268c0800d8c9fbb7ce9c 100644 --- a/vsprojects/vcxproj/test/lb_policies_test/lb_policies_test.vcxproj +++ b/vsprojects/vcxproj/test/lb_policies_test/lb_policies_test.vcxproj @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.props" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\1.0.2.3.props')" /> <ItemGroup Label="ProjectConfigurations"> <ProjectConfiguration Include="Debug|Win32"> <Configuration>Debug</Configuration> @@ -49,16 +50,21 @@ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> <Import Project="..\..\..\..\vsprojects\global.props" /> <Import Project="..\..\..\..\vsprojects\openssl.props" /> - <Import Project="..\..\..\..\vsprojects\protobuf.props" /> <Import Project="..\..\..\..\vsprojects\winsock.props" /> <Import Project="..\..\..\..\vsprojects\zlib.props" /> </ImportGroup> <PropertyGroup Label="UserMacros" /> <PropertyGroup Condition="'$(Configuration)'=='Debug'"> <TargetName>lb_policies_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> </PropertyGroup> <PropertyGroup Condition="'$(Configuration)'=='Release'"> <TargetName>lb_policies_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> </PropertyGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> <ClCompile> @@ -142,13 +148,25 @@ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project> </ProjectReference> </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> + <Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies.zlib.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + <Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> </ImportGroup> <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> <PropertyGroup> <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> </PropertyGroup> + <Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies.zlib.redist.targets')" /> + <Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies.zlib.targets')" /> + <Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies.openssl.redist.targets')" /> + <Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.props')" /> + <Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.targets')" /> </Target> </Project>