diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 0fa79870760a867ecb88b5a86c062f8894e2a88d..ea131dbc04330d8e515832547aaae61faf264441 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -46,6 +46,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" #include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" @@ -59,18 +60,13 @@ /* TODO (makdharma): Hook up into the wider tracing mechanism */ int grpc_cronet_trace = 0; -enum OP_RESULT { +enum e_op_result { ACTION_TAKEN_WITH_CALLBACK, ACTION_TAKEN_NO_CALLBACK, NO_ACTION_POSSIBLE }; -/* Used for printing debug */ -const char *op_result_string[] = {"ACTION_TAKEN_WITH_CALLBACK", - "ACTION_TAKEN_NO_CALLBACK", - "NO_ACTION_POSSIBLE"}; - -enum OP_ID { +enum e_op_id { OP_SEND_INITIAL_METADATA = 0, OP_SEND_MESSAGE, OP_SEND_TRAILING_METADATA, @@ -87,22 +83,7 @@ enum OP_ID { OP_NUM_OPS }; -const char *op_id_string[] = {"OP_SEND_INITIAL_METADATA", - "OP_SEND_MESSAGE", - "OP_SEND_TRAILING_METADATA", - "OP_RECV_MESSAGE", - "OP_RECV_INITIAL_METADATA", - "OP_RECV_TRAILING_METADATA", - "OP_CANCEL_ERROR", - "OP_ON_COMPLETE", - "OP_FAILED", - "OP_SUCCEEDED", - "OP_CANCELED", - "OP_RECV_MESSAGE_AND_ON_COMPLETE", - "OP_READ_REQ_MADE", - "OP_NUM_OPS"}; - -/* Cronet callbacks */ +/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ static void on_request_headers_sent(cronet_bidirectional_stream *); static void on_response_headers_received( @@ -134,6 +115,8 @@ struct grpc_cronet_transport { }; typedef struct grpc_cronet_transport grpc_cronet_transport; +/* TODO (makdharma): reorder structure for memory efficiency per + http://www.catb.org/esr/structure-packing/#_structure_reordering: */ struct read_state { /* vars to store data coming from server */ char *read_buffer; @@ -204,14 +187,61 @@ struct stream_obj { }; typedef struct stream_obj stream_obj; -static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx, - struct op_and_state *oas); +static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, + struct op_and_state *oas); + +/* + Utility function to translate enum into string for printing +*/ +static const char *op_result_string(enum e_op_result i) { + switch (i) { + case ACTION_TAKEN_WITH_CALLBACK: + return "ACTION_TAKEN_WITH_CALLBACK"; + case ACTION_TAKEN_NO_CALLBACK: + return "ACTION_TAKEN_NO_CALLBACK"; + case NO_ACTION_POSSIBLE: + return "NO_ACTION_POSSIBLE"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +static const char *op_id_string(enum e_op_id i) { + switch (i) { + case OP_SEND_INITIAL_METADATA: + return "OP_SEND_INITIAL_METADATA"; + case OP_SEND_MESSAGE: + return "OP_SEND_MESSAGE"; + case OP_SEND_TRAILING_METADATA: + return "OP_SEND_TRAILING_METADATA"; + case OP_RECV_MESSAGE: + return "OP_RECV_MESSAGE"; + case OP_RECV_INITIAL_METADATA: + return "OP_RECV_INITIAL_METADATA"; + case OP_RECV_TRAILING_METADATA: + return "OP_RECV_TRAILING_METADATA"; + case OP_CANCEL_ERROR: + return "OP_CANCEL_ERROR"; + case OP_ON_COMPLETE: + return "OP_ON_COMPLETE"; + case OP_FAILED: + return "OP_FAILED"; + case OP_SUCCEEDED: + return "OP_SUCCEEDED"; + case OP_CANCELED: + return "OP_CANCELED"; + case OP_RECV_MESSAGE_AND_ON_COMPLETE: + return "OP_RECV_MESSAGE_AND_ON_COMPLETE"; + case OP_READ_REQ_MADE: + return "OP_READ_REQ_MADE"; + case OP_NUM_OPS: + return "OP_NUM_OPS"; + } +} /* Add a new stream op to op storage. */ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { - gpr_mu_lock(&s->mu); struct op_storage *storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ @@ -220,6 +250,7 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { memset(&new_op->state, 0, sizeof(new_op->state)); new_op->s = s; new_op->done = false; + gpr_mu_lock(&s->mu); new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; @@ -271,9 +302,9 @@ static void execute_from_storage(stream_obj *s) { for (struct op_and_state *curr = s->storage.head; curr != NULL;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); GPR_ASSERT(curr->done == 0); - enum OP_RESULT result = execute_stream_op(&exec_ctx, curr); + enum e_op_result result = execute_stream_op(&exec_ctx, curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, - op_result_string[result]); + op_result_string(result)); /* if this op is done, then remove it and free memory */ if (curr->done) { struct op_and_state *next = curr->next; @@ -372,8 +403,7 @@ static void on_response_headers_received( memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); - unsigned int i = 0; - for (i = 0; i < headers->count; i++) { + for (size_t i = 0; i < headers->count; i++) { grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.initial_metadata, grpc_mdelem_from_metadata_strings( @@ -439,8 +469,7 @@ static void on_response_trailers_received( sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); - unsigned int i = 0; - for (i = 0; i < trailers->count; i++) { + for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); grpc_chttp2_incoming_metadata_buffer_add( @@ -460,10 +489,10 @@ static void on_response_trailers_received( */ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, char **pp_write_buffer, - int *p_write_buffer_size) { + size_t *p_write_buffer_size) { gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer); size_t length = GPR_SLICE_LENGTH(slice); - *p_write_buffer_size = (int)length + GRPC_HEADER_SIZE_IN_BYTES; + *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; /* This is freed in the on_write_completed callback */ char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES); *pp_write_buffer = write_buffer; @@ -500,7 +529,8 @@ static void convert_metadata_to_cronet_headers( /* Walk the linked list again, this time copying the header fields. s->num_headers can be less than num_headers_available, as some headers - are not used for cronet + are not used for cronet. + TODO (makdharma): Eliminate need to traverse the LL second time for perf. */ curr = head; int num_headers = 0; @@ -509,12 +539,12 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; const char *key = grpc_mdstr_as_c_string(mdelem->key); const char *value = grpc_mdstr_as_c_string(mdelem->value); - if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 || - strcmp(key, ":authority") == 0) { + if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME || + mdelem->key == GRPC_MDSTR_AUTHORITY) { /* Cronet populates these fields on its own */ continue; } - if (strcmp(key, ":path") == 0) { + if (mdelem->key == GRPC_MDSTR_PATH) { /* Create URL by appending :path value to the hostname */ gpr_asprintf(pp_url, "https://%s%s", host, value); continue; @@ -546,13 +576,14 @@ static int parse_grpc_header(const uint8_t *data) { */ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *stream_state, - struct op_state *op_state, enum OP_ID op_id) { + struct op_state *op_state, enum e_op_id op_id) { bool result = true; /* When call is canceled, every op can be run, except under following conditions */ - if (stream_state->state_op_done[OP_CANCEL_ERROR] || - stream_state->state_callback_received[OP_FAILED]) { + bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] || + stream_state->state_callback_received[OP_FAILED]; + if (is_canceled_of_failed) { if (op_id == OP_SEND_INITIAL_METADATA) result = false; if (op_id == OP_SEND_MESSAGE) result = false; if (op_id == OP_SEND_TRAILING_METADATA) result = false; @@ -678,17 +709,20 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false; } - CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string[op_id], + CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id), result ? "YES" : "NO"); return result; } -static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx, - struct op_and_state *oas) { +/* + TODO (makdharma): Break down this function in smaller chunks for readability. +*/ +static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, + struct op_and_state *oas) { grpc_transport_stream_op *stream_op = &oas->op; struct stream_obj *s = oas->s; struct op_state *stream_state = &s->state; - enum OP_RESULT result = NO_ACTION_POSSIBLE; + enum e_op_result result = NO_ACTION_POSSIBLE; if (stream_op->send_initial_metadata && op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_INITIAL_METADATA)) { @@ -743,19 +777,21 @@ static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_byte_stream_next(NULL, stream_op->send_message, &slice, stream_op->send_message->length, NULL); /* Check that compression flag is OFF. We don't support compression yet. */ + gpr_log(GPR_ERROR, "Compression is not supported"); GPR_ASSERT(stream_op->send_message->flags == 0); gpr_slice_buffer_add(&write_slice_buffer, slice); + gpr_log(GPR_ERROR, "Empty request is not supported"); GPR_ASSERT(write_slice_buffer.count == 1); /* Empty request not handled yet */ if (write_slice_buffer.count > 0) { - int write_buffer_size; + size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, &write_buffer_size); CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, - write_buffer_size, false); + (int)write_buffer_size, false); result = ACTION_TAKEN_WITH_CALLBACK; } stream_state->state_op_done[OP_SEND_MESSAGE] = true;