diff --git a/CMakeLists.txt b/CMakeLists.txt index 266f2c0774a921b2775945d59f0602be090b321b..f71563a38d7a30d78dac1c6e6f0102923cc7643c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -394,6 +394,7 @@ add_dependencies(buildtests_c bad_server_response_test) add_dependencies(buildtests_c bdp_estimator_test) add_dependencies(buildtests_c bin_decoder_test) add_dependencies(buildtests_c bin_encoder_test) +add_dependencies(buildtests_c byte_stream_test) add_dependencies(buildtests_c census_context_test) add_dependencies(buildtests_c census_intrusive_hash_map_test) add_dependencies(buildtests_c census_resource_test) @@ -4785,6 +4786,37 @@ target_link_libraries(bin_encoder_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(byte_stream_test + test/core/transport/byte_stream_test.c +) + + +target_include_directories(byte_stream_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CARES_BUILD_INCLUDE_DIR} + PRIVATE ${CARES_INCLUDE_DIR} + PRIVATE ${CARES_PLATFORM_INCLUDE_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include +) + +target_link_libraries(byte_stream_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr_test_util + gpr +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(census_context_test test/core/census/context_test.c ) diff --git a/Makefile b/Makefile index 7b53024b6c1516e272fafc1ba09cb574aecbfdd6..98cfb04e54a0cbcf115b2b078e1b308462f5ad01 100644 --- a/Makefile +++ b/Makefile @@ -954,6 +954,7 @@ bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test +byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test census_context_test: $(BINDIR)/$(CONFIG)/census_context_test census_intrusive_hash_map_test: $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test census_resource_test: $(BINDIR)/$(CONFIG)/census_resource_test @@ -1345,6 +1346,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/bdp_estimator_test \ $(BINDIR)/$(CONFIG)/bin_decoder_test \ $(BINDIR)/$(CONFIG)/bin_encoder_test \ + $(BINDIR)/$(CONFIG)/byte_stream_test \ $(BINDIR)/$(CONFIG)/census_context_test \ $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test \ $(BINDIR)/$(CONFIG)/census_resource_test \ @@ -1746,6 +1748,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 ) $(E) "[RUN] Testing bin_encoder_test" $(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 ) + $(E) "[RUN] Testing byte_stream_test" + $(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 ) $(E) "[RUN] Testing census_context_test" $(Q) $(BINDIR)/$(CONFIG)/census_context_test || ( echo test census_context_test failed ; exit 1 ) $(E) "[RUN] Testing census_intrusive_hash_map_test" @@ -8411,6 +8415,38 @@ endif endif +BYTE_STREAM_TEST_SRC = \ + test/core/transport/byte_stream_test.c \ + +BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/byte_stream_test: $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/byte_stream_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(BYTE_STREAM_TEST_OBJS:.o=.dep) +endif +endif + + CENSUS_CONTEXT_TEST_SRC = \ test/core/census/context_test.c \ diff --git a/build.yaml b/build.yaml index 198467bf5ade45e4b6b0f2bd434467ef0ec6adb0..9c2504af71fa56ba5b0975b24f1429b00657ac01 100644 --- a/build.yaml +++ b/build.yaml @@ -1702,6 +1702,16 @@ targets: deps: - grpc_test_util - grpc +- name: byte_stream_test + build: test + language: c + src: + - test/core/transport/byte_stream_test.c + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr - name: census_context_test build: test language: c diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index 90f0aed7a0d6ef58f559446bc3fdc5d74b89122a..3ca01a41b5301546f4eb8319bd10109d73837449 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -36,41 +36,29 @@ static const size_t kMaxPayloadSizeForGet = 2048; typedef struct call_data { + // State for handling send_initial_metadata ops. grpc_linked_mdelem method; grpc_linked_mdelem scheme; grpc_linked_mdelem authority; grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; - + // State for handling recv_initial_metadata ops. grpc_metadata_batch *recv_initial_metadata; + grpc_closure *original_recv_initial_metadata_ready; + grpc_closure recv_initial_metadata_ready; + // State for handling recv_trailing_metadata ops. grpc_metadata_batch *recv_trailing_metadata; - uint8_t *payload_bytes; - - /* Vars to read data off of send_message */ - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; - grpc_slice_buffer_stream replacement_stream; - grpc_slice_buffer slices; - /* flag that indicates that all slices of send_messages aren't availble */ - bool send_message_blocked; - - /** Closure to call when finished with the hc_on_recv hook */ - grpc_closure *on_done_recv_initial_metadata; - grpc_closure *on_done_recv_trailing_metadata; - grpc_closure *on_complete; - grpc_closure *post_send; - - /** Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member - after handling it. */ - grpc_closure hc_on_recv_initial_metadata; - grpc_closure hc_on_recv_trailing_metadata; - grpc_closure hc_on_complete; - grpc_closure got_slice; - grpc_closure send_done; + grpc_closure *original_recv_trailing_metadata_on_complete; + grpc_closure recv_trailing_metadata_on_complete; + // State for handling send_message ops. + grpc_transport_stream_op_batch *send_message_batch; + size_t send_message_bytes_read; + grpc_byte_stream_cache send_message_cache; + grpc_caching_byte_stream send_message_caching_stream; + grpc_closure on_send_message_next_done; + grpc_closure *original_send_message_on_complete; + grpc_closure send_message_on_complete; } call_data; typedef struct channel_data { @@ -148,7 +136,7 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; @@ -158,11 +146,13 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, + error); } -static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, - void *user_data, grpc_error *error) { +static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx, + void *user_data, + grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (error == GRPC_ERROR_NONE) { @@ -171,25 +161,131 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete, + error); } -static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (calld->payload_bytes) { - gpr_free(calld->payload_bytes); - calld->payload_bytes = NULL; +static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); +} + +// Pulls a slice from the send_message byte stream, updating +// calld->send_message_bytes_read. +static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, + call_data *calld) { + grpc_slice incoming_slice; + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice); + if (error == GRPC_ERROR_NONE) { + calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice); + grpc_slice_unref_internal(exec_ctx, incoming_slice); } - calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error); + return error; } -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); +// Reads as many slices as possible from the send_message byte stream. +// Upon successful return, if calld->send_message_bytes_read == +// calld->send_message_caching_stream.base.length, then we have completed +// reading from the byte stream; otherwise, an async read has been dispatched +// and on_send_message_next_done() will be invoked when it is complete. +static grpc_error *read_all_available_send_message_data(grpc_exec_ctx *exec_ctx, + call_data *calld) { + while (grpc_byte_stream_next(exec_ctx, + &calld->send_message_caching_stream.base, + ~(size_t)0, &calld->on_send_message_next_done)) { + grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) return error; + if (calld->send_message_bytes_read == + calld->send_message_caching_stream.base.length) { + break; + } + } + return GRPC_ERROR_NONE; +} + +// Async callback for grpc_byte_stream_next(). +static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + return; + } + error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + return; + } + // There may or may not be more to read, but we don't care. If we got + // here, then we know that all of the data was not available + // synchronously, so we were not able to do a cached call. Instead, + // we just reset the byte stream and then send down the batch as-is. + grpc_caching_byte_stream_reset(&calld->send_message_caching_stream); + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); +} + +static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) { + char *payload_bytes = gpr_malloc(slice_buffer->length + 1); + size_t offset = 0; + for (size_t i = 0; i < slice_buffer->count; ++i) { + memcpy(payload_bytes + offset, + GRPC_SLICE_START_PTR(slice_buffer->slices[i]), + GRPC_SLICE_LENGTH(slice_buffer->slices[i])); + offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]); + } + *(payload_bytes + offset) = '\0'; + return payload_bytes; +} + +// Modifies the path entry in the batch's send_initial_metadata to +// append the base64-encoded query for a GET request. +static grpc_error *update_path_for_get(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + call_data *calld = (call_data *)elem->call_data; + grpc_slice path_slice = + GRPC_MDVALUE(batch->payload->send_initial_metadata.send_initial_metadata + ->idx.named.path->md); + /* sum up individual component's lengths and allocate enough memory to + * hold combined path+query */ + size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); + estimated_len++; /* for the '?' */ + estimated_len += grpc_base64_estimate_encoded_size( + batch->payload->send_message.send_message->length, true /* url_safe */, + false /* multi_line */); + grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); + /* memcopy individual pieces into this slice */ + char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice); + memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); + write_ptr += GRPC_SLICE_LENGTH(path_slice); + *write_ptr++ = '?'; + char *payload_bytes = + slice_buffer_to_string(&calld->send_message_cache.cache_buffer); + grpc_base64_encode_core((char *)write_ptr, payload_bytes, + batch->payload->send_message.send_message->length, + true /* url_safe */, false /* multi_line */); + gpr_free(payload_bytes); + /* remove trailing unused memory and add trailing 0 to terminate string */ + char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + /* safe to use strlen since base64_encode will always add '\0' */ + path_with_query_slice = + grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); + /* substitute previous path with the new path+query */ + grpc_mdelem mdelem_path_and_query = + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); + grpc_metadata_batch *b = + batch->payload->send_initial_metadata.send_initial_metadata; + return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_and_query); } static void remove_if_present(grpc_exec_ctx *exec_ctx, @@ -200,273 +296,153 @@ static void remove_if_present(grpc_exec_ctx *exec_ctx, } } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void hc_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - uint8_t *wrptr = calld->payload_bytes; - while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); - if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) { - memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), - GRPC_SLICE_LENGTH(calld->incoming_slice)); - } - wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - calld->send_message_blocked = false; - break; - } - } -} + channel_data *channeld = elem->channel_data; + GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - calld->send_message_blocked = false; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - /* Pass down the original send_message op that was blocked.*/ - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = - &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, calld->send_op); - } else { - continue_send_message(exec_ctx, elem); + if (batch->recv_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_initial_metadata = + batch->payload->recv_initial_metadata.recv_initial_metadata; + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } -} -static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - grpc_error *error; + if (batch->recv_trailing_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata_on_complete = batch->on_complete; + batch->on_complete = &calld->recv_trailing_metadata_on_complete; + } - if (op->send_initial_metadata) { - /* Decide which HTTP VERB to use. We use GET if the request is marked - cacheable, and the operation contains both initial metadata and send - message, and the payload is below the size threshold, and all the data - for this request is immediately available. */ + grpc_error *error = GRPC_ERROR_NONE; + bool batch_will_be_handled_asynchronously = false; + if (batch->send_initial_metadata) { + // Decide which HTTP VERB to use. We use GET if the request is marked + // cacheable, and the operation contains both initial metadata and send + // message, and the payload is below the size threshold, and all the data + // for this request is immediately available. grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - if (op->send_message && - (op->payload->send_initial_metadata.send_initial_metadata_flags & + if (batch->send_message && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - op->payload->send_message.send_message->length < + batch->payload->send_message.send_message->length < channeld->max_payload_size_for_get) { - method = GRPC_MDELEM_METHOD_GET; - /* The following write to calld->send_message_blocked isn't racy with - reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because - being here means ops->send_message is not NULL, which is primarily - guarding the read there. */ - calld->send_message_blocked = true; - } else if (op->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { - method = GRPC_MDELEM_METHOD_PUT; - } - - /* Attempt to read the data from send_message and create a header field. */ - if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { - /* allocate memory to hold the entire payload */ - calld->payload_bytes = - gpr_malloc(op->payload->send_message.send_message->length); - - /* read slices of send_message and copy into payload_bytes */ - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); - - if (calld->send_message_blocked == false) { - /* when all the send_message data is available, then modify the path - * MDELEM by appending base64 encoded query to the path */ - const int k_url_safe = 1; - const int k_multi_line = 0; - const unsigned char k_query_separator = '?'; - - grpc_slice path_slice = - GRPC_MDVALUE(op->payload->send_initial_metadata - .send_initial_metadata->idx.named.path->md); - /* sum up individual component's lengths and allocate enough memory to - * hold combined path+query */ - size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); - estimated_len++; /* for the '?' */ - estimated_len += grpc_base64_estimate_encoded_size( - op->payload->send_message.send_message->length, k_url_safe, - k_multi_line); - grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); - - /* memcopy individual pieces into this slice */ - uint8_t *write_ptr = - (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice); - uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); - memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); - write_ptr += GRPC_SLICE_LENGTH(path_slice); - - *write_ptr = k_query_separator; - write_ptr++; /* for the '?' */ - - grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, - op->payload->send_message.send_message->length, - k_url_safe, k_multi_line); - - /* remove trailing unused memory and add trailing 0 to terminate string - */ - char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); - /* safe to use strlen since base64_encode will always add '\0' */ - path_with_query_slice = - grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); - - /* substitute previous path with the new path+query */ - grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); - grpc_metadata_batch *b = - op->payload->send_initial_metadata.send_initial_metadata; - error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, - mdelem_path_and_query); - if (error != GRPC_ERROR_NONE) return error; - - calld->on_complete = op->on_complete; - op->on_complete = &calld->hc_on_complete; - op->send_message = false; + calld->send_message_bytes_read = 0; + grpc_byte_stream_cache_init(&calld->send_message_cache, + batch->payload->send_message.send_message); + grpc_caching_byte_stream_init(&calld->send_message_caching_stream, + &calld->send_message_cache); + batch->payload->send_message.send_message = + &calld->send_message_caching_stream.base; + calld->original_send_message_on_complete = batch->on_complete; + batch->on_complete = &calld->send_message_on_complete; + calld->send_message_batch = batch; + error = read_all_available_send_message_data(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) goto done; + // If all the data has been read, then we can use GET. + if (calld->send_message_bytes_read == + calld->send_message_caching_stream.base.length) { + method = GRPC_MDELEM_METHOD_GET; + error = update_path_for_get(exec_ctx, elem, batch); + if (error != GRPC_ERROR_NONE) goto done; + batch->send_message = false; + grpc_byte_stream_destroy(exec_ctx, + &calld->send_message_caching_stream.base); } else { - /* Not all data is available. Fall back to POST. */ + // Not all data is available. The batch will be sent down + // asynchronously in on_send_message_next_done(). + batch_will_be_handled_asynchronously = true; + // Fall back to POST. gpr_log(GPR_DEBUG, - "Request is marked Cacheable but not all data is available.\ - Falling back to POST"); - method = GRPC_MDELEM_METHOD_POST; + "Request is marked Cacheable but not all data is available. " + "Falling back to POST"); } + } else if (batch->payload->send_initial_metadata + .send_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { + method = GRPC_MDELEM_METHOD_PUT; } - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_METHOD); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_SCHEME); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_TE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_CONTENT_TYPE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_USER_AGENT); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_METHOD); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_SCHEME); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_TE); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_CONTENT_TYPE); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_USER_AGENT); /* Send : prefixed headers, which have to be before any application layer headers. */ error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->method, method); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->scheme, channeld->static_scheme); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; } - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hc_on_recv_initial_metadata; - } - - if (op->recv_trailing_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_trailing_metadata = - op->payload->recv_trailing_metadata.recv_trailing_metadata; - calld->on_done_recv_trailing_metadata = op->on_complete; - op->on_complete = &calld->hc_on_recv_trailing_metadata; - } - - return GRPC_ERROR_NONE; -} - -static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("hc_start_transport_op", 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_error *error = hc_mutate_op(exec_ctx, elem, op); +done: if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - } else { - call_data *calld = elem->call_data; - if (op->send_message && calld->send_message_blocked) { - /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. - */ - } else { - grpc_call_next_op(exec_ctx, elem, op); - } + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } else if (!batch_will_be_handled_asynchronously) { + grpc_call_next_op(exec_ctx, elem, batch); } - GPR_TIMER_END("hc_start_transport_op", 0); + GPR_TIMER_END("hc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - calld->on_done_recv_initial_metadata = NULL; - calld->on_done_recv_trailing_metadata = NULL; - calld->on_complete = NULL; - calld->payload_bytes = NULL; - calld->send_message_blocked = false; - grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata, - hc_on_recv_initial_metadata, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata, - hc_on_recv_trailing_metadata, elem, + call_data *calld = (call_data *)elem->call_data; + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete, + recv_trailing_metadata_on_complete, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, + on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); -} + grpc_closure *ignored) {} static grpc_mdelem scheme_from_args(const grpc_channel_args *args) { unsigned i; @@ -580,7 +556,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, + hc_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 71a8bc5bec2db880a3981779f5190941db28f81a..20a3488115e316c35468e7301858edb54555dc5a 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -61,14 +61,11 @@ typedef struct call_data { pointer | CANCELLED_BIT - request was cancelled with error pointed to */ gpr_atm send_initial_metadata_state; - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; + grpc_transport_stream_op_batch *send_message_batch; grpc_slice_buffer_stream replacement_stream; - grpc_closure *post_send; - grpc_closure send_done; - grpc_closure got_slice; + grpc_closure *original_send_message_on_complete; + grpc_closure send_message_on_complete; + grpc_closure on_send_message_next_done; } call_data; typedef struct channel_data { @@ -164,24 +161,25 @@ static grpc_error *process_send_initial_metadata( return error; } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem); - -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; +static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); } static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; - int did_compress; + call_data *calld = (call_data *)elem->call_data; + // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); - did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + uint32_t send_flags = + calld->send_message_batch->payload->send_message.send_message->flags; + const bool did_compress = grpc_msg_compress( + exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -195,7 +193,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, algo_name, before_size, after_size, 100 * savings_ratio); } grpc_slice_buffer_swap(&calld->slices, &tmp); - calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -207,83 +205,118 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, algo_name, calld->slices.length); } } - grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); - + // Swap out the original byte stream with our new one and send the + // batch down. + grpc_byte_stream_destroy( + exec_ctx, calld->send_message_batch->payload->send_message.send_message); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = + send_flags); + calld->send_message_batch->payload->send_message.send_message = &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - - grpc_call_next_op(exec_ctx, elem, calld->send_op); + calld->original_send_message_on_complete = + calld->send_message_batch->on_complete; + calld->send_message_batch->on_complete = &calld->send_message_on_complete; + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); } -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - finish_send_message(exec_ctx, elem); - } else { - continue_send_message(exec_ctx, elem); +// Pulls a slice from the send_message byte stream and adds it to calld->slices. +static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, + call_data *calld) { + grpc_slice incoming_slice; + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + &incoming_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&calld->slices, incoming_slice); } + return error; } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = elem->call_data; +// Reads as many slices as possible from the send_message byte stream. +// If all data has been read, invokes finish_send_message(). Otherwise, +// an async call to grpc_byte_stream_next() has been started, which will +// eventually result in calling on_send_message_next_done(). +static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = (call_data *)elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + ~(size_t)0, &calld->on_send_message_next_done)) { + grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) return error; + if (calld->slices.length == + calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); break; } } + return GRPC_ERROR_NONE; } -static void handle_send_message_batch(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op, - bool has_compression_algorithm) { - call_data *calld = elem->call_data; - if (!skip_compression(elem, op->payload->send_message.send_message->flags, +// Async callback for grpc_byte_stream_next(). +static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + if (error != GRPC_ERROR_NONE) goto fail; + error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) goto fail; + if (calld->slices.length == + calld->send_message_batch->payload->send_message.send_message->length) { + finish_send_message(exec_ctx, elem); + } else { + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // this function again. + error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) goto fail; + } + return; +fail: + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); +} + +static void start_send_message_batch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch, + bool has_compression_algorithm) { + call_data *calld = (call_data *)elem->call_data; + if (!skip_compression(elem, batch->payload->send_message.send_message->flags, has_compression_algorithm)) { - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); + calld->send_message_batch = batch; + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // on_send_message_next_done(). + grpc_error *error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } } else { /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } } static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - if (op->cancel_stream) { - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); + if (batch->cancel_stream) { + // TODO(roth): As part of the upcoming call combiner work, change + // this to call grpc_byte_stream_shutdown() on the incoming byte + // stream, to cancel any in-flight calls to grpc_byte_stream_next(). + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); gpr_atm cur = gpr_atm_full_xchg( &calld->send_initial_metadata_state, - CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error); + CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error); switch (cur) { case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: @@ -293,7 +326,7 @@ static void compress_start_transport_stream_op_batch( if ((cur & CANCELLED_BIT) == 0) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, (grpc_transport_stream_op_batch *)cur, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); } else { GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); } @@ -301,14 +334,15 @@ static void compress_start_transport_stream_op_batch( } } - if (op->send_initial_metadata) { + if (batch->send_initial_metadata) { bool has_compression_algorithm; grpc_error *error = process_send_initial_metadata( exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + error); return; } gpr_atm cur; @@ -324,32 +358,32 @@ static void compress_start_transport_stream_op_batch( goto retry_send_im; } if (cur != INITIAL_METADATA_UNSEEN) { - handle_send_message_batch(exec_ctx, elem, - (grpc_transport_stream_op_batch *)cur, - has_compression_algorithm); + start_send_message_batch(exec_ctx, elem, + (grpc_transport_stream_op_batch *)cur, + has_compression_algorithm); } } } - if (op->send_message) { + if (batch->send_message) { gpr_atm cur; retry_send: cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); switch (cur) { case INITIAL_METADATA_UNSEEN: if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, - (gpr_atm)op)) { + (gpr_atm)batch)) { goto retry_send; } break; case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: - handle_send_message_batch(exec_ctx, elem, op, - cur == HAS_COMPRESSION_ALGORITHM); + start_send_message_batch(exec_ctx, elem, batch, + cur == HAS_COMPRESSION_ALGORITHM); break; default: if (cur & CANCELLED_BIT) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, + exec_ctx, batch, GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); } else { /* >1 send_message concurrently */ @@ -358,7 +392,7 @@ static void compress_start_transport_stream_op_batch( } } else { /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); @@ -373,10 +407,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, + on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 731ebf400f75bdf7c2ae63b3d7665f2920606fbb..b1a76988110794fab2ec7c128370c59ad56b8617 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1173,6 +1173,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } if (s->fetched_send_message_length == s->fetching_send_message->length) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1195,9 +1196,14 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, UINT32_MAX, &s->complete_fetch_locked)) { - grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); - add_fetched_slice_locked(exec_ctx, t, s); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, s->fetching_send_message, &s->fetching_slice); + if (error != GRPC_ERROR_NONE) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + } else { + add_fetched_slice_locked(exec_ctx, t, s); + } } } } @@ -1214,10 +1220,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, continue_fetching_send_locked(exec_ctx, t, s); } } - if (error != GRPC_ERROR_NONE) { - /* TODO(ctiller): what to do here */ - abort(); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); } } @@ -2686,22 +2691,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream); - static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, - grpc_error *error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = s->t; - - GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); - incoming_byte_stream_unref(exec_ctx, bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); -} + grpc_error *error_ignored); static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { @@ -2768,6 +2760,33 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished( return error; } +static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, bs, error, true /* reset_on_error */)); +} + +static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { + incoming_byte_stream_next, incoming_byte_stream_pull, + incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; + +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + void *byte_stream, + grpc_error *error_ignored) { + grpc_chttp2_incoming_byte_stream *bs = byte_stream; + grpc_chttp2_stream *s = bs->stream; + grpc_chttp2_transport *t = s->t; + + GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); + incoming_byte_stream_unref(exec_ctx, bs); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); +} + grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { @@ -2776,9 +2795,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.next = incoming_byte_stream_next; - incoming_byte_stream->base.pull = incoming_byte_stream_pull; - incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 14498021ebdd0063ba15841474db961d947a9174..6f4b429ee2f6f8ae22c4abbece622546b8d08bac 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -72,6 +72,7 @@ typedef struct sb_list_entry { typedef struct { grpc_byte_stream base; sb_list_entry *le; + grpc_error *shutdown_error; } inproc_slice_byte_stream; typedef struct { @@ -201,24 +202,39 @@ static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, grpc_slice *slice) { inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } *slice = grpc_slice_buffer_take_first(&stream->le->sb); return GRPC_ERROR_NONE; } +static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs, + grpc_error *error) { + inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = error; +} + static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs) { inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; sb_list_entry_destroy(exec_ctx, stream->le); + GRPC_ERROR_UNREF(stream->shutdown_error); } +static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { + inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, + inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; + void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, sb_list_entry *le) { s->base.length = (uint32_t)le->sb.length; s->base.flags = 0; - s->base.next = inproc_slice_byte_stream_next; - s->base.pull = inproc_slice_byte_stream_pull; - s->base.destroy = inproc_slice_byte_stream_destroy; + s->base.vtable = &inproc_slice_byte_stream_vtable; s->le = le; + s->shutdown_error = GRPC_ERROR_NONE; } static void ref_transport(inproc_transport *t) { @@ -956,11 +972,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(grpc_byte_stream_next(exec_ctx, op->payload->send_message.send_message, SIZE_MAX, &unused)); - grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message, - &message_slice); + error = grpc_byte_stream_pull( + exec_ctx, op->payload->send_message.send_message, &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(dest, message_slice); } while (remaining != 0); + grpc_byte_stream_destroy(exec_ctx, + op->payload->send_message.send_message); } if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 3355814017fc32cf9de36a51855f4f0e9c2a8fdb..fb03a10315bdd5c7fa989d6f01aec269dd39e613 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -19,29 +19,37 @@ #include "src/core/lib/transport/byte_stream.h" #include <stdlib.h> +#include <string.h> #include <grpc/support/log.h> #include "src/core/lib/slice/slice_internal.h" -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete) { - return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); +bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete) { + return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint, + on_complete); } grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice) { - return byte_stream->pull(exec_ctx, byte_stream, slice); + return byte_stream->vtable->pull(exec_ctx, byte_stream, slice); +} + +void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + byte_stream->vtable->shutdown(exec_ctx, byte_stream, error); } void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { - byte_stream->destroy(exec_ctx, byte_stream); + byte_stream->vtable->destroy(exec_ctx, byte_stream); } -/* slice_buffer_stream */ +// grpc_slice_buffer_stream static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, @@ -56,6 +64,9 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } GPR_ASSERT(stream->cursor < stream->backing_buffer->count); *slice = grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); @@ -63,8 +74,23 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = error; +} + static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream) {} + grpc_byte_stream *byte_stream) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); +} + +static const grpc_byte_stream_vtable slice_buffer_stream_vtable = { + slice_buffer_stream_next, slice_buffer_stream_pull, + slice_buffer_stream_shutdown, slice_buffer_stream_destroy}; void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, grpc_slice_buffer *slice_buffer, @@ -72,9 +98,89 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, GPR_ASSERT(slice_buffer->length <= UINT32_MAX); stream->base.length = (uint32_t)slice_buffer->length; stream->base.flags = flags; - stream->base.next = slice_buffer_stream_next; - stream->base.pull = slice_buffer_stream_pull; - stream->base.destroy = slice_buffer_stream_destroy; + stream->base.vtable = &slice_buffer_stream_vtable; stream->backing_buffer = slice_buffer; stream->cursor = 0; + stream->shutdown_error = GRPC_ERROR_NONE; +} + +// grpc_caching_byte_stream + +void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache, + grpc_byte_stream *underlying_stream) { + cache->underlying_stream = underlying_stream; + grpc_slice_buffer_init(&cache->cache_buffer); +} + +void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream_cache *cache) { + grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream); + grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer); +} + +static bool caching_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + if (stream->shutdown_error != GRPC_ERROR_NONE) return true; + if (stream->cursor < stream->cache->cache_buffer.count) return true; + return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream, + max_size_hint, on_complete); +} + +static grpc_error *caching_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } + if (stream->cursor < stream->cache->cache_buffer.count) { + *slice = grpc_slice_ref_internal( + stream->cache->cache_buffer.slices[stream->cursor]); + ++stream->cursor; + return GRPC_ERROR_NONE; + } + grpc_error *error = + grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice); + if (error == GRPC_ERROR_NONE) { + ++stream->cursor; + grpc_slice_buffer_add(&stream->cache->cache_buffer, + grpc_slice_ref_internal(*slice)); + } + return error; +} + +static void caching_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = GRPC_ERROR_REF(error); + grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error); +} + +static void caching_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream) { + grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(stream->shutdown_error); +} + +static const grpc_byte_stream_vtable caching_byte_stream_vtable = { + caching_byte_stream_next, caching_byte_stream_pull, + caching_byte_stream_shutdown, caching_byte_stream_destroy}; + +void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, + grpc_byte_stream_cache *cache) { + memset(stream, 0, sizeof(*stream)); + stream->base.length = cache->underlying_stream->length; + stream->base.flags = cache->underlying_stream->flags; + stream->base.vtable = &caching_byte_stream_vtable; + stream->cache = cache; + stream->shutdown_error = GRPC_ERROR_NONE; +} + +void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) { + stream->cursor = 0; } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index f172296e4b445baf43f5ac7c5a02b8eb1874070d..1e1e8310b832ad91c69e84e5ac4159aee640d091 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -28,52 +28,109 @@ /** Mask of all valid internal flags. */ #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) -struct grpc_byte_stream; typedef struct grpc_byte_stream grpc_byte_stream; -struct grpc_byte_stream { - uint32_t length; - uint32_t flags; +typedef struct { bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, size_t max_size_hint, grpc_closure *on_complete); grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + grpc_error *error); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); +} grpc_byte_stream_vtable; + +struct grpc_byte_stream { + uint32_t length; + uint32_t flags; + const grpc_byte_stream_vtable *vtable; }; -/* returns 1 if the bytes are available immediately (in which case - * on_complete will not be called), 0 if the bytes will be available - * asynchronously. - * - * max_size_hint can be set as a hint as to the maximum number - * of bytes that would be acceptable to read. - */ -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, size_t max_size_hint, - grpc_closure *on_complete); +// Returns true if the bytes are available immediately (in which case +// on_complete will not be called), false if the bytes will be available +// asynchronously. +// +// max_size_hint can be set as a hint as to the maximum number +// of bytes that would be acceptable to read. +bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete); -/* returns the next slice in the byte stream when it is ready (indicated by - * either grpc_byte_stream_next returning 1 or on_complete passed to - * grpc_byte_stream_next is called). - * - * once a slice is returned into *slice, it is owned by the caller. - */ +// Returns the next slice in the byte stream when it is ready (indicated by +// either grpc_byte_stream_next returning true or on_complete passed to +// grpc_byte_stream_next is called). +// +// Once a slice is returned into *slice, it is owned by the caller. grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice); +// Shuts down the byte stream. +// +// If there is a pending call to on_complete from grpc_byte_stream_next(), +// it will be invoked with the error passed to grpc_byte_stream_shutdown(). +// +// The next call to grpc_byte_stream_pull() (if any) will return the error +// passed to grpc_byte_stream_shutdown(). +void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error); + void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); -/* grpc_byte_stream that wraps a slice buffer */ +// grpc_slice_buffer_stream +// +// A grpc_byte_stream that wraps a slice buffer. + typedef struct grpc_slice_buffer_stream { grpc_byte_stream base; grpc_slice_buffer *backing_buffer; size_t cursor; + grpc_error *shutdown_error; } grpc_slice_buffer_stream; void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, grpc_slice_buffer *slice_buffer, uint32_t flags); +// grpc_caching_byte_stream +// +// A grpc_byte_stream that that wraps an underlying byte stream but caches +// the resulting slices in a slice buffer. If an initial attempt fails +// without fully draining the underlying stream, a new caching stream +// can be created from the same underlying cache, in which case it will +// return whatever is in the backing buffer before continuing to read the +// underlying stream. +// +// NOTE: No synchronization is done, so it is not safe to have multiple +// grpc_caching_byte_streams simultaneously drawing from the same underlying +// grpc_byte_stream_cache at the same time. + +typedef struct { + grpc_byte_stream *underlying_stream; + grpc_slice_buffer cache_buffer; +} grpc_byte_stream_cache; + +// Takes ownership of underlying_stream. +void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache, + grpc_byte_stream *underlying_stream); + +// Must not be called while still in use by a grpc_caching_byte_stream. +void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream_cache *cache); + +typedef struct { + grpc_byte_stream base; + grpc_byte_stream_cache *cache; + size_t cursor; + grpc_error *shutdown_error; +} grpc_caching_byte_stream; + +void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream, + grpc_byte_stream_cache *cache); + +// Resets the byte stream to the start of the underlying stream. +void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream); + #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 7281602d66b50c2c247812a8a39b3d1737101fb7..6c61f4b8d9d0e8726f71b065cd86bb363fbf23b3 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -207,27 +207,35 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, return transport->vtable->get_endpoint(exec_ctx, transport); } +// This comment should be sung to the tune of +// "Supercalifragilisticexpialidocious": +// // grpc_transport_stream_op_batch_finish_with_failure // is a function that must always unref cancel_error // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure void grpc_transport_stream_op_batch_finish_with_failure( - grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch, grpc_error *error) { - if (op->recv_message) { - GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, + if (batch->send_message) { + grpc_byte_stream_destroy(exec_ctx, + batch->payload->send_message.send_message); + } + if (batch->recv_message) { + GRPC_CLOSURE_SCHED(exec_ctx, + batch->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); } - if (op->recv_initial_metadata) { + if (batch->recv_initial_metadata) { GRPC_CLOSURE_SCHED( exec_ctx, - op->payload->recv_initial_metadata.recv_initial_metadata_ready, + batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } - GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error); - if (op->cancel_stream) { - GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error); + GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error); + if (batch->cancel_stream) { + GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 84e53e683acb09fdee977fac7df4284d3a6ef7ac..099138ea14645aaabe7fb64ce44958835d17b9b9 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -159,6 +159,11 @@ struct grpc_transport_stream_op_batch_payload { } send_trailing_metadata; struct { + // The transport (or a filter that decides to return a failure before + // the op gets down to the transport) is responsible for calling + // grpc_byte_stream_destroy() on this. + // The batch's on_complete will not be called until after the byte + // stream is destroyed. grpc_byte_stream *send_message; } send_message; @@ -174,6 +179,10 @@ struct grpc_transport_stream_op_batch_payload { } recv_initial_metadata; struct { + // Will be set by the transport to point to the byte stream + // containing a received message. + // The caller is responsible for calling grpc_byte_stream_destroy() + // on this byte stream. grpc_byte_stream **recv_message; /** Should be enqueued when one message is ready to be processed. */ grpc_closure *recv_message_ready; diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD index 8091cf9c6335f6d618567c4ff97b4a505574e304..040c0c35c220922c875f22e301ea6283f18611c7 100644 --- a/test/core/transport/BUILD +++ b/test/core/transport/BUILD @@ -35,6 +35,18 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "byte_stream_test", + srcs = ["byte_stream_test.c"], + language = "C", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "connectivity_state_test", srcs = ["connectivity_state_test.c"], diff --git a/test/core/transport/byte_stream_test.c b/test/core/transport/byte_stream_test.c new file mode 100644 index 0000000000000000000000000000000000000000..a0c5f961cfd00f144ccd21354356c1a5987c8dd8 --- /dev/null +++ b/test/core/transport/byte_stream_test.c @@ -0,0 +1,279 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/transport/byte_stream.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/slice/slice_internal.h" + +#include "test/core/util/test_config.h" + +// +// grpc_slice_buffer_stream tests +// + +static void not_called_closure(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + GPR_ASSERT(false); +} + +static void test_slice_buffer_stream_basic(void) { + gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // Create and populate slice buffer. + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_slice input[] = { + grpc_slice_from_static_string("foo"), + grpc_slice_from_static_string("bar"), + }; + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + grpc_slice_buffer_add(&buffer, input[i]); + } + // Create byte stream. + grpc_slice_buffer_stream stream; + grpc_slice_buffer_stream_init(&stream, &buffer, 0); + GPR_ASSERT(stream.base.length == 6); + grpc_closure closure; + GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL, + grpc_schedule_on_exec_ctx); + // Read each slice. Note that next() always returns synchronously. + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure)); + grpc_slice output; + grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[i], output)); + grpc_slice_unref_internal(&exec_ctx, output); + } + // Clean up. + grpc_byte_stream_destroy(&exec_ctx, &stream.base); + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void test_slice_buffer_stream_shutdown(void) { + gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // Create and populate slice buffer. + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_slice input[] = { + grpc_slice_from_static_string("foo"), + grpc_slice_from_static_string("bar"), + }; + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + grpc_slice_buffer_add(&buffer, input[i]); + } + // Create byte stream. + grpc_slice_buffer_stream stream; + grpc_slice_buffer_stream_init(&stream, &buffer, 0); + GPR_ASSERT(stream.base.length == 6); + grpc_closure closure; + GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL, + grpc_schedule_on_exec_ctx); + // Read the first slice. + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure)); + grpc_slice output; + grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[0], output)); + grpc_slice_unref_internal(&exec_ctx, output); + // Now shutdown. + grpc_error *shutdown_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error"); + grpc_byte_stream_shutdown(&exec_ctx, &stream.base, + GRPC_ERROR_REF(shutdown_error)); + // After shutdown, the next pull() should return the error. + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure)); + error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output); + GPR_ASSERT(error == shutdown_error); + GRPC_ERROR_UNREF(error); + GRPC_ERROR_UNREF(shutdown_error); + // Clean up. + grpc_byte_stream_destroy(&exec_ctx, &stream.base); + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); +} + +// +// grpc_caching_byte_stream tests +// + +static void test_caching_byte_stream_basic(void) { + gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // Create and populate slice buffer byte stream. + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_slice input[] = { + grpc_slice_from_static_string("foo"), + grpc_slice_from_static_string("bar"), + }; + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + grpc_slice_buffer_add(&buffer, input[i]); + } + grpc_slice_buffer_stream underlying_stream; + grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0); + // Create cache and caching stream. + grpc_byte_stream_cache cache; + grpc_byte_stream_cache_init(&cache, &underlying_stream.base); + grpc_caching_byte_stream stream; + grpc_caching_byte_stream_init(&stream, &cache); + grpc_closure closure; + GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL, + grpc_schedule_on_exec_ctx); + // Read each slice. Note that next() always returns synchronously, + // because the underlying byte stream always does. + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure)); + grpc_slice output; + grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[i], output)); + grpc_slice_unref_internal(&exec_ctx, output); + } + // Clean up. + grpc_byte_stream_destroy(&exec_ctx, &stream.base); + grpc_byte_stream_cache_destroy(&exec_ctx, &cache); + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void test_caching_byte_stream_reset(void) { + gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // Create and populate slice buffer byte stream. + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_slice input[] = { + grpc_slice_from_static_string("foo"), + grpc_slice_from_static_string("bar"), + }; + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + grpc_slice_buffer_add(&buffer, input[i]); + } + grpc_slice_buffer_stream underlying_stream; + grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0); + // Create cache and caching stream. + grpc_byte_stream_cache cache; + grpc_byte_stream_cache_init(&cache, &underlying_stream.base); + grpc_caching_byte_stream stream; + grpc_caching_byte_stream_init(&stream, &cache); + grpc_closure closure; + GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL, + grpc_schedule_on_exec_ctx); + // Read one slice. + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure)); + grpc_slice output; + grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[0], output)); + grpc_slice_unref_internal(&exec_ctx, output); + // Reset the caching stream. The reads should start over from the + // first slice. + grpc_caching_byte_stream_reset(&stream); + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure)); + error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[i], output)); + grpc_slice_unref_internal(&exec_ctx, output); + } + // Clean up. + grpc_byte_stream_destroy(&exec_ctx, &stream.base); + grpc_byte_stream_cache_destroy(&exec_ctx, &cache); + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void test_caching_byte_stream_shared_cache(void) { + gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // Create and populate slice buffer byte stream. + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_slice input[] = { + grpc_slice_from_static_string("foo"), + grpc_slice_from_static_string("bar"), + }; + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + grpc_slice_buffer_add(&buffer, input[i]); + } + grpc_slice_buffer_stream underlying_stream; + grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0); + // Create cache and two caching streams. + grpc_byte_stream_cache cache; + grpc_byte_stream_cache_init(&cache, &underlying_stream.base); + grpc_caching_byte_stream stream1; + grpc_caching_byte_stream_init(&stream1, &cache); + grpc_caching_byte_stream stream2; + grpc_caching_byte_stream_init(&stream2, &cache); + grpc_closure closure; + GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL, + grpc_schedule_on_exec_ctx); + // Read one slice from stream1. + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure)); + grpc_slice output; + grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[0], output)); + grpc_slice_unref_internal(&exec_ctx, output); + // Read all slices from stream2. + for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream2.base, ~(size_t)0, &closure)); + error = grpc_byte_stream_pull(&exec_ctx, &stream2.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[i], output)); + grpc_slice_unref_internal(&exec_ctx, output); + } + // Now read the second slice from stream1. + GPR_ASSERT( + grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure)); + error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output); + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_slice_eq(input[1], output)); + grpc_slice_unref_internal(&exec_ctx, output); + // Clean up. + grpc_byte_stream_destroy(&exec_ctx, &stream1.base); + grpc_byte_stream_destroy(&exec_ctx, &stream2.base); + grpc_byte_stream_cache_destroy(&exec_ctx, &cache); + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + test_slice_buffer_stream_basic(); + test_slice_buffer_stream_shutdown(); + test_caching_byte_stream_basic(); + test_caching_byte_stream_reset(); + test_caching_byte_stream_shared_cache(); + return 0; +} diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 24e7ff2b7bc44ce4cd98f11695951eb5549c2389..aa76fc047fb000d7c9888d12b8f9f1c2a5d573ef 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -164,6 +164,23 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "byte_stream_test", + "src": [ + "test/core/transport/byte_stream_test.c" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index ebf30717d4e3407e2492bc089becee5ea7050467..83781f941e7cc2b181366c0938665c8f23502602 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -201,6 +201,28 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "byte_stream_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, { "args": [], "ci_platforms": [ diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln index 55de734b427278d7837405b953ad15dd1b78190e..6f13039cac19aef6021a5a188d7e93fae24f745e 100644 --- a/vsprojects/buildtests_c.sln +++ b/vsprojects/buildtests_c.sln @@ -118,6 +118,17 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "bin_encoder_test", "vcxproj {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9} EndProjectSection EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "byte_stream_test", "vcxproj\test\byte_stream_test\byte_stream_test.vcxproj", "{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}" + ProjectSection(myProperties) = preProject + lib = "False" + EndProjectSection + ProjectSection(ProjectDependencies) = postProject + {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} + {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9} + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + EndProjectSection +EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "census_context_test", "vcxproj\test\census_context_test\census_context_test.vcxproj", "{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}" ProjectSection(myProperties) = preProject lib = "False" @@ -1940,6 +1951,22 @@ Global {D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|Win32.Build.0 = Release|Win32 {D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.ActiveCfg = Release|x64 {D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.Build.0 = Release|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.ActiveCfg = Debug|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.ActiveCfg = Debug|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.ActiveCfg = Release|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.ActiveCfg = Release|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.Build.0 = Debug|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.Build.0 = Debug|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.Build.0 = Release|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.Build.0 = Release|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.ActiveCfg = Debug|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.Build.0 = Debug|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.ActiveCfg = Debug|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.Build.0 = Debug|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.ActiveCfg = Release|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.Build.0 = Release|Win32 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.ActiveCfg = Release|x64 + {9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.Build.0 = Release|x64 {5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|Win32.ActiveCfg = Debug|Win32 {5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|x64.ActiveCfg = Debug|x64 {5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Release|Win32.ActiveCfg = Release|Win32 diff --git a/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj new file mode 100644 index 0000000000000000000000000000000000000000..5d656471b1a039cf2c56650eb4b7b00ce48315d1 --- /dev/null +++ b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj @@ -0,0 +1,199 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" /> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Debug|x64"> + <Configuration>Debug</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|x64"> + <Configuration>Release</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectGuid>{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}</ProjectGuid> + <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected> + <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration"> + <PlatformToolset>v100</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration"> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration"> + <PlatformToolset>v120</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration"> + <PlatformToolset>v140</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <WholeProgramOptimization>true</WholeProgramOptimization> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + <Import Project="$(SolutionDir)\..\vsprojects\global.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)'=='Debug'"> + <TargetName>byte_stream_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'"> + <TargetName>byte_stream_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c"> + </ClCompile> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj"> + <Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj"> + <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj"> + <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj"> + <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + </ImportGroup> + <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> + <PropertyGroup> + <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> + </PropertyGroup> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" /> + </Target> +</Project> + diff --git a/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters new file mode 100644 index 0000000000000000000000000000000000000000..65e35b7429b9117f799620ab267109b63255ae8d --- /dev/null +++ b/vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c"> + <Filter>test\core\transport</Filter> + </ClCompile> + </ItemGroup> + + <ItemGroup> + <Filter Include="test"> + <UniqueIdentifier>{f172d292-4ad6-342a-f27a-096c06d43a31}</UniqueIdentifier> + </Filter> + <Filter Include="test\core"> + <UniqueIdentifier>{d7f690de-dfe0-56fc-ff3b-38eec3931699}</UniqueIdentifier> + </Filter> + <Filter Include="test\core\transport"> + <UniqueIdentifier>{f78f56ef-47df-c99d-18f0-86277f7013f3}</UniqueIdentifier> + </Filter> + </ItemGroup> +</Project> +