Skip to content
Snippets Groups Projects
Commit ef33c875 authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge github.com:grpc/grpc into bm_meta

parents b92d5a2f 1119a62b
No related branches found
No related tags found
No related merge requests found
......@@ -30,6 +30,5 @@
#TODO: Enable too-many-nested-blocks
#TODO: Enable super-init-not-called
#TODO: Enable no-self-use
#TODO: Enable no-member
disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,no-self-use,no-member
disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,no-self-use
......@@ -153,8 +153,6 @@ struct grpc_call {
bool destroy_called;
/** flag indicating that cancellation is inherited */
bool cancellation_is_inherited;
/** bitmask of live batches */
uint8_t used_batches;
/** which ops are in-flight */
bool sent_initial_metadata;
bool sending_message;
......@@ -1012,25 +1010,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
static batch_control *allocate_batch_control(grpc_call *call) {
size_t i;
for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
if ((call->used_batches & (1 << i)) == 0) {
call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
return &call->active_batches[i];
}
static int batch_slot_for_op(grpc_op_type type) {
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
return 0;
case GRPC_OP_SEND_MESSAGE:
return 1;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
case GRPC_OP_SEND_STATUS_FROM_SERVER:
return 2;
case GRPC_OP_RECV_INITIAL_METADATA:
return 3;
case GRPC_OP_RECV_MESSAGE:
return 4;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
case GRPC_OP_RECV_STATUS_ON_CLIENT:
return 5;
}
GPR_UNREACHABLE_CODE(return 123456789);
}
static batch_control *allocate_batch_control(grpc_call *call,
const grpc_op *ops,
size_t num_ops) {
int slot = batch_slot_for_op(ops[0].op);
for (size_t i = 1; i < num_ops; i++) {
int op_slot = batch_slot_for_op(ops[i].op);
slot = GPR_MIN(slot, op_slot);
}
batch_control *bctl = &call->active_batches[slot];
if (bctl->call != NULL) {
return NULL;
}
return NULL;
memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
return bctl;
}
static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_cq_completion *storage) {
batch_control *bctl = user_data;
grpc_call *call = bctl->call;
gpr_mu_lock(&call->mu);
call->used_batches = (uint8_t)(
call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
gpr_mu_unlock(&call->mu);
bctl->call = NULL;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
}
......@@ -1113,12 +1134,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */
bctl->call = NULL;
grpc_closure_run(exec_ctx, bctl->notify_tag, error);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
......@@ -1330,6 +1347,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
finish_batch_step(exec_ctx, bctl);
}
static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
grpc_cq_completion *completion) {
gpr_free(completion);
}
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
......@@ -1344,32 +1366,34 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_metadata compression_md;
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
/* TODO(ctiller): this feels like it could be made lock-free */
gpr_mu_lock(&call->mu);
bctl = allocate_batch_control(call);
memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
grpc_transport_stream_op *stream_op = &bctl->op;
memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
} else {
grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&call->mu);
post_batch_completion(exec_ctx, bctl);
error = GRPC_CALL_OK;
goto done;
}
/* TODO(ctiller): this feels like it could be made lock-free */
bctl = allocate_batch_control(call, ops, nops);
if (bctl == NULL) {
return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
}
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
gpr_mu_lock(&call->mu);
grpc_transport_stream_op *stream_op = &bctl->op;
memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
op = &ops[i];
......
......@@ -43,7 +43,6 @@ _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
_EMPTY_FLAGS = 0
_INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
_EMPTY_METADATA = cygrpc.Metadata(())
_UNARY_UNARY_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata,
cygrpc.OperationType.send_message,
......@@ -138,8 +137,8 @@ def _abort(state, code, details):
state.code = code
state.details = details
if state.initial_metadata is None:
state.initial_metadata = _EMPTY_METADATA
state.trailing_metadata = _EMPTY_METADATA
state.initial_metadata = _common.EMPTY_METADATA
state.trailing_metadata = _common.EMPTY_METADATA
def _handle_event(event, state, response_deserializer):
......@@ -435,7 +434,7 @@ def _start_unary_request(request, timeout, request_serializer):
deadline, deadline_timespec = _deadline(timeout)
serialized_request = _common.serialize(request, request_serializer)
if serialized_request is None:
state = _RPCState((), _EMPTY_METADATA, _EMPTY_METADATA,
state = _RPCState((), _common.EMPTY_METADATA, _common.EMPTY_METADATA,
grpc.StatusCode.INTERNAL,
'Exception serializing request!')
rendezvous = _Rendezvous(state, None, None, deadline)
......
......@@ -37,7 +37,7 @@ import six
import grpc
from grpc._cython import cygrpc
_EMPTY_METADATA = cygrpc.Metadata(())
EMPTY_METADATA = cygrpc.Metadata(())
CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
cygrpc.ConnectivityState.idle:
......@@ -107,7 +107,7 @@ def channel_args(options):
def cygrpc_metadata(application_metadata):
return _EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
cygrpc.Metadatum(encode(key), encode(value))
for key, value in application_metadata)
......
......@@ -57,7 +57,6 @@ _CLOSED = 'closed'
_CANCELLED = 'cancelled'
_EMPTY_FLAGS = 0
_EMPTY_METADATA = cygrpc.Metadata(())
_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
......@@ -143,7 +142,7 @@ def _abort(state, call, code, details):
effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata(
_EMPTY_METADATA, _EMPTY_FLAGS),
_common.EMPTY_METADATA, _EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_common.cygrpc_metadata(state.trailing_metadata),
effective_code, effective_details, _EMPTY_FLAGS),)
......@@ -416,7 +415,7 @@ def _send_response(rpc_event, state, serialized_response):
else:
if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata(
_EMPTY_METADATA, _EMPTY_FLAGS),
_common.EMPTY_METADATA, _EMPTY_FLAGS),
cygrpc.operation_send_message(serialized_response,
_EMPTY_FLAGS),)
state.initial_metadata_allowed = False
......@@ -446,8 +445,8 @@ def _status(rpc_event, state, serialized_response):
]
if state.initial_metadata_allowed:
operations.append(
cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
_EMPTY_FLAGS))
cygrpc.operation_send_initial_metadata(
_common.EMPTY_METADATA, _EMPTY_FLAGS))
if serialized_response is not None:
operations.append(
cygrpc.operation_send_message(serialized_response,
......@@ -549,12 +548,12 @@ def _find_method_handler(rpc_event, generic_handlers):
def _handle_unrecognized_method(rpc_event):
operations = (
cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, _EMPTY_FLAGS),
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_EMPTY_METADATA, cygrpc.StatusCode.unimplemented,
b'Method not found!', _EMPTY_FLAGS),)
operations = (cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_common.EMPTY_METADATA, cygrpc.StatusCode.unimplemented,
b'Method not found!', _EMPTY_FLAGS),)
rpc_state = _RPCState()
rpc_event.operation_call.start_server_batch(
operations, lambda ignored_event: (rpc_state, (),))
......
File added
......@@ -79725,6 +79725,28 @@
],
"uses_polling": false
},
{
"args": [
"test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5171071900712960"
],
"ci_platforms": [
"linux"
],
"cpu_cost": 0.1,
"exclude_configs": [
"tsan"
],
"exclude_iomgrs": [
"uv"
],
"flaky": false,
"language": "c",
"name": "api_fuzzer_one_entry",
"platforms": [
"linux"
],
"uses_polling": false
},
{
"args": [
"test/core/end2end/fuzzers/api_fuzzer_corpus/crash-0597bbdd657fa4ed14443994c9147a1a7bbc205f"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment