Skip to content
Snippets Groups Projects
Commit 21490a1e authored by Muxi Yan's avatar Muxi Yan
Browse files

Fix flush read

parent d6274219
No related branches found
No related tags found
No related merge requests found
...@@ -54,6 +54,7 @@ ...@@ -54,6 +54,7 @@
#include "third_party/objective_c/Cronet/bidirectional_stream_c.h" #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5 #define GRPC_HEADER_SIZE_IN_BYTES 5
#define GRPC_FLUSH_READ_SIZE 4096
#define CRONET_LOG(...) \ #define CRONET_LOG(...) \
do { \ do { \
...@@ -155,7 +156,9 @@ struct op_state { ...@@ -155,7 +156,9 @@ struct op_state {
bool flush_read; bool flush_read;
bool flush_cronet_when_ready; bool flush_cronet_when_ready;
bool pending_write_for_trailer; bool pending_write_for_trailer;
bool unprocessed_send_message; bool pending_send_message;
bool pending_recv_trailing_metadata;
bool pending_read_from_cronet;
grpc_error *cancel_error; grpc_error *cancel_error;
/* data structure for storing data coming from server */ /* data structure for storing data coming from server */
struct read_state rs; struct read_state rs;
...@@ -248,11 +251,27 @@ static const char *op_id_string(enum e_op_id i) { ...@@ -248,11 +251,27 @@ static const char *op_id_string(enum e_op_id i) {
return "UNKNOWN"; return "UNKNOWN";
} }
static void free_read_buffer(stream_obj *s) { static void null_and_maybe_free_read_buffer(stream_obj *s) {
if (s->state.rs.read_buffer && if (s->state.rs.read_buffer &&
s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
gpr_free(s->state.rs.read_buffer); gpr_free(s->state.rs.read_buffer);
s->state.rs.read_buffer = NULL; }
s->state.rs.read_buffer = NULL;
}
static void maybe_flush_read(stream_obj *s) {
if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
if (!s->state.flush_read) {
CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
s->state.flush_read = true;
null_and_maybe_free_read_buffer(s);
s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE);
if (!s->state.pending_read_from_cronet) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, GRPC_FLUSH_READ_SIZE);
s->state.pending_read_from_cronet = true;
}
}
} }
} }
...@@ -279,7 +298,11 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { ...@@ -279,7 +298,11 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
storage->head = new_op; storage->head = new_op;
storage->num_pending_ops++; storage->num_pending_ops++;
if (op->send_message) { if (op->send_message) {
s->state.unprocessed_send_message = true; s->state.pending_send_message = true;
}
if (op->recv_trailing_metadata) {
s->state.pending_recv_trailing_metadata = true;
maybe_flush_read(s);
} }
CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
storage->num_pending_ops); storage->num_pending_ops);
...@@ -367,7 +390,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) { ...@@ -367,7 +390,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer); gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL; s->state.ws.write_buffer = NULL;
} }
free_read_buffer(s); null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
...@@ -390,7 +413,7 @@ static void on_canceled(bidirectional_stream *stream) { ...@@ -390,7 +413,7 @@ static void on_canceled(bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer); gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL; s->state.ws.write_buffer = NULL;
} }
free_read_buffer(s); null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
...@@ -405,7 +428,7 @@ static void on_succeeded(bidirectional_stream *stream) { ...@@ -405,7 +428,7 @@ static void on_succeeded(bidirectional_stream *stream) {
bidirectional_stream_destroy(s->cbs); bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true; s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL; s->cbs = NULL;
free_read_buffer(s); null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
...@@ -473,6 +496,7 @@ static void on_response_headers_received( ...@@ -473,6 +496,7 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
s->state.rs.remaining_bytes); s->state.rs.remaining_bytes);
s->state.pending_read_from_cronet = true;
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
...@@ -504,10 +528,12 @@ static void on_read_completed(bidirectional_stream *stream, char *data, ...@@ -504,10 +528,12 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count); count);
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
s->state.pending_read_from_cronet = false;
s->state.state_callback_received[OP_RECV_MESSAGE] = true; s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0 && s->state.flush_read) { if (count > 0 && s->state.flush_read) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, GRPC_FLUSH_READ_SIZE);
s->state.pending_read_from_cronet = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} else if (count > 0) { } else if (count > 0) {
s->state.rs.received_bytes += count; s->state.rs.received_bytes += count;
...@@ -518,16 +544,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data, ...@@ -518,16 +544,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
bidirectional_stream_read( bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes); s->state.rs.remaining_bytes);
s->state.pending_read_from_cronet = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} else { } else {
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
} }
} else { } else {
if (s->state.flush_read) { null_and_maybe_free_read_buffer(s);
gpr_free(s->state.rs.read_buffer);
s->state.rs.read_buffer = NULL;
}
s->state.rs.read_stream_closed = true; s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(s);
...@@ -564,6 +588,7 @@ static void on_response_trailers_received( ...@@ -564,6 +588,7 @@ static void on_response_trailers_received(
if (0 == strcmp(trailers->headers[i].key, "grpc-status") && if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) { 0 != strcmp(trailers->headers[i].value, "0")) {
s->state.fail_state = true; s->state.fail_state = true;
maybe_flush_read(s);
} }
} }
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
...@@ -778,7 +803,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, ...@@ -778,7 +803,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false; result = false;
/* we haven't sent message yet */ /* we haven't sent message yet */
else if (stream_state->unprocessed_send_message && else if (stream_state->pending_send_message &&
!stream_state->state_op_done[OP_SEND_MESSAGE]) !stream_state->state_op_done[OP_SEND_MESSAGE])
result = false; result = false;
/* we haven't got on_write_completed for the send yet */ /* we haven't got on_write_completed for the send yet */
...@@ -900,7 +925,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -900,7 +925,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_op->send_message && } else if (stream_op->send_message &&
op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
stream_state->unprocessed_send_message = false; stream_state->pending_send_message = false;
if (stream_state->state_callback_received[OP_FAILED]) { if (stream_state->state_callback_received[OP_FAILED]) {
result = NO_ACTION_POSSIBLE; result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
...@@ -1009,6 +1034,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1009,6 +1034,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_op_done[OP_RECV_MESSAGE] = true; stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.length_field_received == false) { } else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
stream_state->rs.remaining_bytes == 0) { stream_state->rs.remaining_bytes == 0) {
...@@ -1029,6 +1061,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1029,6 +1061,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */ true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes); stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_WITH_CALLBACK; result = ACTION_TAKEN_WITH_CALLBACK;
} else { } else {
stream_state->rs.remaining_bytes = 0; stream_state->rs.remaining_bytes = 0;
...@@ -1047,11 +1080,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1047,11 +1080,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0; stream_state->rs.received_bytes = 0;
stream_state->rs.length_field_received = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] = stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */ true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes); stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} }
} else if (stream_state->rs.remaining_bytes == 0) { } else if (stream_state->rs.remaining_bytes == 0) {
...@@ -1064,6 +1099,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1064,6 +1099,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */ true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes); stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_WITH_CALLBACK; result = ACTION_TAKEN_WITH_CALLBACK;
} else { } else {
result = NO_ACTION_POSSIBLE; result = NO_ACTION_POSSIBLE;
...@@ -1075,7 +1111,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1075,7 +1111,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, stream_state->rs.read_buffer, memcpy(dst_p, stream_state->rs.read_buffer,
(size_t)stream_state->rs.length_field); (size_t)stream_state->rs.length_field);
free_read_buffer(s); null_and_maybe_free_read_buffer(s);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice); read_data_slice);
...@@ -1096,6 +1132,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1096,6 +1132,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes); stream_state->rs.remaining_bytes);
stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} }
} else if (stream_op->recv_trailing_metadata && } else if (stream_op->recv_trailing_metadata &&
...@@ -1153,15 +1190,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, ...@@ -1153,15 +1190,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
make a note */ make a note */
if (stream_op->recv_message) if (stream_op->recv_message)
stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
} else if (stream_state->fail_state && !stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas);
if (stream_state->rs.read_buffer &&
stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) {
gpr_free(stream_state->rs.read_buffer);
stream_state->rs.read_buffer = NULL;
}
stream_state->rs.read_buffer = gpr_malloc(4096);
stream_state->flush_read = true;
} else { } else {
result = NO_ACTION_POSSIBLE; result = NO_ACTION_POSSIBLE;
} }
...@@ -1190,7 +1218,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, ...@@ -1190,7 +1218,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->state.fail_state = s->state.flush_read = false; s->state.fail_state = s->state.flush_read = false;
s->state.cancel_error = NULL; s->state.cancel_error = NULL;
s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
s->state.unprocessed_send_message = false; s->state.pending_send_message = false;
s->state.pending_recv_trailing_metadata = false;
s->state.pending_read_from_cronet = false;
s->curr_gs = gs; s->curr_gs = gs;
s->curr_ct = (grpc_cronet_transport *)gt; s->curr_ct = (grpc_cronet_transport *)gt;
...@@ -1240,6 +1270,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, ...@@ -1240,6 +1270,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) { grpc_stream *gs, void *and_free_memory) {
stream_obj *s = (stream_obj *)gs; stream_obj *s = (stream_obj *)gs;
null_and_maybe_free_read_buffer(s);
GRPC_ERROR_UNREF(s->state.cancel_error); GRPC_ERROR_UNREF(s->state.cancel_error);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment