From ff47bc0daf9c6081506e4ec462208cc990c7cde1 Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar <makarandd@google.com> Date: Fri, 29 Jul 2016 16:55:35 -0700 Subject: [PATCH] rewrite the core logic to work with end to end tests. --- .../cronet/transport/cronet_transport.c | 992 +++++++----------- 1 file changed, 380 insertions(+), 612 deletions(-) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 0a079927ea..694d346fc3 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -51,8 +51,41 @@ #define GRPC_HEADER_SIZE_IN_BYTES 5 -// Global flag that gets set with GRPC_TRACE env variable -int grpc_cronet_trace = 1; +enum OP_ID { + OP_SEND_INITIAL_METADATA = 0, + OP_SEND_MESSAGE, + OP_SEND_TRAILING_METADATA, + OP_RECV_MESSAGE, + OP_RECV_INITIAL_METADATA, + OP_RECV_TRAILING_METADATA, + OP_CANCEL_ERROR, + OP_ON_COMPLETE, + OP_NUM_OPS +}; + +/* Cronet callbacks */ + +static void on_request_headers_sent(cronet_bidirectional_stream *); +static void on_response_headers_received(cronet_bidirectional_stream *, + const cronet_bidirectional_stream_header_array *, + const char *); +static void on_write_completed(cronet_bidirectional_stream *, const char *); +static void on_read_completed(cronet_bidirectional_stream *, char *, int); +static void on_response_trailers_received(cronet_bidirectional_stream *, + const cronet_bidirectional_stream_header_array *); +static void on_succeeded(cronet_bidirectional_stream *); +static void on_failed(cronet_bidirectional_stream *, int); +//static void on_canceled(cronet_bidirectional_stream *); +static cronet_bidirectional_stream_callback cronet_callbacks = { + on_request_headers_sent, + on_response_headers_received, + on_read_completed, + on_write_completed, + on_response_trailers_received, + on_succeeded, + on_failed, + NULL //on_canceled +}; // Cronet transport object struct grpc_cronet_transport { @@ -60,428 +93,198 @@ struct grpc_cronet_transport { cronet_engine *engine; char *host; }; - typedef struct grpc_cronet_transport grpc_cronet_transport; -enum send_state { - CRONET_SEND_IDLE = 0, - CRONET_SEND_HEADER, - CRONET_WRITE_PENDING, - CRONET_WRITE_COMPLETED, - CRONET_WAIT_FOR_CANCEL, - CRONET_STREAM_CLOSED, -}; +struct read_state { + // vars to store data coming from cronet + char *read_buffer; + bool length_field_received; + int received_bytes; + int remaining_bytes; + int length_field; + char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; + char *payload_field; + + // vars for holding data destined for the application + struct grpc_slice_buffer_stream sbs; + gpr_slice_buffer read_slice_buffer; -enum recv_state { - CRONET_RECV_IDLE = 0, - CRONET_RECV_READ_LENGTH, - CRONET_RECV_READ_DATA, - CRONET_RECV_CLOSED, -}; + // vars for trailing metadata + grpc_chttp2_incoming_metadata_buffer trailing_metadata; + bool trailing_metadata_valid; -static const char *recv_state_name[] = { - "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,", - "CRONET_RECV_CLOSED"}; + // vars for initial metadata + grpc_chttp2_incoming_metadata_buffer initial_metadata; +}; -// Enum that identifies calling function. -enum e_caller { - PERFORM_STREAM_OP, - ON_READ_COMPLETE, - ON_RESPONSE_HEADERS_RECEIVED, - ON_RESPONSE_TRAILERS_RECEIVED +struct write_state { + char *write_buffer; }; -enum callback_id { - OP_SEND_INITIAL_METADATA = 0, - OP_SEND_MESSAGE, - OP_SEND_TRAILING_METADATA, - OP_RECV_MESSAGE, - OP_RECV_INITIAL_METADATA, - OP_RECV_TRAILING_METADATA, - OP_CANCEL_ERROR, - OP_NUM_CALLBACKS +#define MAX_PENDING_OPS 10 +struct op_storage { + grpc_transport_stream_op pending_ops[MAX_PENDING_OPS]; + int wrptr; + int rdptr; + int num_pending_ops; }; struct stream_obj { - // we store received bytes here as they trickle in. - gpr_slice_buffer write_slice_buffer; + grpc_transport_stream_op *curr_op; + grpc_cronet_transport curr_ct; + grpc_stream *curr_gs; cronet_bidirectional_stream *cbs; - gpr_slice slice; - gpr_slice_buffer read_slice_buffer; - struct grpc_slice_buffer_stream sbs; - char *read_buffer; - int remaining_read_bytes; - int total_read_bytes; - - char *write_buffer; - size_t write_buffer_size; - // Hold the URL - char *url; + // TODO (makdharma) : make a sub structure for tracking state + bool state_op_done[OP_NUM_OPS]; + bool state_callback_received[OP_NUM_OPS]; - // One bit per operation - bool op_requested[OP_NUM_CALLBACKS]; - bool op_done[OP_NUM_CALLBACKS]; - // Set to true when server indicates no more data will be sent - bool read_closed; - - // Recv message stuff - grpc_byte_buffer **recv_message; - // Initial metadata stuff - grpc_metadata_batch *recv_initial_metadata; - grpc_chttp2_incoming_metadata_buffer initial_metadata; - // Trailing metadata stuff - grpc_metadata_batch *recv_trailing_metadata; - grpc_chttp2_incoming_metadata_buffer imb; - bool imb_valid; // true if there are any valid entries in imb. - - // This mutex protects receive state machine execution - gpr_mu recv_mu; - - // Callbacks to be called when operations complete - grpc_closure *cb_recv_initial_metadata_ready; - grpc_closure *cb_recv_message_ready; - grpc_closure *on_complete; - - // storage for header - cronet_bidirectional_stream_header *headers; - uint32_t num_headers; - cronet_bidirectional_stream_header_array header_array; - // state tracking - enum recv_state cronet_recv_state; - enum send_state cronet_send_state; + // Read state + struct read_state rs; + // Write state + struct write_state ws; + // OP storage + struct op_storage storage; }; - typedef struct stream_obj stream_obj; -static void next_send_step(stream_obj *s); -static void next_recv_step(stream_obj *s, enum e_caller caller); +/* Globals */ +cronet_bidirectional_stream_header_array header_array; -static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) {} - -static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, - grpc_transport *gt, grpc_stream *gs, - grpc_pollset_set *pollset_set) {} - -// Client creates a bunch of operations and invokes "call_start_batch" -// call_start_batch creates a stream_op structure. this structure has info -// needed for executing all the ops. It has on_complete callback that needs -// to be called when all ops are executed. This function keeps track of all -// outstanding operations. It returns true if all operations that were part of -// the stream_op have been completed. -static bool is_op_complete(stream_obj *s) { - int i; - // Check if any requested op is pending - for (i = 0; i < OP_NUM_CALLBACKS; i++) { - if (s->op_requested[i] && !s->op_done[i]) { - gpr_log(GPR_DEBUG, "is_op_complete is FALSE because of %d", i); - return false; - } - } - // Clear the requested/done bits and return true - for (i = 0; i < OP_NUM_CALLBACKS; i++) { - s->op_requested[i] = s->op_done[i] = false; - } - return true; -} - -static void enqueue_callback(grpc_closure *callback) { - GPR_ASSERT(callback); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_exec_ctx_sched(&exec_ctx, callback, GRPC_ERROR_NONE, NULL); - grpc_exec_ctx_finish(&exec_ctx); +// +static void execute_curr_stream_op(stream_obj *s); + +/************************************************************* + Op Storage +*/ + +static void add_pending_op(struct op_storage *storage, grpc_transport_stream_op *op) { + GPR_ASSERT(storage->num_pending_ops < MAX_PENDING_OPS); + storage->num_pending_ops++; + gpr_log(GPR_DEBUG, "adding new op @wrptr=%d. %d in the queue.", + storage->wrptr, storage->num_pending_ops); + memcpy(&storage->pending_ops[storage->wrptr], op, sizeof(grpc_transport_stream_op)); + storage->wrptr = (storage->wrptr + 1) % MAX_PENDING_OPS; } -static void on_canceled(cronet_bidirectional_stream *stream) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "on_canceled(%p)", stream); - } - stream_obj *s = (stream_obj *)stream->annotation; - s->op_done[OP_CANCEL_ERROR] = true; - - // Terminate any read callback - if (s->cb_recv_message_ready) { - enqueue_callback(s->cb_recv_message_ready); - s->cb_recv_message_ready = 0; - s->op_done[OP_RECV_MESSAGE] = true; - } - // Don't wait to get any trailing metadata - s->op_done[OP_RECV_TRAILING_METADATA] = true; - - next_send_step(s); +static grpc_transport_stream_op *pop_pending_op(struct op_storage *storage) { + if (storage->num_pending_ops == 0) return NULL; + grpc_transport_stream_op *result = &storage->pending_ops[storage->rdptr]; + storage->rdptr = (storage->rdptr + 1) % MAX_PENDING_OPS; + storage->num_pending_ops--; + gpr_log(GPR_DEBUG, "popping op @rdptr=%d. %d more left in queue", + storage->rdptr, storage->num_pending_ops); + return result; } +/************************************************************* +Cronet Callback Ipmlementation +*/ static void on_failed(cronet_bidirectional_stream *stream, int net_error) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); - } + gpr_log(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); } static void on_succeeded(cronet_bidirectional_stream *stream) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "on_succeeded(%p)", stream); - } -} - -static void on_response_trailers_received( - cronet_bidirectional_stream *stream, - const cronet_bidirectional_stream_header_array *trailers) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, - trailers); - } - stream_obj *s = (stream_obj *)stream->annotation; - - memset(&s->imb, 0, sizeof(s->imb)); - s->imb_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->imb); - unsigned int i = 0; - for (i = 0; i < trailers->count; i++) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, - trailers->headers[i].value); - } - - grpc_chttp2_incoming_metadata_buffer_add( - &s->imb, grpc_mdelem_from_metadata_strings( - grpc_mdstr_from_string(trailers->headers[i].key), - grpc_mdstr_from_string(trailers->headers[i].value))); - s->imb_valid = true; - } - s->op_done[OP_RECV_TRAILING_METADATA] = true; - next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); + gpr_log(GPR_DEBUG, "on_succeeded(%p)", stream); } -static void on_write_completed(cronet_bidirectional_stream *stream, - const char *data) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); - } - stream_obj *s = (stream_obj *)stream->annotation; - s->op_done[OP_SEND_MESSAGE] = true; - next_send_step(s); -} -static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { - gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes); - uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); - if (s->total_read_bytes > 0) { - // Only copy if there is non-zero number of bytes - memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); - gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); - } - grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); - *s->recv_message = (grpc_byte_buffer *)&s->sbs; -} - -static int parse_grpc_header(const uint8_t *data) { - const uint8_t *p = data + 1; - int length = 0; - length |= ((uint8_t)*p++) << 24; - length |= ((uint8_t)*p++) << 16; - length |= ((uint8_t)*p++) << 8; - length |= ((uint8_t)*p++); - return length; -} - -static void on_read_completed(cronet_bidirectional_stream *stream, char *data, - int count) { +static void on_request_headers_sent(cronet_bidirectional_stream *stream) { + gpr_log(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); - } - if (count > 0) { - GPR_ASSERT(s->recv_message); - s->remaining_read_bytes -= count; - next_recv_step(s, ON_READ_COMPLETE); - } else { - gpr_log(GPR_DEBUG, "read_closed = true"); - s->read_closed = true; - next_recv_step(s, ON_READ_COMPLETE); - } + s->state_op_done[OP_SEND_INITIAL_METADATA] = true; + s->state_callback_received[OP_SEND_INITIAL_METADATA] = true; + execute_curr_stream_op(s); } static void on_response_headers_received( cronet_bidirectional_stream *stream, const cronet_bidirectional_stream_header_array *headers, const char *negotiated_protocol) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, + gpr_log(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); - } - stream_obj *s = (stream_obj *)stream->annotation; - memset(&s->initial_metadata, 0, sizeof(s->initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->initial_metadata); + stream_obj *s = (stream_obj *)stream->annotation; + memset(&s->rs.initial_metadata, 0, sizeof(s->rs.initial_metadata)); + grpc_chttp2_incoming_metadata_buffer_init(&s->rs.initial_metadata); unsigned int i = 0; for (i = 0; i < headers->count; i++) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "header key=%s, value=%s", headers->headers[i].key, - headers->headers[i].value); - } grpc_chttp2_incoming_metadata_buffer_add( - &s->initial_metadata, + &s->rs.initial_metadata, grpc_mdelem_from_metadata_strings( grpc_mdstr_from_string(headers->headers[i].key), grpc_mdstr_from_string(headers->headers[i].value))); } - - grpc_chttp2_incoming_metadata_buffer_publish(&s->initial_metadata, - s->recv_initial_metadata); - enqueue_callback(s->cb_recv_initial_metadata_ready); - s->op_done[OP_RECV_INITIAL_METADATA] = true; - next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); + s->state_callback_received[OP_RECV_INITIAL_METADATA] = true; + execute_curr_stream_op(s); } -static void on_request_headers_sent(cronet_bidirectional_stream *stream) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); - } +static void on_write_completed(cronet_bidirectional_stream *stream, + const char *data) { + gpr_log(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); stream_obj *s = (stream_obj *)stream->annotation; - s->op_done[OP_SEND_INITIAL_METADATA] = true; - next_send_step(s); -} - -// Callback function pointers (invoked by cronet in response to events) -static cronet_bidirectional_stream_callback callbacks = { - on_request_headers_sent, - on_response_headers_received, - on_read_completed, - on_write_completed, - on_response_trailers_received, - on_succeeded, - on_failed, - on_canceled}; - -static void invoke_closing_callback(stream_obj *s) { - if (!is_op_complete(s)) return; - - if (s->imb_valid) { - grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, - s->recv_trailing_metadata); + if (s->ws.write_buffer) { + gpr_free(s->ws.write_buffer); + s->ws.write_buffer = NULL; } - enqueue_callback(s->on_complete); + s->state_callback_received[OP_SEND_MESSAGE] = true; + execute_curr_stream_op(s); } -static void set_recv_state(stream_obj *s, enum recv_state state) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]); +static void on_read_completed(cronet_bidirectional_stream *stream, char *data, + int count) { + stream_obj *s = (stream_obj *)stream->annotation; + gpr_log(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); + if (count > 0) { + s->rs.received_bytes += count; + s->rs.remaining_bytes -= count; + if (s->rs.remaining_bytes > 0) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_read"); + cronet_bidirectional_stream_read(s->cbs, s->rs.read_buffer + s->rs.received_bytes, s->rs.remaining_bytes); + } else { + execute_curr_stream_op(s); + } + s->state_callback_received[OP_RECV_MESSAGE] = true; } - s->cronet_recv_state = state; } -// This is invoked from perform_stream_op, and all on_xxxx callbacks. -static void next_recv_step(stream_obj *s, enum e_caller caller) { - // gpr_log(GPR_DEBUG, "locking mutex %p", &s->recv_mu); - gpr_mu_lock(&s->recv_mu); - switch (s->cronet_recv_state) { - case CRONET_RECV_IDLE: - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE, caller=%d", - caller); - } - if (caller == PERFORM_STREAM_OP || - caller == ON_RESPONSE_HEADERS_RECEIVED) { - if (s->read_closed && s->op_done[OP_RECV_TRAILING_METADATA]) { - set_recv_state(s, CRONET_RECV_CLOSED); - } else if (s->op_done[OP_RECV_INITIAL_METADATA] == true && - s->op_requested[OP_RECV_MESSAGE]) { - set_recv_state(s, CRONET_RECV_READ_LENGTH); - s->total_read_bytes = s->remaining_read_bytes = - GRPC_HEADER_SIZE_IN_BYTES; - GPR_ASSERT(s->read_buffer); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read(%p,%p,%d)", - s->cbs, s->read_buffer, s->remaining_read_bytes); - } - cronet_bidirectional_stream_read(s->cbs, s->read_buffer, - s->remaining_read_bytes); - } - } else if (caller == ON_RESPONSE_TRAILERS_RECEIVED) { - // We get here when we receive trailers directly, i.e. without - // going through a data read operation. - set_recv_state(s, CRONET_RECV_CLOSED); - } - break; - case CRONET_RECV_READ_LENGTH: - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH"); - } - if (caller == ON_READ_COMPLETE) { - if (s->read_closed) { - enqueue_callback(s->cb_recv_message_ready); - s->op_done[OP_RECV_MESSAGE] = true; - set_recv_state(s, CRONET_RECV_CLOSED); - } else { - GPR_ASSERT(s->remaining_read_bytes == 0); - set_recv_state(s, CRONET_RECV_READ_DATA); - s->total_read_bytes = s->remaining_read_bytes = - parse_grpc_header((const uint8_t *)s->read_buffer); - s->read_buffer = - gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); - GPR_ASSERT(s->read_buffer); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read(%p,%p,%d)", - s->cbs, s->read_buffer, s->remaining_read_bytes); - } - if (s->remaining_read_bytes > 0) { - cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, - s->remaining_read_bytes); - } else { - // Calling the closing callback directly since this is a 0 byte read - // for an empty message. - process_recv_message(s, NULL); - enqueue_callback(s->cb_recv_message_ready); - s->op_done[OP_RECV_MESSAGE] = true; - set_recv_state(s, CRONET_RECV_CLOSED); - } - } - } - break; - case CRONET_RECV_READ_DATA: - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA"); - } - if (caller == ON_READ_COMPLETE) { - if (s->remaining_read_bytes > 0) { - int offset = s->total_read_bytes - s->remaining_read_bytes; - GPR_ASSERT(s->read_buffer); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); - } - cronet_bidirectional_stream_read( - s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes); - } else { - gpr_slice_buffer_init(&s->read_slice_buffer); - uint8_t *p = (uint8_t *)s->read_buffer; - process_recv_message(s, p); - set_recv_state(s, CRONET_RECV_IDLE); - enqueue_callback(s->cb_recv_message_ready); - s->op_done[OP_RECV_MESSAGE] = true; - } - } - break; - case CRONET_RECV_CLOSED: - break; - default: - GPR_ASSERT(0); // Should not reach here - break; +static void on_response_trailers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *trailers) { + gpr_log(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, + trailers); + stream_obj *s = (stream_obj *)stream->annotation; + memset(&s->rs.trailing_metadata, 0, sizeof(s->rs.trailing_metadata)); + s->rs.trailing_metadata_valid = false; + grpc_chttp2_incoming_metadata_buffer_init(&s->rs.trailing_metadata); + unsigned int i = 0; + for (i = 0; i < trailers->count; i++) { + gpr_log(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, + trailers->headers[i].value); + grpc_chttp2_incoming_metadata_buffer_add( + &s->rs.trailing_metadata, grpc_mdelem_from_metadata_strings( + grpc_mdstr_from_string(trailers->headers[i].key), + grpc_mdstr_from_string(trailers->headers[i].value))); + s->rs.trailing_metadata_valid = true; } - invoke_closing_callback(s); - gpr_mu_unlock(&s->recv_mu); - // gpr_log(GPR_DEBUG, "unlocking mutex %p", &s->recv_mu); + s->state_callback_received[OP_RECV_TRAILING_METADATA] = true; + execute_curr_stream_op(s); } +/************************************************************* +Utility functions. Can be in their own file +*/ // This function takes the data from s->write_slice_buffer and assembles into // a contiguous byte stream with 5 byte gRPC header prepended. -static void create_grpc_frame(stream_obj *s) { - gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer); - uint8_t *raw_data = GPR_SLICE_START_PTR(slice); +static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, + char **pp_write_buffer, int *p_write_buffer_size) { + gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer); size_t length = GPR_SLICE_LENGTH(slice); - s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; - s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size); - uint8_t *p = (uint8_t *)s->write_buffer; + // TODO (makdharma): FREE THIS!! HACK! + *p_write_buffer_size = (int)length + GRPC_HEADER_SIZE_IN_BYTES; + char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES); + *pp_write_buffer = write_buffer; + uint8_t *p = (uint8_t *)write_buffer; // Append 5 byte header *p++ = 0; *p++ = (uint8_t)(length >> 24); @@ -489,135 +292,22 @@ static void create_grpc_frame(stream_obj *s) { *p++ = (uint8_t)(length >> 8); *p++ = (uint8_t)(length); // append actual data - memcpy(p, raw_data, length); -} -// Return false if there is no data to write -static bool do_write(stream_obj *s) { - gpr_slice_buffer *sb = &s->write_slice_buffer; - GPR_ASSERT(sb->count <= 1); - if (sb->count > 0) { - create_grpc_frame(s); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write(%p,%p,%d,%d)", - s->cbs, s->write_buffer, (int)s->write_buffer_size, false); - } - cronet_bidirectional_stream_write(s->cbs, s->write_buffer, - (int)s->write_buffer_size, false); - return true; - } else { - return false; - } + memcpy(p, GPR_SLICE_START_PTR(slice), length); } -static void init_cronet_stream(stream_obj *s, grpc_transport *gt) { - GPR_ASSERT(s->cbs == NULL); - grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; - GPR_ASSERT(ct->engine); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); - } - s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); - GPR_ASSERT(s->cbs); - s->read_closed = false; - - for (int i = 0; i < OP_NUM_CALLBACKS; i++) { - s->op_requested[i] = s->op_done[i] = false; - } - s->cronet_send_state = CRONET_SEND_IDLE; - s->cronet_recv_state = CRONET_RECV_IDLE; -} - -static bool do_close_connection(stream_obj *s) { - s->op_done[OP_SEND_TRAILING_METADATA] = true; - if (s->cbs) { - // Send an "empty" write to the far end to signal that we're done. - // This will induce the server to send down trailers. - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write length 0"); - } - cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); - return true; - } else { - // We never created a stream. This was probably an empty request. - invoke_closing_callback(s); - return true; - } - return false; -} - -// -static void next_send_step(stream_obj *s) { - gpr_log(GPR_DEBUG, "next_send_step cronet_send_state=%d", - s->cronet_send_state); - switch (s->cronet_send_state) { - case CRONET_SEND_IDLE: - GPR_ASSERT( - s->cbs); // cronet_bidirectional_stream is not initialized yet. - s->cronet_send_state = CRONET_SEND_HEADER; - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); - } - cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST", - &s->header_array, false); - // we no longer need the memory that was allocated earlier. - gpr_free(s->header_array.headers); - break; - case CRONET_SEND_HEADER: - if (s->op_requested[OP_CANCEL_ERROR]) { - cronet_bidirectional_stream_cancel(s->cbs); - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); - s->cronet_send_state = CRONET_WAIT_FOR_CANCEL; - } else if (do_write(s) == false && - s->op_requested[OP_SEND_TRAILING_METADATA]) { - if (do_close_connection(s)) { - s->cronet_send_state = CRONET_STREAM_CLOSED; - } - } else { - s->cronet_send_state = CRONET_WRITE_PENDING; - } - break; - case CRONET_WRITE_PENDING: - if (s->op_requested[OP_CANCEL_ERROR]) { - cronet_bidirectional_stream_cancel(s->cbs); - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); - s->cronet_send_state = CRONET_WAIT_FOR_CANCEL; - } else if (do_write(s) == false && - s->op_requested[OP_SEND_TRAILING_METADATA]) { - if (do_close_connection(s)) { - s->cronet_send_state = CRONET_STREAM_CLOSED; - } - } else { - s->cronet_send_state = CRONET_WRITE_COMPLETED; - } - break; - case CRONET_WRITE_COMPLETED: - if (s->op_requested[OP_CANCEL_ERROR]) { - cronet_bidirectional_stream_cancel(s->cbs); - gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); - s->cronet_send_state = CRONET_WAIT_FOR_CANCEL; - } else if (do_write(s) == false && - s->op_requested[OP_SEND_TRAILING_METADATA]) { - if (do_close_connection(s)) { - s->cronet_send_state = CRONET_STREAM_CLOSED; - } - } - break; - case CRONET_STREAM_CLOSED: - s->cronet_send_state = CRONET_SEND_IDLE; - break; - case CRONET_WAIT_FOR_CANCEL: - invoke_closing_callback(s); - s->cronet_send_state = CRONET_SEND_IDLE; - break; - default: - GPR_ASSERT(0); - break; - } +static void enqueue_callback(grpc_closure *callback) { + GPR_ASSERT(callback); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_sched(&exec_ctx, callback, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_finish(&exec_ctx); } -static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, - const char *host, - stream_obj *s) { +static void convert_metadata_to_cronet_headers( + grpc_linked_mdelem *head, + const char *host, + char **pp_url, + cronet_bidirectional_stream_header **pp_headers, + size_t *p_num_headers) { grpc_linked_mdelem *curr = head; // Walk the linked list and get number of header fields uint32_t num_headers_available = 0; @@ -625,17 +315,19 @@ static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, curr = curr->next; num_headers_available++; } - // Allocate enough memory - s->headers = (cronet_bidirectional_stream_header *)gpr_malloc( + // Allocate enough memory. TODO (makdharma): FREE MEMORY! HACK HACK + cronet_bidirectional_stream_header *headers = + (cronet_bidirectional_stream_header *)gpr_malloc( sizeof(cronet_bidirectional_stream_header) * num_headers_available); + *pp_headers = headers; // Walk the linked list again, this time copying the header fields. // s->num_headers // can be less than num_headers_available, as some headers are not used for // cronet curr = head; - s->num_headers = 0; - while (s->num_headers < num_headers_available) { + int num_headers = 0; + while (num_headers < num_headers_available) { grpc_mdelem *mdelem = curr->md; curr = curr->next; const char *key = grpc_mdstr_as_c_string(mdelem->key); @@ -647,161 +339,237 @@ static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, } if (strcmp(key, ":path") == 0) { // Create URL by appending :path value to the hostname - gpr_asprintf(&s->url, "https://%s%s", host, value); - if (grpc_cronet_trace) { - // gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); - } + gpr_asprintf(pp_url, "https://%s%s", host, value); continue; } - s->headers[s->num_headers].key = key; - s->headers[s->num_headers].value = value; - s->num_headers++; + headers[num_headers].key = key; + headers[num_headers].value = value; + num_headers++; if (curr == NULL) { break; } } + *p_num_headers = num_headers; } -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { - grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; - GPR_ASSERT(ct->engine); - stream_obj *s = (stream_obj *)gs; - // Initialize a cronet bidirectional stream if it doesn't exist. - if (s->cbs == NULL) { - init_cronet_stream(s, gt); - } - - s->on_complete = op->on_complete; +static int parse_grpc_header(const uint8_t *data) { + const uint8_t *p = data + 1; + int length = 0; + length |= ((uint8_t)*p++) << 24; + length |= ((uint8_t)*p++) << 16; + length |= ((uint8_t)*p++) << 8; + length |= ((uint8_t)*p++); + return length; +} - if (op->recv_trailing_metadata) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, - "perform_stream_op - recv_trailing_metadata: on_complete=%p", - op->on_complete); - } - s->recv_trailing_metadata = op->recv_trailing_metadata; - s->op_requested[OP_RECV_TRAILING_METADATA] = true; - } - if (op->recv_message) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p", - op->on_complete); - } - s->recv_message = (grpc_byte_buffer **)op->recv_message; - s->cb_recv_message_ready = op->recv_message_ready; - s->op_requested[OP_RECV_MESSAGE] = true; - } - if (op->recv_initial_metadata) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, - "perform_stream_op - recv_initial_metadata on_complete=%p, " - "on_ready=%p", - op->on_complete, op->recv_initial_metadata_ready); - } - s->recv_initial_metadata = op->recv_initial_metadata; - s->cb_recv_initial_metadata_ready = op->recv_initial_metadata_ready; - s->op_requested[OP_RECV_INITIAL_METADATA] = true; - } - if (op->send_initial_metadata) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, - "perform_stream_op - send_initial_metadata: on_complete=%p", - op->on_complete); - } - s->num_headers = 0; - convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head, - ct->host, s); - s->header_array.count = s->num_headers; - s->header_array.capacity = s->num_headers; - s->header_array.headers = s->headers; - s->op_requested[OP_SEND_INITIAL_METADATA] = true; +/* +Op Execution +*/ + +static bool op_can_be_run(stream_obj *s, enum OP_ID op_id) { + if (op_id == OP_SEND_INITIAL_METADATA) { + // already executed + if (s->state_op_done[OP_SEND_INITIAL_METADATA]) return false; + } + if (op_id == OP_RECV_INITIAL_METADATA) { + // already executed + if (s->state_op_done[OP_RECV_INITIAL_METADATA]) return false; + // we haven't sent headers yet. + if (!s->state_callback_received[OP_SEND_INITIAL_METADATA]) return false; + // we haven't received headers yet. + if (!s->state_callback_received[OP_RECV_INITIAL_METADATA]) return false; + } + if (op_id == OP_SEND_MESSAGE) { + // already executed + if (s->state_op_done[OP_SEND_MESSAGE]) return false; + // we haven't received headers yet. + if (!s->state_callback_received[OP_RECV_INITIAL_METADATA]) return false; + } + if (op_id == OP_RECV_MESSAGE) { + // already executed + if (s->state_op_done[OP_RECV_MESSAGE]) return false; + // we haven't received headers yet. + if (!s->state_callback_received[OP_RECV_INITIAL_METADATA]) return false; + } + if (op_id == OP_RECV_TRAILING_METADATA) { + // already executed + if (s->state_op_done[OP_RECV_TRAILING_METADATA]) return false; + // we haven't received trailers yet. + if (!s->state_callback_received[OP_RECV_TRAILING_METADATA]) return false; + } + if (op_id == OP_SEND_TRAILING_METADATA) { + // already executed + if (s->state_op_done[OP_SEND_TRAILING_METADATA]) return false; + // we haven't sent message yet + if (s->curr_op->send_message && !s->state_op_done[OP_SEND_MESSAGE]) return false; + } + + if (op_id == OP_ON_COMPLETE) { + // already executed + if (s->state_op_done[OP_ON_COMPLETE]) return false; + // Check if every op that was asked for is done. + if (s->curr_op->send_initial_metadata && !s->state_op_done[OP_SEND_INITIAL_METADATA]) return false; + if (s->curr_op->send_message && !s->state_op_done[OP_SEND_MESSAGE]) return false; + if (s->curr_op->send_trailing_metadata && !s->state_op_done[OP_SEND_TRAILING_METADATA]) return false; + if (s->curr_op->recv_initial_metadata && !s->state_op_done[OP_RECV_INITIAL_METADATA]) return false; + if (s->curr_op->recv_message && !s->state_op_done[OP_RECV_MESSAGE]) return false; + if (s->curr_op->recv_trailing_metadata && !s->state_op_done[OP_RECV_TRAILING_METADATA]) return false; } - if (op->send_message) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p", - op->on_complete); - } - grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice, - op->send_message->length, NULL); + return true; +} + +static void execute_curr_stream_op(stream_obj *s) { + if (s->curr_op->send_initial_metadata && op_can_be_run(s, OP_SEND_INITIAL_METADATA)) { + // This OP is the beginning. Reset various states + memset(&s->rs, 0, sizeof(s->rs)); + memset(&s->ws, 0, sizeof(s->ws)); + memset(s->state_op_done, 0, sizeof(s->state_op_done)); + memset(s->state_callback_received, 0, sizeof(s->state_callback_received)); + // Start new cronet stream + GPR_ASSERT(s->cbs == NULL); + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); + s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); + char *url; + convert_metadata_to_cronet_headers(s->curr_op->send_initial_metadata->list.head, + s->curr_ct.host, &url, &header_array.headers, &header_array.count); + header_array.capacity = header_array.count; + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start"); + cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &header_array, false); + s->state_op_done[OP_SEND_INITIAL_METADATA] = true; + } else if (s->curr_op->recv_initial_metadata && + op_can_be_run(s, OP_RECV_INITIAL_METADATA)) { + grpc_chttp2_incoming_metadata_buffer_publish(&s->rs.initial_metadata, + s->curr_op->recv_initial_metadata); + enqueue_callback(s->curr_op->recv_initial_metadata_ready); + s->state_op_done[OP_RECV_INITIAL_METADATA] = true; + // We are ready to execute send_message. + execute_curr_stream_op(s); + } else if (s->curr_op->send_message && op_can_be_run(s, OP_SEND_MESSAGE)) { + // TODO (makdharma): Make into a standalone function + gpr_slice_buffer write_slice_buffer; + gpr_slice slice; + gpr_slice_buffer_init(&write_slice_buffer); + grpc_byte_stream_next(NULL, s->curr_op->send_message, &slice, + s->curr_op->send_message->length, NULL); // Check that compression flag is not ON. We don't support compression yet. // TODO (makdharma): add compression support - GPR_ASSERT(op->send_message->flags == 0); - gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); - s->op_requested[OP_SEND_MESSAGE] = true; - } - if (op->send_trailing_metadata) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, - "perform_stream_op - send_trailing_metadata: on_complete=%p", - op->on_complete); + GPR_ASSERT(s->curr_op->send_message->flags == 0); + gpr_slice_buffer_add(&write_slice_buffer, slice); + GPR_ASSERT(write_slice_buffer.count == 1); // Empty request not handled yet + if (write_slice_buffer.count > 0) { + int write_buffer_size; + create_grpc_frame(&write_slice_buffer, &s->ws.write_buffer, &write_buffer_size); + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_write (%p)", s->ws.write_buffer); + cronet_bidirectional_stream_write(s->cbs, s->ws.write_buffer, + write_buffer_size, false); // TODO: What if this is not the last write? } - s->op_requested[OP_SEND_TRAILING_METADATA] = true; - } - if (op->cancel_error) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "perform_stream_op - cancel_error: on_complete=%p", - op->on_complete); + s->state_op_done[OP_SEND_MESSAGE] = true; + } else if (s->curr_op->recv_message && op_can_be_run(s, OP_RECV_MESSAGE)) { + if (s->rs.length_field_received == false) { + if (s->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && s->rs.remaining_bytes == 0) { + // Start a read operation for data + s->rs.length_field_received = true; + s->rs.length_field = s->rs.remaining_bytes = + parse_grpc_header((const uint8_t *)s->rs.read_buffer); + GPR_ASSERT(s->rs.length_field > 0); // Empty message? + gpr_log(GPR_DEBUG, "length field = %d", s->rs.length_field); + s->rs.read_buffer = gpr_malloc(s->rs.length_field); + GPR_ASSERT(s->rs.read_buffer); + s->rs.remaining_bytes = s->rs.length_field; + s->rs.received_bytes = 0; + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_read"); + cronet_bidirectional_stream_read(s->cbs, s->rs.read_buffer, + s->rs.remaining_bytes); + } else if (s->rs.remaining_bytes == 0) { + // Start a read operation for first 5 bytes (GRPC header) + s->rs.read_buffer = s->rs.grpc_header_bytes; + s->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + s->rs.received_bytes = 0; + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_read"); + cronet_bidirectional_stream_read(s->cbs, s->rs.read_buffer, + s->rs.remaining_bytes); + } + } else if (s->rs.remaining_bytes == 0) { + gpr_log(GPR_DEBUG, "read operation complete"); + gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->rs.length_field); + uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); + memcpy(dst_p, s->rs.read_buffer, (size_t)s->rs.length_field); + gpr_slice_buffer_init(&s->rs.read_slice_buffer); + gpr_slice_buffer_add(&s->rs.read_slice_buffer, read_data_slice); + grpc_slice_buffer_stream_init(&s->rs.sbs, &s->rs.read_slice_buffer, 0); + *((grpc_byte_buffer **)s->curr_op->recv_message) = (grpc_byte_buffer *)&s->rs.sbs; + enqueue_callback(s->curr_op->recv_message_ready); + s->state_op_done[OP_RECV_MESSAGE] = true; + execute_curr_stream_op(s); + } + } else if (s->curr_op->recv_trailing_metadata && + op_can_be_run(s, OP_RECV_TRAILING_METADATA)) { + if (s->rs.trailing_metadata_valid) { + grpc_chttp2_incoming_metadata_buffer_publish( + &s->rs.trailing_metadata, s->curr_op->recv_trailing_metadata); + s->rs.trailing_metadata_valid = false; } - s->op_requested[OP_CANCEL_ERROR] = true; + s->state_op_done[OP_RECV_TRAILING_METADATA] = true; + execute_curr_stream_op(s); + } else if (s->curr_op->send_trailing_metadata && + op_can_be_run(s, OP_SEND_TRAILING_METADATA)) { + + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_write (0)"); + cronet_bidirectional_stream_write(s->cbs, "", 0, true); + s->state_op_done[OP_SEND_TRAILING_METADATA] = true; + } else if (op_can_be_run(s, OP_ON_COMPLETE)) { + // All ops are complete. Call the on_complete callback + enqueue_callback(s->curr_op->on_complete); + s->state_op_done[OP_ON_COMPLETE] = true; + cronet_bidirectional_stream_destroy(s->cbs); + s->cbs = NULL; } - next_send_step(s); - next_recv_step(s, PERFORM_STREAM_OP); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data) { stream_obj *s = (stream_obj *)gs; - memset(s, 0, sizeof(stream_obj)); - s->cbs = NULL; - gpr_mu_init(&s->recv_mu); - s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); - s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); - gpr_slice_buffer_init(&s->write_slice_buffer); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "cronet_transport - init_stream"); - } + memset(&s->storage, 0, sizeof(s->storage)); + s->curr_op = NULL; return 0; } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "Destroy stream"); - } +static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) {} + +static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, + grpc_transport *gt, grpc_stream *gs, + grpc_pollset_set *pollset_set) {} + +static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_transport_stream_op *op) { + gpr_log(GPR_DEBUG, "perform_stream_op"); stream_obj *s = (stream_obj *)gs; - s->cbs = NULL; - gpr_free(s->read_buffer); - gpr_free(s->write_buffer); - gpr_free(s->url); - gpr_log(GPR_DEBUG, "destroying %p", &s->recv_mu); - gpr_mu_destroy(&s->recv_mu); - if (and_free_memory) { - gpr_free(and_free_memory); + memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); + add_pending_op(&s->storage, op); + if (s->curr_op == NULL) { + s->curr_op = pop_pending_op(&s->storage); } + s->curr_gs = gs; + execute_curr_stream_op(s); +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { - grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; - gpr_free(ct->host); - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "Destroy transport"); - } } static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "Unimplemented method"); - } return NULL; } static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { - if (grpc_cronet_trace) { - gpr_log(GPR_DEBUG, "Unimplemented method"); - } } const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), -- GitLab