Skip to content
Snippets Groups Projects
Commit e70a0cc7 authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge github.com:grpc/grpc into qps_driver

parents 10923c2f c82c36b4
No related branches found
No related tags found
No related merge requests found
Showing
with 75 additions and 119 deletions
[![Build Status](https://travis-ci.org/grpc/grpc.svg?branch=master)](https://travis-ci.org/grpc/grpc)
[gRPC - An RPC library and framework](http://github.com/grpc/grpc) [gRPC - An RPC library and framework](http://github.com/grpc/grpc)
=================================== ===================================
......
...@@ -1777,7 +1777,6 @@ ...@@ -1777,7 +1777,6 @@
], ],
"deps": [ "deps": [
"pubsub_client_lib", "pubsub_client_lib",
"grpc++_test_util",
"grpc_test_util", "grpc_test_util",
"grpc++", "grpc++",
"grpc", "grpc",
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h> #include <grpc++/channel_interface.h>
#include <grpc++/create_channel.h> #include <grpc++/create_channel.h>
#include <grpc++/credentials.h> #include <grpc++/credentials.h>
...@@ -48,17 +49,11 @@ ...@@ -48,17 +49,11 @@
#include "examples/pubsub/publisher.h" #include "examples/pubsub/publisher.h"
#include "examples/pubsub/subscriber.h" #include "examples/pubsub/subscriber.h"
#include "test/cpp/util/create_test_channel.h"
DEFINE_int32(server_port, 443, "Server port."); DEFINE_int32(server_port, 443, "Server port.");
DEFINE_string(server_host, DEFINE_string(server_host,
"pubsub-staging.googleapis.com", "Server host to connect to"); "pubsub-staging.googleapis.com", "Server host to connect to");
DEFINE_string(project_id, "", "GCE project id such as stoked-keyword-656"); DEFINE_string(project_id, "", "GCE project id such as stoked-keyword-656");
DEFINE_string(service_account_key_file, "",
"Path to service account json key file.");
DEFINE_string(oauth_scope,
"https://www.googleapis.com/auth/cloud-platform",
"Scope for OAuth tokens.");
// In some distros, gflags is in the namespace google, and in some others, // In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both. // in gflags. This hack is enabling us to find both.
...@@ -75,17 +70,6 @@ const char kMessageData[] = "Test Data"; ...@@ -75,17 +70,6 @@ const char kMessageData[] = "Test Data";
} // namespace } // namespace
grpc::string GetServiceAccountJsonKey() {
grpc::string json_key;
if (json_key.empty()) {
std::ifstream json_key_file(FLAGS_service_account_key_file);
std::stringstream key_stream;
key_stream << json_key_file.rdbuf();
json_key = key_stream.str();
}
return json_key;
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_init(); grpc_init();
ParseCommandLineFlags(&argc, &argv, true); ParseCommandLineFlags(&argc, &argv, true);
...@@ -93,23 +77,12 @@ int main(int argc, char** argv) { ...@@ -93,23 +77,12 @@ int main(int argc, char** argv) {
std::ostringstream ss; std::ostringstream ss;
std::unique_ptr<grpc::Credentials> creds;
if (FLAGS_service_account_key_file != "") {
grpc::string json_key = GetServiceAccountJsonKey();
creds = grpc::CredentialsFactory::ServiceAccountCredentials(
json_key, FLAGS_oauth_scope, std::chrono::hours(1));
} else {
creds = grpc::CredentialsFactory::ComputeEngineCredentials();
}
ss << FLAGS_server_host << ":" << FLAGS_server_port; ss << FLAGS_server_host << ":" << FLAGS_server_port;
std::shared_ptr<grpc::ChannelInterface> channel(
grpc::CreateTestChannel( std::unique_ptr<grpc::Credentials> creds =
ss.str(), grpc::CredentialsFactory::GoogleDefaultCredentials();
FLAGS_server_host, std::shared_ptr<grpc::ChannelInterface> channel =
true, // enable SSL grpc::CreateChannel(ss.str(), creds, grpc::ChannelArguments());
true, // use prod roots
creds));
grpc::examples::pubsub::Publisher publisher(channel); grpc::examples::pubsub::Publisher publisher(channel);
grpc::examples::pubsub::Subscriber subscriber(channel); grpc::examples::pubsub::Subscriber subscriber(channel);
...@@ -129,6 +102,7 @@ int main(int argc, char** argv) { ...@@ -129,6 +102,7 @@ int main(int argc, char** argv) {
subscription_name, &subscription_topic).IsOk()) { subscription_name, &subscription_topic).IsOk()) {
subscriber.DeleteSubscription(subscription_name); subscriber.DeleteSubscription(subscription_name);
} }
if (publisher.GetTopic(topic).IsOk()) publisher.DeleteTopic(topic); if (publisher.GetTopic(topic).IsOk()) publisher.DeleteTopic(topic);
grpc::Status s = publisher.CreateTopic(topic); grpc::Status s = publisher.CreateTopic(topic);
......
...@@ -49,7 +49,7 @@ class ClientAsyncResponseReader GRPC_FINAL { ...@@ -49,7 +49,7 @@ class ClientAsyncResponseReader GRPC_FINAL {
public: public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
const google::protobuf::Message& request, void* tag) const grpc::protobuf::Message& request, void* tag)
: context_(context), : context_(context),
call_(channel->CreateCall(method, context, cq)) { call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag); init_buf_.Reset(tag);
...@@ -77,7 +77,6 @@ class ClientAsyncResponseReader GRPC_FINAL { ...@@ -77,7 +77,6 @@ class ClientAsyncResponseReader GRPC_FINAL {
call_.PerformOps(&finish_buf_); call_.PerformOps(&finish_buf_);
} }
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
......
...@@ -37,12 +37,6 @@ ...@@ -37,12 +37,6 @@
#include <grpc++/status.h> #include <grpc++/status.h>
#include <grpc++/impl/call.h> #include <grpc++/impl/call.h>
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
struct grpc_call; struct grpc_call;
namespace grpc { namespace grpc {
......
...@@ -47,12 +47,6 @@ using std::chrono::system_clock; ...@@ -47,12 +47,6 @@ using std::chrono::system_clock;
struct grpc_call; struct grpc_call;
struct grpc_completion_queue; struct grpc_completion_queue;
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace grpc { namespace grpc {
class CallOpBuffer; class CallOpBuffer;
......
...@@ -106,8 +106,8 @@ class CompletionQueue { ...@@ -106,8 +106,8 @@ class CompletionQueue {
friend Status BlockingUnaryCall(ChannelInterface *channel, friend Status BlockingUnaryCall(ChannelInterface *channel,
const RpcMethod &method, const RpcMethod &method,
ClientContext *context, ClientContext *context,
const google::protobuf::Message &request, const grpc::protobuf::Message &request,
google::protobuf::Message *result); grpc::protobuf::Message *result);
// Wraps grpc_completion_queue_pluck. // Wraps grpc_completion_queue_pluck.
// Cannot be mixed with calls to Next(). // Cannot be mixed with calls to Next().
......
...@@ -34,8 +34,6 @@ ...@@ -34,8 +34,6 @@
#ifndef GRPCXX_CONFIG_H #ifndef GRPCXX_CONFIG_H
#define GRPCXX_CONFIG_H #define GRPCXX_CONFIG_H
#include <string>
#ifdef GRPC_OLD_CXX #ifdef GRPC_OLD_CXX
#define GRPC_FINAL #define GRPC_FINAL
#define GRPC_OVERRIDE #define GRPC_OVERRIDE
...@@ -44,9 +42,23 @@ ...@@ -44,9 +42,23 @@
#define GRPC_OVERRIDE override #define GRPC_OVERRIDE override
#endif #endif
#ifndef GRPC_CUSTOM_STRING
#include <string>
#define GRPC_CUSTOM_STRING std::string
#endif
#ifndef GRPC_CUSTOM_MESSAGE
#include <google/protobuf/message.h>
#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
#endif
namespace grpc { namespace grpc {
typedef std::string string; typedef GRPC_CUSTOM_STRING string;
namespace protobuf {
typedef GRPC_CUSTOM_MESSAGE Message;
} // namespace protobuf
} // namespace grpc } // namespace grpc
......
...@@ -42,12 +42,6 @@ ...@@ -42,12 +42,6 @@
#include <memory> #include <memory>
#include <map> #include <map>
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
struct grpc_call; struct grpc_call;
struct grpc_op; struct grpc_op;
...@@ -67,8 +61,8 @@ class CallOpBuffer : public CompletionQueueTag { ...@@ -67,8 +61,8 @@ class CallOpBuffer : public CompletionQueueTag {
std::multimap<grpc::string, grpc::string> *metadata); std::multimap<grpc::string, grpc::string> *metadata);
void AddSendInitialMetadata(ClientContext *ctx); void AddSendInitialMetadata(ClientContext *ctx);
void AddRecvInitialMetadata(ClientContext *ctx); void AddRecvInitialMetadata(ClientContext *ctx);
void AddSendMessage(const google::protobuf::Message &message); void AddSendMessage(const grpc::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message); void AddRecvMessage(grpc::protobuf::Message *message);
void AddClientSendClose(); void AddClientSendClose();
void AddClientRecvStatus(ClientContext *ctx, Status *status); void AddClientRecvStatus(ClientContext *ctx, Status *status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
...@@ -95,10 +89,10 @@ class CallOpBuffer : public CompletionQueueTag { ...@@ -95,10 +89,10 @@ class CallOpBuffer : public CompletionQueueTag {
std::multimap<grpc::string, grpc::string> *recv_initial_metadata_; std::multimap<grpc::string, grpc::string> *recv_initial_metadata_;
grpc_metadata_array recv_initial_metadata_arr_; grpc_metadata_array recv_initial_metadata_arr_;
// Send message // Send message
const google::protobuf::Message *send_message_; const grpc::protobuf::Message *send_message_;
grpc_byte_buffer *send_message_buf_; grpc_byte_buffer *send_message_buf_;
// Recv message // Recv message
google::protobuf::Message *recv_message_; grpc::protobuf::Message *recv_message_;
grpc_byte_buffer *recv_message_buf_; grpc_byte_buffer *recv_message_buf_;
// Client send close // Client send close
bool client_send_close_; bool client_send_close_;
......
...@@ -34,11 +34,7 @@ ...@@ -34,11 +34,7 @@
#ifndef GRPCXX_IMPL_CLIENT_UNARY_CALL_H #ifndef GRPCXX_IMPL_CLIENT_UNARY_CALL_H
#define GRPCXX_IMPL_CLIENT_UNARY_CALL_H #define GRPCXX_IMPL_CLIENT_UNARY_CALL_H
namespace google { #include <grpc++/config.h>
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace grpc { namespace grpc {
...@@ -51,8 +47,8 @@ class Status; ...@@ -51,8 +47,8 @@ class Status;
// Wrapper that performs a blocking unary call // Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
const google::protobuf::Message &request, const grpc::protobuf::Message &request,
google::protobuf::Message *result); grpc::protobuf::Message *result);
} // namespace grpc } // namespace grpc
......
...@@ -34,12 +34,6 @@ ...@@ -34,12 +34,6 @@
#ifndef GRPCXX_IMPL_RPC_METHOD_H #ifndef GRPCXX_IMPL_RPC_METHOD_H
#define GRPCXX_IMPL_RPC_METHOD_H #define GRPCXX_IMPL_RPC_METHOD_H
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace grpc { namespace grpc {
class RpcMethod { class RpcMethod {
......
...@@ -56,13 +56,13 @@ class MethodHandler { ...@@ -56,13 +56,13 @@ class MethodHandler {
virtual ~MethodHandler() {} virtual ~MethodHandler() {}
struct HandlerParameter { struct HandlerParameter {
HandlerParameter(Call* c, ServerContext* context, HandlerParameter(Call* c, ServerContext* context,
const google::protobuf::Message* req, const grpc::protobuf::Message* req,
google::protobuf::Message* resp) grpc::protobuf::Message* resp)
: call(c), server_context(context), request(req), response(resp) {} : call(c), server_context(context), request(req), response(resp) {}
Call* call; Call* call;
ServerContext* server_context; ServerContext* server_context;
const google::protobuf::Message* request; const grpc::protobuf::Message* request;
google::protobuf::Message* response; grpc::protobuf::Message* response;
}; };
virtual Status RunHandler(const HandlerParameter& param) = 0; virtual Status RunHandler(const HandlerParameter& param) = 0;
}; };
...@@ -165,8 +165,8 @@ class RpcServiceMethod : public RpcMethod { ...@@ -165,8 +165,8 @@ class RpcServiceMethod : public RpcMethod {
// Takes ownership of the handler and two prototype objects. // Takes ownership of the handler and two prototype objects.
RpcServiceMethod(const char* name, RpcMethod::RpcType type, RpcServiceMethod(const char* name, RpcMethod::RpcType type,
MethodHandler* handler, MethodHandler* handler,
google::protobuf::Message* request_prototype, grpc::protobuf::Message* request_prototype,
google::protobuf::Message* response_prototype) grpc::protobuf::Message* response_prototype)
: RpcMethod(name, type), : RpcMethod(name, type),
handler_(handler), handler_(handler),
request_prototype_(request_prototype), request_prototype_(request_prototype),
...@@ -174,17 +174,17 @@ class RpcServiceMethod : public RpcMethod { ...@@ -174,17 +174,17 @@ class RpcServiceMethod : public RpcMethod {
MethodHandler* handler() { return handler_.get(); } MethodHandler* handler() { return handler_.get(); }
google::protobuf::Message* AllocateRequestProto() { grpc::protobuf::Message* AllocateRequestProto() {
return request_prototype_->New(); return request_prototype_->New();
} }
google::protobuf::Message* AllocateResponseProto() { grpc::protobuf::Message* AllocateResponseProto() {
return response_prototype_->New(); return response_prototype_->New();
} }
private: private:
std::unique_ptr<MethodHandler> handler_; std::unique_ptr<MethodHandler> handler_;
std::unique_ptr<google::protobuf::Message> request_prototype_; std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<google::protobuf::Message> response_prototype_; std::unique_ptr<grpc::protobuf::Message> response_prototype_;
}; };
// This class contains all the method information for an rpc service. It is // This class contains all the method information for an rpc service. It is
......
...@@ -34,11 +34,7 @@ ...@@ -34,11 +34,7 @@
#ifndef GRPCXX_IMPL_SERVICE_TYPE_H #ifndef GRPCXX_IMPL_SERVICE_TYPE_H
#define GRPCXX_IMPL_SERVICE_TYPE_H #define GRPCXX_IMPL_SERVICE_TYPE_H
namespace google { #include <grpc++/config.h>
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace grpc { namespace grpc {
...@@ -72,7 +68,7 @@ class AsynchronousService { ...@@ -72,7 +68,7 @@ class AsynchronousService {
public: public:
virtual void RequestAsyncCall(void* registered_method, virtual void RequestAsyncCall(void* registered_method,
ServerContext* context, ServerContext* context,
::google::protobuf::Message* request, ::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) = 0; CompletionQueue* cq, void* tag) = 0;
}; };
...@@ -91,7 +87,7 @@ class AsynchronousService { ...@@ -91,7 +87,7 @@ class AsynchronousService {
protected: protected:
void RequestAsyncUnary(int index, ServerContext* context, void RequestAsyncUnary(int index, ServerContext* context,
::google::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
...@@ -104,7 +100,7 @@ class AsynchronousService { ...@@ -104,7 +100,7 @@ class AsynchronousService {
stream, cq, tag); stream, cq, tag);
} }
void RequestServerStreaming(int index, ServerContext* context, void RequestServerStreaming(int index, ServerContext* context,
::google::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
......
...@@ -47,12 +47,6 @@ ...@@ -47,12 +47,6 @@
struct grpc_server; struct grpc_server;
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace grpc { namespace grpc {
class AsynchronousService; class AsynchronousService;
class RpcService; class RpcService;
...@@ -101,7 +95,7 @@ class Server GRPC_FINAL : private CallHook, ...@@ -101,7 +95,7 @@ class Server GRPC_FINAL : private CallHook,
// DispatchImpl // DispatchImpl
void RequestAsyncCall(void* registered_method, ServerContext* context, void RequestAsyncCall(void* registered_method, ServerContext* context,
::google::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag); CompletionQueue* cq, void* tag);
......
...@@ -88,7 +88,7 @@ class ClientReader GRPC_FINAL : public ClientStreamingInterface, ...@@ -88,7 +88,7 @@ class ClientReader GRPC_FINAL : public ClientStreamingInterface,
public: public:
// Blocking create a stream and write the first request out. // Blocking create a stream and write the first request out.
ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const google::protobuf::Message& request) ClientContext* context, const grpc::protobuf::Message& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) { : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf; CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_); buf.AddSendInitialMetadata(&context->send_initial_metadata_);
...@@ -142,7 +142,7 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface, ...@@ -142,7 +142,7 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface,
public: public:
// Blocking create a stream. // Blocking create a stream.
ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, google::protobuf::Message* response) ClientContext* context, grpc::protobuf::Message* response)
: context_(context), : context_(context),
response_(response), response_(response),
call_(channel->CreateCall(method, context, &cq_)) { call_(channel->CreateCall(method, context, &cq_)) {
...@@ -179,7 +179,7 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface, ...@@ -179,7 +179,7 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface,
private: private:
ClientContext* context_; ClientContext* context_;
google::protobuf::Message* const response_; grpc::protobuf::Message* const response_;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
}; };
...@@ -386,7 +386,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncStreamingInterface, ...@@ -386,7 +386,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncStreamingInterface,
// Create a stream and write the first request out. // Create a stream and write the first request out.
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
const google::protobuf::Message& request, void* tag) const grpc::protobuf::Message& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) { : context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag); init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
...@@ -436,7 +436,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface, ...@@ -436,7 +436,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
public: public:
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
google::protobuf::Message* response, void* tag) grpc::protobuf::Message* response, void* tag)
: context_(context), : context_(context),
response_(response), response_(response),
call_(channel->CreateCall(method, context, cq)) { call_(channel->CreateCall(method, context, cq)) {
...@@ -477,7 +477,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface, ...@@ -477,7 +477,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncStreamingInterface,
private: private:
ClientContext* context_; ClientContext* context_;
google::protobuf::Message* const response_; grpc::protobuf::Message* const response_;
Call call_; Call call_;
CallOpBuffer init_buf_; CallOpBuffer init_buf_;
CallOpBuffer meta_buf_; CallOpBuffer meta_buf_;
......
...@@ -267,7 +267,6 @@ static void unary_poll_do_promote(void *args, int success) { ...@@ -267,7 +267,6 @@ static void unary_poll_do_promote(void *args, int success) {
* and we don't have any mechanism to unbecome multipoller. */ * and we don't have any mechanism to unbecome multipoller. */
pollset->in_flight_cbs--; pollset->in_flight_cbs--;
if (pollset->shutting_down) { if (pollset->shutting_down) {
gpr_log(GPR_INFO, "Shutting down");
/* We don't care about this pollset anymore. */ /* We don't care about this pollset anymore. */
if (pollset->in_flight_cbs == 0) { if (pollset->in_flight_cbs == 0) {
do_shutdown_cb = 1; do_shutdown_cb = 1;
...@@ -275,7 +274,6 @@ static void unary_poll_do_promote(void *args, int success) { ...@@ -275,7 +274,6 @@ static void unary_poll_do_promote(void *args, int success) {
} else if (grpc_fd_is_orphaned(fd)) { } else if (grpc_fd_is_orphaned(fd)) {
/* Don't try to add it to anything, we'll drop our ref on it below */ /* Don't try to add it to anything, we'll drop our ref on it below */
} else if (pollset->vtable != original_vtable) { } else if (pollset->vtable != original_vtable) {
gpr_log(GPR_INFO, "Not original vtable");
pollset->vtable->add_fd(pollset, fd); pollset->vtable->add_fd(pollset, fd);
} else if (fd != pollset->data.ptr) { } else if (fd != pollset->data.ptr) {
grpc_fd *fds[2]; grpc_fd *fds[2];
......
...@@ -140,6 +140,8 @@ struct grpc_call { ...@@ -140,6 +140,8 @@ struct grpc_call {
gpr_uint8 have_alarm; gpr_uint8 have_alarm;
/* are we currently performing a send operation */ /* are we currently performing a send operation */
gpr_uint8 sending; gpr_uint8 sending;
/* are we currently completing requests */
gpr_uint8 completing;
/* pairs with completed_requests */ /* pairs with completed_requests */
gpr_uint8 num_completed_requests; gpr_uint8 num_completed_requests;
/* flag that we need to request more data */ /* flag that we need to request more data */
...@@ -357,7 +359,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } ...@@ -357,7 +359,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static void unlock(grpc_call *call) { static void unlock(grpc_call *call) {
send_action sa = SEND_NOTHING; send_action sa = SEND_NOTHING;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int num_completed_requests = call->num_completed_requests; int completing_requests = 0;
int need_more_data = int need_more_data =
call->need_more_data && call->need_more_data &&
(call->write_state >= WRITE_STATE_STARTED || !call->is_client); (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
...@@ -367,10 +369,12 @@ static void unlock(grpc_call *call) { ...@@ -367,10 +369,12 @@ static void unlock(grpc_call *call) {
call->need_more_data = 0; call->need_more_data = 0;
} }
if (num_completed_requests != 0) { if (!call->completing && call->num_completed_requests != 0) {
completing_requests = call->num_completed_requests;
memcpy(completed_requests, call->completed_requests, memcpy(completed_requests, call->completed_requests,
sizeof(completed_requests)); sizeof(completed_requests));
call->num_completed_requests = 0; call->num_completed_requests = 0;
call->completing = 1;
} }
if (!call->sending) { if (!call->sending) {
...@@ -391,9 +395,14 @@ static void unlock(grpc_call *call) { ...@@ -391,9 +395,14 @@ static void unlock(grpc_call *call) {
enact_send_action(call, sa); enact_send_action(call, sa);
} }
for (i = 0; i < num_completed_requests; i++) { if (completing_requests > 0) {
completed_requests[i].on_complete(call, completed_requests[i].status, for (i = 0; i < completing_requests; i++) {
completed_requests[i].user_data); completed_requests[i].on_complete(call, completed_requests[i].status,
completed_requests[i].user_data);
}
lock(call);
call->completing = 0;
unlock(call);
} }
} }
......
...@@ -44,8 +44,8 @@ namespace grpc { ...@@ -44,8 +44,8 @@ namespace grpc {
// Wrapper that performs a blocking unary call // Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
const google::protobuf::Message &request, const grpc::protobuf::Message &request,
google::protobuf::Message *result) { grpc::protobuf::Message *result) {
CompletionQueue cq; CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq)); Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf; CallOpBuffer buf;
......
...@@ -47,7 +47,8 @@ grpc_credentials *Credentials::GetRawCreds() { return creds_; } ...@@ -47,7 +47,8 @@ grpc_credentials *Credentials::GetRawCreds() { return creds_; }
std::unique_ptr<Credentials> CredentialsFactory::GoogleDefaultCredentials() { std::unique_ptr<Credentials> CredentialsFactory::GoogleDefaultCredentials() {
grpc_credentials *c_creds = grpc_google_default_credentials_create(); grpc_credentials *c_creds = grpc_google_default_credentials_create();
std::unique_ptr<Credentials> cpp_creds(new Credentials(c_creds)); std::unique_ptr<Credentials> cpp_creds(
c_creds == nullptr ? nullptr : new Credentials(c_creds));
return cpp_creds; return cpp_creds;
} }
......
...@@ -163,11 +163,11 @@ void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) { ...@@ -163,11 +163,11 @@ void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
AddSendInitialMetadata(&ctx->send_initial_metadata_); AddSendInitialMetadata(&ctx->send_initial_metadata_);
} }
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
send_message_ = &message; send_message_ = &message;
} }
void CallOpBuffer::AddRecvMessage(google::protobuf::Message* message) { void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
recv_message_ = message; recv_message_ = message;
recv_message_->Clear(); recv_message_->Clear();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment