Skip to content
Snippets Groups Projects
Commit 358c29a0 authored by David Garcia Quintas's avatar David Garcia Quintas
Browse files

Increased round robin coverage

parent b334855a
No related branches found
No related tags found
No related merge requests found
......@@ -38,18 +38,19 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/channel/client_channel.h"
#include "src/core/client_config/lb_policies/round_robin.h"
#include "src/core/client_config/lb_policy_registry.h"
#include "src/core/surface/channel.h"
#include "src/core/support/string.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/server.h"
#include "test/core/util/test_config.h"
#include "test/core/util/port.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
typedef struct servers_fixture {
size_t num_servers;
......@@ -136,8 +137,9 @@ static void kill_server(const servers_fixture *f, size_t i) {
gpr_log(GPR_INFO, "KILLING SERVER %d", i);
GPR_ASSERT(f->servers[i] != NULL);
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000),
NULL).type == GRPC_OP_COMPLETE);
GPR_ASSERT(
grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
f->servers[i] = NULL;
}
......@@ -203,8 +205,8 @@ static void teardown_servers(servers_fixture *f) {
if (f->servers[i] == NULL) continue;
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
n_millis_time(5000),
NULL).type == GRPC_OP_COMPLETE);
n_millis_time(5000), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
}
grpc_completion_queue_shutdown(f->cq);
......@@ -225,8 +227,8 @@ static void teardown_servers(servers_fixture *f) {
}
/** Returns connection sequence (server indices), which must be freed */
int *perform_request(servers_fixture *f, grpc_channel *client,
request_data *rdata, const test_spec *spec) {
static int *perform_request(servers_fixture *f, grpc_channel *client,
request_data *rdata, const test_spec *spec) {
grpc_call *c;
int s_idx;
int *s_valid;
......@@ -242,8 +244,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
s_valid = gpr_malloc(sizeof(int) * f->num_servers);
connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters);
/* Send a trivial request. */
for (iter_num = 0; iter_num < spec->num_iters; iter_num++) {
cq_verifier *cqv = cq_verifier_create(f->cq);
rdata->details = NULL;
......@@ -304,8 +304,8 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
s_idx = -1;
while ((ev = grpc_completion_queue_next(
f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)).type !=
GRPC_QUEUE_TIMEOUT) {
f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL))
.type != GRPC_QUEUE_TIMEOUT) {
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
read_tag = ((int)(gpr_intptr)ev.tag);
gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d",
......@@ -324,8 +324,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
}
gpr_log(GPR_DEBUG, "s_idx=%d", s_idx);
if (s_idx >= 0) {
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
......@@ -371,7 +369,7 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
&rdata->call_details[s_idx],
&f->request_metadata_recv[s_idx], f->cq,
f->cq, tag(1000 + (int)s_idx)));
} else {
} else { /* no response from server */
grpc_call_cancel(c, NULL);
if (!completed_client) {
cq_expect_completion(cqv, tag(1), 1);
......@@ -397,6 +395,42 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
return connection_sequence;
}
static grpc_call **perform_multirequest(servers_fixture *f,
grpc_channel *client,
size_t concurrent_calls) {
grpc_call **calls;
grpc_op ops[6];
grpc_op *op;
size_t i;
calls = gpr_malloc(sizeof(grpc_call *) * concurrent_calls);
for (i = 0; i < f->num_servers; i++) {
kill_server(f, i);
}
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
for (i = 0; i < concurrent_calls; i++) {
calls[i] = grpc_channel_create_call(
client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq, "/foo",
"foo.test.google.fr", gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
GPR_ASSERT(calls[i]);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(calls[i], ops,
(size_t)(op - ops), tag(1),
NULL));
}
return calls;
}
static void assert_channel_connectivity(
grpc_channel *ch, size_t num_accepted_conn_states,
grpc_connectivity_state accepted_conn_state, ...) {
......@@ -487,8 +521,110 @@ void run_spec(const test_spec *spec) {
gpr_free(actual_connection_sequence);
gpr_free(rdata.call_details);
grpc_channel_destroy(client); /* calls the LB's shutdown func */
teardown_servers(f);
}
static grpc_channel *create_client(const servers_fixture *f) {
grpc_channel *client;
char *client_hostport;
char *servers_hostports_str;
grpc_arg arg;
grpc_channel_args args;
servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports,
f->num_servers, ",", NULL);
gpr_asprintf(&client_hostport, "ipv4:%s?lb_policy=round_robin",
servers_hostports_str);
arg.type = GRPC_ARG_INTEGER;
arg.key = "grpc.testing.fixed_reconnect_backoff";
arg.value.integer = 100;
args.num_args = 1;
args.args = &arg;
client = grpc_insecure_channel_create(client_hostport, &args, NULL);
gpr_free(client_hostport);
gpr_free(servers_hostports_str);
return client;
}
static void test_ping() {
grpc_channel *client;
request_data rdata;
servers_fixture *f;
cq_verifier *cqv;
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
const size_t num_servers = 1;
int i;
rdata.call_details = gpr_malloc(sizeof(grpc_call_details) * num_servers);
f = setup_servers("127.0.0.1", &rdata, num_servers);
cqv = cq_verifier_create(f->cq);
client = create_client(f);
grpc_channel_ping(client, f->cq, tag(0), NULL);
cq_expect_completion(cqv, tag(0), 0);
/* check that we're still in idle, and start connecting */
GPR_ASSERT(grpc_channel_check_connectivity_state(client, 1) ==
GRPC_CHANNEL_IDLE);
/* we'll go through some set of transitions (some might be missed), until
READY is reached */
while (state != GRPC_CHANNEL_READY) {
grpc_channel_watch_connectivity_state(
client, state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f->cq, tag(99));
cq_expect_completion(cqv, tag(99), 1);
cq_verify(cqv);
state = grpc_channel_check_connectivity_state(client, 0);
GPR_ASSERT(state == GRPC_CHANNEL_READY ||
state == GRPC_CHANNEL_CONNECTING ||
state == GRPC_CHANNEL_TRANSIENT_FAILURE);
}
for (i = 1; i <= 5; i++) {
grpc_channel_ping(client, f->cq, tag(i), NULL);
cq_expect_completion(cqv, tag(i), 1);
cq_verify(cqv);
}
gpr_free(rdata.call_details);
grpc_channel_destroy(client);
teardown_servers(f);
cq_verifier_destroy(cqv);
}
static void test_pending_calls(size_t concurrent_calls) {
size_t i;
grpc_call **calls;
grpc_channel *client;
request_data rdata;
servers_fixture *f;
test_spec *spec = test_spec_create(0, 4);
rdata.call_details =
gpr_malloc(sizeof(grpc_call_details) * spec->num_servers);
f = setup_servers("127.0.0.1", &rdata, spec->num_servers);
client = create_client(f);
calls = perform_multirequest(f, client, concurrent_calls);
grpc_call_cancel(
calls[0],
NULL); /* exercise the cancel pick path whilst there are pending picks */
gpr_free(rdata.call_details);
grpc_channel_destroy(client); /* calls the LB's shutdown func */
/* destroy the calls after the channel so that they are still around for the
* LB's shutdown func to process */
for (i = 0; i < concurrent_calls; i++) {
grpc_call_destroy(calls[i]);
}
gpr_free(calls);
teardown_servers(f);
test_spec_destroy(spec);
}
static void print_failed_expectations(const int *expected_connection_sequence,
......@@ -715,13 +851,14 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
grpc_lb_round_robin_trace = 1;
GPR_ASSERT(grpc_lb_policy_create("this-lb-policy-does-not-exist", NULL) ==
NULL);
GPR_ASSERT(grpc_lb_policy_create(NULL, NULL) == NULL);
/* everything is fine, all servers stay up the whole time and life's peachy */
spec = test_spec_create(NUM_ITERS, NUM_SERVERS);
/* everything is fine, all servers stay up the whole time and life's peachy */
spec->verifier = verify_vanilla_round_robin;
spec->description = "test_all_server_up";
run_spec(spec);
......@@ -735,7 +872,8 @@ int main(int argc, char **argv) {
}
run_spec(spec);
/* at the start of the 2nd iteration, kill all but the first and last servers.
/* at the start of the 2nd iteration, kill all but the first and last
* servers.
* This should knock down the server bound to be selected next */
test_spec_reset(spec);
spec->verifier = verify_vanishing_floor_round_robin;
......@@ -764,9 +902,11 @@ int main(int argc, char **argv) {
spec->revive_at[3][i] = 1;
}
run_spec(spec);
test_spec_destroy(spec);
test_pending_calls(4);
test_ping();
grpc_shutdown();
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment