Skip to content
Snippets Groups Projects
Commit 18c04775 authored by Vijay Pai's avatar Vijay Pai
Browse files

Got rid of all compilation problems, now need to test

parent e4886680
No related branches found
No related tags found
No related merge requests found
......@@ -55,6 +55,10 @@ class ByteBuffer GRPC_FINAL {
/// Construct buffer from \a slices, of which there are \a nslices.
ByteBuffer(const Slice* slices, size_t nslices);
/// Constuct a byte buffer by referencing elements of existing buffer
/// \a buf. Wrapper of core function grpc_byte_buffer_copy
ByteBuffer(const ByteBuffer&buf);
~ByteBuffer();
/// Dump (read) the buffer contents into \a slices.
......@@ -72,7 +76,6 @@ class ByteBuffer GRPC_FINAL {
private:
friend class SerializationTraits<ByteBuffer, void>;
ByteBuffer(const ByteBuffer&);
ByteBuffer& operator=(const ByteBuffer&);
// takes ownership
......
......@@ -85,4 +85,8 @@ void ByteBuffer::MoveFrom(ByteBuffer* bbuf) {
bbuf->buffer_ = nullptr;
}
ByteBuffer::ByteBuffer(const ByteBuffer& buf):
buffer_(grpc_byte_buffer_copy(buf.buffer_)) {
}
} // namespace grpc
......@@ -69,23 +69,29 @@ namespace testing {
typedef std::chrono::high_resolution_clock grpc_time_source;
typedef std::chrono::time_point<grpc_time_source> grpc_time;
namespace ClientRequestCreation {
template <class RequestType>
void CreateRequest(RequestType *req, const PayloadConfig&) {
class ClientRequestCreator {
public:
ClientRequestCreator(RequestType* req, const PayloadConfig&) {
// this template must be specialized
// fail with an assertion rather than a compile-time
// check since these only happen at the beginning anyway
GPR_ASSERT(false);
}
};
template <>
void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& payload_config) {
class ClientRequestCreator<SimpleRequest> {
public:
ClientRequestCreator(SimpleRequest* req,
const PayloadConfig& payload_config) {
if (payload_config.has_bytebuf_params()) {
GPR_ASSERT(false); // not appropriate for this specialization
} else if (payload_config.has_simple_params()) {
req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
req->set_response_size(payload_config.simple_params().resp_size());
req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE);
req->mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
int size = payload_config.simple_params().req_size();
std::unique_ptr<char[]> body(new char[size]);
req->mutable_payload()->set_body(body.get(), size);
......@@ -95,19 +101,22 @@ void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& paylo
// default should be simple proto without payloads
req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
req->set_response_size(0);
req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE);
req->mutable_payload()->set_type(
grpc::testing::PayloadType::COMPRESSABLE);
}
}
};
template <>
void CreateRequest<ByteBuffer>(ByteBuffer *req,
const PayloadConfig& payload_config) {
class ClientRequestCreator<ByteBuffer> {
public:
ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
if (payload_config.has_bytebuf_params()) {
if (payload_config.bytebuf_params().req_size() > 0) {
std::unique_ptr<char>
buf(new char[payload_config.bytebuf_params().req_size()]);
gpr_slice s =
gpr_slice_from_copied_buffer(buf.get(),
payload_config.bytebuf_params().req_size());
std::unique_ptr<char> buf(
new char[payload_config.bytebuf_params().req_size()]);
gpr_slice s = gpr_slice_from_copied_buffer(
buf.get(), payload_config.bytebuf_params().req_size());
Slice slice(s, Slice::STEAL_REF);
std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
req->MoveFrom(bbuf.get());
......@@ -116,12 +125,12 @@ void CreateRequest<ByteBuffer>(ByteBuffer *req,
}
}
}
}
};
class Client {
public:
Client() : timer_(new Timer), interarrival_timer_() {}
virtual ~Client();
virtual ~Client() {}
ClientStats Mark(bool reset) {
Histogram latencies;
......@@ -156,6 +165,7 @@ class Client {
stats.set_time_user(timer_result.user);
return stats;
}
protected:
bool closed_loop_;
......@@ -227,6 +237,7 @@ class Client {
return true;
}
}
private:
class Thread {
public:
......@@ -309,15 +320,16 @@ template <class StubType, class RequestType>
class ClientImpl : public Client {
public:
ClientImpl(const ClientConfig& config,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
: channels_(config.client_channels()),
create_stub_(create_stub) {
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub)
: channels_(config.client_channels()), create_stub_(create_stub) {
for (int i = 0; i < config.client_channels(); i++) {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
config, create_stub_);
}
ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config());
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
}
virtual ~ClientImpl() {}
......@@ -333,7 +345,9 @@ class ClientImpl : public Client {
// used for empty entries
GPR_ASSERT(!i.channel_ && !i.stub_);
}
void init(const grpc::string& target, const ClientConfig& config) {
void init(const grpc::string& target, const ClientConfig& config,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub) {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
......@@ -341,7 +355,7 @@ class ClientImpl : public Client {
target, config.security_params().server_host_override(),
config.has_security_params(),
!config.security_params().use_test_ca());
stub_ = create_stub_(channel_);
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
......@@ -351,7 +365,8 @@ class ClientImpl : public Client {
std::unique_ptr<StubType> stub_;
};
std::vector<ClientChannelInfo> channels_;
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_;
std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
create_stub_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
......
......@@ -37,20 +37,20 @@
#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <sstream>
#include <grpc/grpc.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
......@@ -93,7 +93,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req,
CompletionQueue*)>
start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
......@@ -139,7 +140,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_;
CompletionQueue*)>
start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
......@@ -160,8 +162,10 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
using ClientImpl<StubType, RequestType>::request_;
AsyncClient(
const ClientConfig& config,
std::function<ClientRpcContext*(int, StubType*, const RequestType&)> setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
std::function<ClientRpcContext*(int, StubType*, const RequestType&)>
setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub)
: ClientImpl<StubType, RequestType>(config, create_stub),
channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()),
......@@ -352,11 +356,16 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
int pref_channel_inc_;
};
class AsyncUnaryClient GRPC_FINAL :
public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
std::shared_ptr<Channel> ch) {
return BenchmarkService::NewStub(ch);
}
class AsyncUnaryClient GRPC_FINAL
: public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
public:
explicit AsyncUnaryClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, BenchmarkService::NewStub) {
: AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
StartThreads(config.async_client_threads());
}
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
......@@ -385,7 +394,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req,
void*)>
start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
......@@ -437,20 +447,21 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req_;
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
stream_;
};
class AsyncStreamingClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
class AsyncStreamingClient GRPC_FINAL
: public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, BenchmarkService::NewStub) {
: AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
// async streaming currently only supports closed loop
GPR_ASSERT(closed_loop_);
......@@ -481,10 +492,10 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientGenericRpcContextStreamingImpl(
int channel_id, grpc::GenericStub* stub, const ByteBuffer& req,
std::function<std::unique_ptr<
grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string& method_name,
CompletionQueue*, void*)> start_req,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
const grpc::string& method_name, CompletionQueue*, void*)>
start_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done)
: ClientRpcContext(channel_id),
context_(),
......@@ -504,8 +515,10 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
start_req_, callback_);
}
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
const grpc::string kMethodName("/grpc.testing.BenchmarkService/StreamingCall");
stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this));
const grpc::string kMethodName(
"/grpc.testing.BenchmarkService/StreamingCall");
stream_ = start_req_(stub_, &context_, kMethodName, cq,
ClientRpcContext::tag(this));
}
private:
......@@ -537,19 +550,25 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
ByteBuffer response_;
bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ByteBuffer*)> callback_;
std::function<
std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, CompletionQueue*,
void*)> start_req_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
CompletionQueue*, void*)>
start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
};
class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericStub, ByteBuffer> {
static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
std::shared_ptr<Channel> ch) {
return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));
}
class GenericAsyncStreamingClient GRPC_FINAL
: public AsyncClient<grpc::GenericStub, ByteBuffer> {
public:
explicit GenericAsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, grpc::GenericStub) {
: AsyncClient(config, SetupCtx, GenericStubCreator) {
// async streaming currently only supports closed loop
GPR_ASSERT(closed_loop_);
......@@ -560,14 +579,13 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericS
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
static std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
StartReq(grpc::GenericStub* stub, grpc::ClientContext* ctx,
static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq(
grpc::GenericStub* stub, grpc::ClientContext* ctx,
const grpc::string& method_name, CompletionQueue* cq, void* tag) {
auto stream = stub->Call(ctx, method_name, cq, tag);
return stream;
};
static ClientRpcContext* SetupCtx(int channel_id,
grpc::GenericStub* stub,
static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub,
const ByteBuffer& req) {
return new ClientGenericRpcContextStreamingImpl(
channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
......@@ -581,7 +599,8 @@ std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
return std::unique_ptr<Client>(new AsyncStreamingClient(args));
}
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(const ClientConfig& args) {
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
const ClientConfig& args) {
return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));
}
......
......@@ -35,28 +35,28 @@
#include <chrono>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <sstream>
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc++/client_context.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <gtest/gtest.h>
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/proto/benchmarks/services.grpc.pb.h"
#include "src/core/profiling/timers.h"
......@@ -64,12 +64,17 @@
namespace grpc {
namespace testing {
class SynchronousClient :
public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
std::shared_ptr<Channel> ch) {
return BenchmarkService::NewStub(ch);
}
class SynchronousClient
: public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
public:
SynchronousClient(const ClientConfig& config) :
ClientImpl<BenchmarkService::Stub,
SimpleRequest>(config, BenchmarkService::NewStub) {
SynchronousClient(const ClientConfig& config)
: ClientImpl<BenchmarkService::Stub, SimpleRequest>(
config, BenchmarkStubCreator) {
num_threads_ =
config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment