diff --git a/include/grpc++/security/auth_metadata_processor.h b/include/grpc++/security/auth_metadata_processor.h index 18ad9223217334d068821e84fdbd3043075dfdd5..9b9c06e3b63758aa465ab72c52f5f533b90b2af7 100644 --- a/include/grpc++/security/auth_metadata_processor.h +++ b/include/grpc++/security/auth_metadata_processor.h @@ -45,7 +45,7 @@ namespace grpc { class AuthMetadataProcessor { public: typedef std::multimap<grpc::string_ref, grpc::string_ref> InputMetadata; - typedef std::multimap<grpc::string, grpc::string_ref> OutputMetadata; + typedef std::multimap<grpc::string, grpc::string> OutputMetadata; virtual ~AuthMetadataProcessor() {} diff --git a/include/grpc++/security/credentials.h b/include/grpc++/security/credentials.h index fafcfdc90618b48b557d486ca52d212c3b9fec00..ff41bc597eb368544da896b1889d086af070cfbc 100644 --- a/include/grpc++/security/credentials.h +++ b/include/grpc++/security/credentials.h @@ -180,7 +180,7 @@ class MetadataCredentialsPlugin { // Gets the auth metatada produced by this plugin. virtual Status GetMetadata( grpc::string_ref service_url, - std::multimap<grpc::string, grpc::string_ref>* metadata) = 0; + std::multimap<grpc::string, grpc::string>* metadata) = 0; }; std::shared_ptr<Credentials> MetadataCredentialsFromPlugin( diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index f9e7b05860674d2c6e28db64eb0b9aa9478f746b..480cc9aec2fc0e0ae4b4c0667e5816a1168974d3 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -71,7 +71,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( * compression algorithms are enabled. It's an error to disable an algorithm set * by grpc_channel_args_set_compression_algorithm. * - * Returns an instance will the updated algorithm states. The \a a pointer is + * Returns an instance with the updated algorithm states. The \a a pointer is * modified to point to the returned instance (which may be different from the * input value of \a a). */ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 213d5a172fc61a650d32626e45cf248c94f05374..f640a0084adcf6495e91d031475f40056ba906ee 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -87,24 +87,27 @@ typedef struct { int resolved_num; } zookeeper_resolver; -static void zookeeper_destroy(grpc_resolver *r); +static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void zookeeper_start_resolving_locked(zookeeper_resolver *r); -static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) - GRPC_MUST_USE_RESULT; +static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + zookeeper_resolver *r); -static void zookeeper_shutdown(grpc_resolver *r); -static void zookeeper_channel_saw_error(grpc_resolver *r, +static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); -static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, +static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, grpc_closure *on_complete); static const grpc_resolver_vtable zookeeper_resolver_vtable = { zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, zookeeper_next}; -static void zookeeper_shutdown(grpc_resolver *resolver) { +static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; grpc_closure *call = NULL; gpr_mu_lock(&r->mu); @@ -116,11 +119,12 @@ static void zookeeper_shutdown(grpc_resolver *resolver) { zookeeper_close(r->zookeeper_handle); gpr_mu_unlock(&r->mu); if (call != NULL) { - call->cb(call->cb_arg, 1); + call->cb(exec_ctx, call->cb_arg, 1); } } -static void zookeeper_channel_saw_error(grpc_resolver *resolver, +static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, struct sockaddr *sa, int len) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; gpr_mu_lock(&r->mu); @@ -130,11 +134,10 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } -static void zookeeper_next(grpc_resolver *resolver, +static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, grpc_client_config **target_config, grpc_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; - grpc_closure *call; gpr_mu_lock(&r->mu); GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; @@ -142,10 +145,9 @@ static void zookeeper_next(grpc_resolver *resolver, if (r->resolved_version == 0 && r->resolving == 0) { zookeeper_start_resolving_locked(r); } else { - call = zookeeper_maybe_finish_next_locked(r); + zookeeper_maybe_finish_next_locked(exec_ctx, r); } gpr_mu_unlock(&r->mu); - if (call) call->cb(call->cb_arg, 1); } /** Zookeeper global watcher for connection management @@ -180,14 +182,13 @@ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, /** Callback function after getting all resolved addresses Creates a subchannel for each address */ -static void zookeeper_on_resolved(void *arg, +static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_closure *call; size_t i; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; @@ -198,13 +199,13 @@ static void zookeeper_on_resolved(void *arg, args.addr = (struct sockaddr *)(addresses->addrs[i].addr); args.addr_len = addresses->addrs[i].len; subchannels[i] = grpc_subchannel_factory_create_subchannel( - r->subchannel_factory, &args); + exec_ctx, r->subchannel_factory, &args); } lb_policy_args.subchannels = subchannels; lb_policy_args.num_subchannels = addresses->naddrs; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); } @@ -212,20 +213,18 @@ static void zookeeper_on_resolved(void *arg, GPR_ASSERT(r->resolving == 1); r->resolving = 0; if (r->resolved_config != NULL) { - grpc_client_config_unref(r->resolved_config); + grpc_client_config_unref(exec_ctx, r->resolved_config); } r->resolved_config = config; r->resolved_version++; - call = zookeeper_maybe_finish_next_locked(r); + zookeeper_maybe_finish_next_locked(exec_ctx, r); gpr_mu_unlock(&r->mu); - if (call) call->cb(call->cb_arg, 1); - - GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "zookeeper-resolving"); } /** Callback function for each DNS resolved address */ -static void zookeeper_dns_resolved(void *arg, +static void zookeeper_dns_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { size_t i; zookeeper_resolver *r = arg; @@ -251,7 +250,7 @@ static void zookeeper_dns_resolved(void *arg, resolve_done = (r->resolved_num == r->resolved_total); gpr_mu_unlock(&r->mu); if (resolve_done) { - zookeeper_on_resolved(r, r->resolved_addrs); + zookeeper_on_resolved(exec_ctx, r, r->resolved_addrs); } } @@ -300,9 +299,11 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, char *address = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; int resolve_done = 0; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (rc != 0) { gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); + grpc_exec_ctx_finish(&exec_ctx); return; } @@ -318,9 +319,11 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, resolve_done = (r->resolved_num == r->resolved_total); gpr_mu_unlock(&r->mu); if (resolve_done) { - zookeeper_on_resolved(r, r->resolved_addrs); + zookeeper_on_resolved(&exec_ctx, r, r->resolved_addrs); } } + + grpc_exec_ctx_finish(&exec_ctx); } static void zookeeper_get_children_completion( @@ -411,28 +414,27 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { zookeeper_resolve_address(r); } -static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { - grpc_closure *call = NULL; +static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + zookeeper_resolver *r) { if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; if (r->resolved_config != NULL) { grpc_client_config_ref(r->resolved_config); } - call = r->next_completion; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); r->next_completion = NULL; r->published_version = r->resolved_version; } - return call; } -static void zookeeper_destroy(grpc_resolver *gr) { +static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { zookeeper_resolver *r = (zookeeper_resolver *)gr; gpr_mu_destroy(&r->mu); if (r->resolved_config != NULL) { - grpc_client_config_unref(r->resolved_config); + grpc_client_config_unref(exec_ctx, r->resolved_config); } - grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); gpr_free(r->name); gpr_free(r->lb_policy_name); gpr_free(r); diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index 0cebc1665ef59cbe664e2d99ca54726dc19d0cbd..bfa08c41dd774e85862030a627dfd81f49be7bf2 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -36,6 +36,23 @@ #include "src/core/iomgr/closure.h" +/** Execution context. + * A bag of data that collects information along a callstack. + * Generally created at public API entry points, and passed down as + * pointer to child functions that manipulate it. + * + * Specific responsibilities (this may grow in the future): + * - track a list of work that needs to be delayed until the top of the + * call stack (this provides a convenient mechanism to run callbacks + * without worrying about locking issues) + * + * CONVENTIONS: + * Instance of this must ALWAYS be constructed on the stack, never + * heap allocated. Instances and pointers to them must always be called + * exec_ctx. Instances are always passed as the first argument + * to a function that takes it, and always as a pointer (grpc_exec_ctx + * is never copied). + */ struct grpc_exec_ctx { grpc_closure_list closure_list; }; @@ -43,10 +60,17 @@ struct grpc_exec_ctx { #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } -void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); +/** Flush any work that has been enqueued onto this grpc_exec_ctx. + * Caller must guarantee that no interfering locks are held. */ void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); +/** Finish any pending work for a grpc_exec_ctx. Must be called before + * the instance is destroyed, or work may be lost. */ +void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); +/** Add a closure to be executed at the next flush/finish point */ void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, int success); +/** Add a list of closures to be executed at the next flush/finish point. + * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 4dfab3b9545fbe303080a06edf2e8fd4e84404ed..15fe30fb72beac940b6bf6131924af3dbade9a5b 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -179,7 +179,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; - success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); + success = ConnectEx(sock, addr, (int)addr_len, NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. But we'll still get an IOCP notification, so let's ignore it. */ diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 9881a411527024eef77f4ea89f95a6fa439657f6..55cd1a5d6eb5fba9ba74f9a775b7c599ba9fed1e 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -152,7 +152,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, /* Prepare (bind) a recently-created socket for listening. */ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, - int addr_len) { + size_t addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; @@ -165,7 +165,7 @@ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, goto error; } - if (bind(sock, addr, addr_len) == SOCKET_ERROR) { + if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) { char *addr_str; char *utf8_message = gpr_format_message(WSAGetLastError()); grpc_sockaddr_to_string(&addr_str, addr, 0); @@ -349,7 +349,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { } static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, int addr_len) { + const struct sockaddr *addr, size_t addr_len) { server_port *sp; int port; int status; diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 99b7468e86530aee997a9b2c9a14bd6f15a3dfb5..1693cf740bd4dca8645832333d1f79f484ba9dbd 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -173,7 +173,7 @@ void MetadataCredentialsPluginWrapper::GetMetadata( void MetadataCredentialsPluginWrapper::InvokePlugin( const char* service_url, grpc_credentials_plugin_metadata_cb cb, void* user_data) { - std::multimap<grpc::string, grpc::string_ref> metadata; + std::multimap<grpc::string, grpc::string> metadata; Status status = plugin_->GetMetadata(service_url, &metadata); std::vector<grpc_metadata> md; for (auto it = metadata.begin(); it != metadata.end(); ++it) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 7f047998c3367e717a038cf68dc77dbc707e9100..2659b0e2132f3435d8372021f6eec00df2a72eb8 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -121,7 +121,7 @@ class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin { bool IsBlocking() const GRPC_OVERRIDE { return is_blocking_; } Status GetMetadata(grpc::string_ref service_url, - std::multimap<grpc::string, grpc::string_ref>* metadata) + std::multimap<grpc::string, grpc::string>* metadata) GRPC_OVERRIDE { EXPECT_GT(service_url.length(), 0UL); EXPECT_TRUE(metadata != nullptr); @@ -175,9 +175,9 @@ class TestAuthMetadataProcessor : public AuthMetadataProcessor { if (auth_md_value == kGoodGuy) { context->AddProperty(kIdentityPropName, kGoodGuy); context->SetPeerIdentityPropertyName(kIdentityPropName); - consumed_auth_metadata->insert( - std::make_pair(string(auth_md->first.data(), auth_md->first.length()), - auth_md->second)); + consumed_auth_metadata->insert(std::make_pair( + string(auth_md->first.data(), auth_md->first.length()), + string(auth_md->second.data(), auth_md->second.length()))); return Status::OK; } else { return Status(StatusCode::UNAUTHENTICATED,