Skip to content
Snippets Groups Projects
Commit 1bc34473 authored by Yuchen Zeng's avatar Yuchen Zeng
Browse files

Add max age enforcement for server channels

parent ac4a7283
No related branches found
No related tags found
No related merge requests found
...@@ -163,6 +163,10 @@ typedef struct { ...@@ -163,6 +163,10 @@ typedef struct {
/** Maximum message length that the channel can send. Int valued, bytes. /** Maximum message length that the channel can send. Int valued, bytes.
-1 means unlimited. */ -1 means unlimited. */
#define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length" #define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length"
#define GPRC_ARG_MAX_CONNECION_AGE_S "grpc.max_connection_age"
#define GPRC_ARG_MAX_CONNECION_AGE_GRACE_S "grpc.max_connection_age_grace"
/** Initial sequence number for http2 transports. Int valued. */ /** Initial sequence number for http2 transports. Int valued. */
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \ #define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \
"grpc.http2.initial_sequence_number" "grpc.http2.initial_sequence_number"
......
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/stack_lockfree.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
...@@ -53,9 +54,12 @@ ...@@ -53,9 +54,12 @@
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/init.h" #include "src/core/lib/surface/init.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/static_metadata.h"
#define DEFAULT_MAX_CONNECTION_AGE_S INT_MAX
typedef struct listener { typedef struct listener {
void *arg; void *arg;
void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
...@@ -116,6 +120,9 @@ struct channel_data { ...@@ -116,6 +120,9 @@ struct channel_data {
uint32_t registered_method_max_probes; uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure; grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed; grpc_closure channel_connectivity_changed;
grpc_timer max_age_timer;
gpr_timespec max_connection_age;
grpc_closure close_max_age_channel;
}; };
typedef struct shutdown_tag { typedef struct shutdown_tag {
...@@ -381,6 +388,7 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, ...@@ -381,6 +388,7 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
static void server_ref(grpc_server *server) { static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount); gpr_ref(&server->internal_refcount);
gpr_log(GPR_DEBUG, "server_ref");
} }
static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
...@@ -442,9 +450,11 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, ...@@ -442,9 +450,11 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_error *error) { grpc_error *error) {
gpr_log(GPR_DEBUG, "destroy_channel");
if (is_channel_orphaned(chand)) return; if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != NULL); GPR_ASSERT(chand->server != NULL);
orphan_channel(chand); orphan_channel(chand);
grpc_timer_cancel(exec_ctx, &chand->max_age_timer);
server_ref(chand->server); server_ref(chand->server);
maybe_finish_shutdown(exec_ctx, chand->server); maybe_finish_shutdown(exec_ctx, chand->server);
grpc_closure_init(&chand->finish_destroy_channel_closure, grpc_closure_init(&chand->finish_destroy_channel_closure,
...@@ -831,6 +841,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, ...@@ -831,6 +841,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
grpc_transport *transport, grpc_transport *transport,
const void *transport_server_data) { const void *transport_server_data) {
gpr_log(GPR_DEBUG, "accept_stream");
channel_data *chand = cd; channel_data *chand = cd;
/* create a call */ /* create a call */
grpc_call_create_args args; grpc_call_create_args args;
...@@ -882,6 +893,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, ...@@ -882,6 +893,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_element_args *args) { const grpc_call_element_args *args) {
gpr_log(GPR_DEBUG, "init_call_elem");
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data)); memset(calld, 0, sizeof(call_data));
...@@ -903,6 +915,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, ...@@ -903,6 +915,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
gpr_log(GPR_DEBUG, "destroy_call_elem");
GPR_ASSERT(calld->state != PENDING); GPR_ASSERT(calld->state != PENDING);
if (calld->host_set) { if (calld->host_set) {
...@@ -918,6 +931,23 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, ...@@ -918,6 +931,23 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_unref(exec_ctx, chand->server); server_unref(exec_ctx, chand->server);
} }
static void close_max_age_channel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
channel_data *chand = arg;
if (error == GRPC_ERROR_NONE) {
grpc_transport_op *op = grpc_make_transport_op(NULL);
op->goaway_error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
grpc_channel_element *elem = grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0);
elem->filter->start_transport_op(exec_ctx, elem, op);
} else if (error != GRPC_ERROR_CANCELLED) {
GRPC_LOG_IF_ERROR("close_max_age_channel", error);
}
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "max age");
}
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem, grpc_channel_element *elem,
grpc_channel_element_args *args) { grpc_channel_element_args *args) {
...@@ -929,6 +959,28 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, ...@@ -929,6 +959,28 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->next = chand->prev = chand; chand->next = chand->prev = chand;
chand->registered_methods = NULL; chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE; chand->connectivity_state = GRPC_CHANNEL_IDLE;
chand->max_connection_age =
DEFAULT_MAX_CONNECTION_AGE_S == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_seconds(DEFAULT_MAX_CONNECTION_AGE_S, GPR_TIMESPAN);
grpc_closure_init(&chand->close_max_age_channel, close_max_age_channel, chand,
grpc_schedule_on_exec_ctx);
const grpc_channel_args *channel_args = args->channel_args;
if (channel_args) {
size_t i;
for (i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GPRC_ARG_MAX_CONNECION_AGE_S)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_S, 1, INT_MAX});
chand->max_connection_age =
value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_seconds(value, GPR_TIMESPAN);
}
}
}
grpc_closure_init(&chand->channel_connectivity_changed, grpc_closure_init(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand, channel_connectivity_changed, chand,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
...@@ -1132,6 +1184,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, ...@@ -1132,6 +1184,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport, grpc_transport *transport,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,
const grpc_channel_args *args) { const grpc_channel_args *args) {
gpr_log(GPR_DEBUG, "grpc_server_setup_transport");
size_t num_registered_methods; size_t num_registered_methods;
size_t alloc; size_t alloc;
registered_method *rm; registered_method *rm;
...@@ -1152,6 +1205,11 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, ...@@ -1152,6 +1205,11 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
chand->server = s; chand->server = s;
server_ref(s); server_ref(s);
chand->channel = channel; chand->channel = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "max age");
grpc_timer_init(
exec_ctx, &chand->max_age_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age),
&chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC));
size_t cq_idx; size_t cq_idx;
grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment