Skip to content
Snippets Groups Projects
Commit 1b8deaa4 authored by Makarand Dharmapurikar's avatar Makarand Dharmapurikar
Browse files

addressed review feedback.

parent 3578c5e5
No related branches found
No related tags found
No related merge requests found
......@@ -52,12 +52,12 @@
#define GRPC_HEADER_SIZE_IN_BYTES 5
#define CRONET_LOG(...) \
{ \
do { \
if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
}
} while (0)
/* TODO (makdharma): Hook up into the wider tracing mechanism */
int grpc_cronet_trace = 1;
int grpc_cronet_trace = 0;
enum OP_RESULT {
ACTION_TAKEN_WITH_CALLBACK,
......@@ -192,8 +192,6 @@ struct stream_obj {
cronet_bidirectional_stream *cbs;
cronet_bidirectional_stream_header_array header_array;
/* Used for executing callbacks for ops */
grpc_exec_ctx exec_ctx;
/* Stream level state. Some state will be tracked both at stream and stream_op
* level */
struct op_state state;
......@@ -206,7 +204,8 @@ struct stream_obj {
};
typedef struct stream_obj stream_obj;
static enum OP_RESULT execute_stream_op(struct op_and_state *oas);
static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas);
/*
Add a new stream op to op storage.
......@@ -267,11 +266,12 @@ static void remove_from_storage(struct stream_obj *s,
or on the application supplied thread via the perform_stream_op function.
*/
static void execute_from_storage(stream_obj *s) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
enum OP_RESULT result = execute_stream_op(curr);
enum OP_RESULT result = execute_stream_op(&exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string[result]);
/* if this op is done, then remove it and free memory */
......@@ -288,7 +288,7 @@ static void execute_from_storage(stream_obj *s) {
}
}
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&s->exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
}
/*
......@@ -683,7 +683,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
return result;
}
static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
static enum OP_RESULT execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
grpc_transport_stream_op *stream_op = &oas->op;
struct stream_obj *s = oas->s;
struct op_state *stream_state = &s->state;
......@@ -724,10 +725,10 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready,
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_NONE, NULL);
} else {
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready,
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED, NULL);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
......@@ -764,13 +765,13 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
......@@ -803,7 +804,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
......@@ -835,7 +836,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
......@@ -882,7 +883,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
/* All actions in this stream_op are complete. Call the on_complete callback
*/
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
NULL);
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
......@@ -923,7 +924,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s->state.state_callback_received, 0,
sizeof(s->state.state_callback_received));
gpr_mu_init(&s->mu);
s->exec_ctx = *exec_ctx;
return 0;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment