Skip to content
Snippets Groups Projects
Commit 1e55bd45 authored by Craig Tiller's avatar Craig Tiller
Browse files

Add retry for dns resolution

parent e91ef68d
No related branches found
No related tags found
No related merge requests found
...@@ -53,7 +53,9 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } ...@@ -53,7 +53,9 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); }
void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) { void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) {
if (gpr_unref(&c->refs)) { if (gpr_unref(&c->refs)) {
GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); if (c->lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config");
}
gpr_free(c); gpr_free(c);
} }
} }
......
...@@ -387,8 +387,8 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} ...@@ -387,8 +387,8 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) { grpc_lb_policy_args *args) {
if (args->num_subchannels == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p)); memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = p->subchannels =
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/lb_policy_registry.h"
#include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
typedef struct { typedef struct {
...@@ -71,6 +72,9 @@ typedef struct { ...@@ -71,6 +72,9 @@ typedef struct {
grpc_client_config **target_config; grpc_client_config **target_config;
/** current (fully resolved) config */ /** current (fully resolved) config */
grpc_client_config *resolved_config; grpc_client_config *resolved_config;
/** retry timer */
bool have_retry_timer;
grpc_timer retry_timer;
} dns_resolver; } dns_resolver;
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
...@@ -125,6 +129,21 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, ...@@ -125,6 +129,21 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
gpr_mu_unlock(&r->mu); gpr_mu_unlock(&r->mu);
} }
static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
bool success) {
dns_resolver *r = arg;
if (success) {
gpr_mu_lock(&r->mu);
if (!r->resolving) {
dns_start_resolving_locked(r);
}
gpr_mu_unlock(&r->mu);
}
GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
}
static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) { grpc_resolved_addresses *addresses) {
dns_resolver *r = arg; dns_resolver *r = arg;
...@@ -133,29 +152,47 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -133,29 +152,47 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel_args args; grpc_subchannel_args args;
grpc_lb_policy *lb_policy; grpc_lb_policy *lb_policy;
size_t i; size_t i;
if (addresses) { gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args; grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create(); config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
size_t naddrs = 0;
for (i = 0; i < addresses->naddrs; i++) { for (i = 0; i < addresses->naddrs; i++) {
memset(&args, 0, sizeof(args)); memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)(addresses->addrs[i].addr); args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
args.addr_len = (size_t)addresses->addrs[i].len; args.addr_len = (size_t)addresses->addrs[i].len;
subchannels[i] = grpc_subchannel_factory_create_subchannel( grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, r->subchannel_factory, &args); exec_ctx, r->subchannel_factory, &args);
if (subchannel != NULL) {
subchannels[naddrs++] = subchannel;
}
} }
memset(&lb_policy_args, 0, sizeof(lb_policy_args)); memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.subchannels = subchannels; lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = addresses->naddrs; lb_policy_args.num_subchannels = naddrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(config, lb_policy); if (lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
grpc_resolved_addresses_destroy(addresses); grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels); gpr_free(subchannels);
} else {
int retry_seconds = 15;
gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d seconds",
retry_seconds);
GPR_ASSERT(!r->have_retry_timer);
r->have_retry_timer = true;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
GRPC_RESOLVER_REF(&r->base, "retry-timer");
grpc_timer_init(
exec_ctx, &r->retry_timer,
gpr_time_add(now, gpr_time_from_seconds(retry_seconds, GPR_TIMESPAN)),
dns_on_retry_timer, r, now);
} }
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
if (r->resolved_config) { if (r->resolved_config) {
grpc_client_config_unref(exec_ctx, r->resolved_config); grpc_client_config_unref(exec_ctx, r->resolved_config);
} }
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/timer.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
static void subchannel_factory_ref(grpc_subchannel_factory *scv) {} static void subchannel_factory_ref(grpc_subchannel_factory *scv) {}
...@@ -47,7 +48,7 @@ static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx, ...@@ -47,7 +48,7 @@ static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
static grpc_subchannel *subchannel_factory_create_subchannel( static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory, grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
grpc_subchannel_args *args) { grpc_subchannel_args *args) {
GPR_UNREACHABLE_CODE(return NULL); return NULL;
} }
static const grpc_subchannel_factory_vtable sc_vtable = { static const grpc_subchannel_factory_vtable sc_vtable = {
...@@ -96,6 +97,20 @@ static void on_done(grpc_exec_ctx *exec_ctx, void *ev, bool success) { ...@@ -96,6 +97,20 @@ static void on_done(grpc_exec_ctx *exec_ctx, void *ev, bool success) {
gpr_event_set(ev, (void *)1); gpr_event_set(ev, (void *)1);
} }
// interleave waiting for an event with a timer check
static bool wait_loop(int deadline_seconds, gpr_event *ev) {
while (deadline_seconds) {
gpr_log(GPR_DEBUG, "Test: waiting for %d more seconds", deadline_seconds);
if (gpr_event_wait(ev, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1))) return true;
deadline_seconds--;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_timer_check(&exec_ctx, gpr_now(GPR_CLOCK_MONOTONIC), NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
return false;
}
int main(int argc, char **argv) { int main(int argc, char **argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
...@@ -113,7 +128,7 @@ int main(int argc, char **argv) { ...@@ -113,7 +128,7 @@ int main(int argc, char **argv) {
grpc_resolver_next(&exec_ctx, resolver, &config, grpc_resolver_next(&exec_ctx, resolver, &config,
grpc_closure_create(on_done, &ev1)); grpc_closure_create(on_done, &ev1));
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev1, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))); GPR_ASSERT(wait_loop(5, &ev1));
GPR_ASSERT(config == NULL); GPR_ASSERT(config == NULL);
gpr_event ev2; gpr_event ev2;
...@@ -121,7 +136,7 @@ int main(int argc, char **argv) { ...@@ -121,7 +136,7 @@ int main(int argc, char **argv) {
grpc_resolver_next(&exec_ctx, resolver, &config, grpc_resolver_next(&exec_ctx, resolver, &config,
grpc_closure_create(on_done, &ev2)); grpc_closure_create(on_done, &ev2));
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev2, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))); GPR_ASSERT(wait_loop(30, &ev2));
GPR_ASSERT(config != NULL); GPR_ASSERT(config != NULL);
grpc_client_config_unref(&exec_ctx, config); grpc_client_config_unref(&exec_ctx, config);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment