Skip to content
Snippets Groups Projects
client_async.cc 19.7 KiB
Newer Older
Craig Tiller's avatar
Craig Tiller committed
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <string>
#include <thread>
#include <vector>

#include <grpc++/alarm.h>
Alistair Veitch's avatar
Alistair Veitch committed
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
Vijay Pai's avatar
Vijay Pai committed
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
Alistair Veitch's avatar
Alistair Veitch committed
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
yang-g's avatar
yang-g committed

Alistair Veitch's avatar
Alistair Veitch committed
#include "src/proto/grpc/testing/services.grpc.pb.h"
Craig Tiller's avatar
Craig Tiller committed
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/usage_timer.h"
yang-g's avatar
yang-g committed
#include "test/cpp/util/create_test_channel.h"
Craig Tiller's avatar
Craig Tiller committed
namespace grpc {
namespace testing {
Vijay Pai's avatar
Vijay Pai committed
class ClientRpcContext {
 public:
  ClientRpcContext() {}
Vijay Pai's avatar
Vijay Pai committed
  virtual ~ClientRpcContext() {}
  // next state, return false if done. Collect stats when appropriate
  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
  virtual ClientRpcContext* StartNewClone() = 0;
Yang Gao's avatar
Yang Gao committed
  static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
  static ClientRpcContext* detag(void* t) {
    return reinterpret_cast<ClientRpcContext*>(t);

  virtual void Start(CompletionQueue* cq) = 0;
Vijay Pai's avatar
Vijay Pai committed
};
Craig Tiller's avatar
Craig Tiller committed

Vijay Pai's avatar
Vijay Pai committed
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
 public:
Vijay Pai's avatar
Vijay Pai committed
  ClientRpcContextUnaryImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
          std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
              BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
Jan Tattermusch's avatar
Jan Tattermusch committed
              CompletionQueue*)>
          start_req,
Yang Gao's avatar
Yang Gao committed
      std::function<void(grpc::Status, ResponseType*)> on_done)
Craig Tiller's avatar
Craig Tiller committed
        stub_(stub),
Vijay Pai's avatar
Vijay Pai committed
        req_(req),
        response_(),
        next_state_(State::READY),
        callback_(on_done),
        next_issue_(next_issue),
        start_req_(start_req) {}
  ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
    if (!next_issue_) {  // ready to issue
      RunNextState(true, nullptr);
    } else {  // wait for the issue time
      alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
    }
  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
    switch (next_state_) {
      case State::READY:
vjpai's avatar
vjpai committed
        start_ = UsageTimer::Now();
        response_reader_ = start_req_(stub_, &context_, req_, cq_);
        response_reader_->Finish(&response_, &status_,
                                 ClientRpcContext::tag(this));
        next_state_ = State::RESP_DONE;
        return true;
      case State::RESP_DONE:
        entry->set_value((UsageTimer::Now() - start_) * 1e9);
        callback_(status_, &response_);
        next_state_ = State::INVALID;
        return false;
      default:
        GPR_ASSERT(false);
        return false;
  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
    return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_,
                                         callback_);
Craig Tiller's avatar
Craig Tiller committed
  }
Vijay Pai's avatar
Vijay Pai committed
 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
Vijay Pai's avatar
Vijay Pai committed
  RequestType req_;
  ResponseType response_;
  enum State { INVALID, READY, RESP_DONE };
  State next_state_;
Yang Gao's avatar
Yang Gao committed
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;
Craig Tiller's avatar
Craig Tiller committed
  std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
Craig Tiller's avatar
Craig Tiller committed
      CompletionQueue*)>
      start_req_;
Vijay Pai's avatar
Vijay Pai committed
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
      response_reader_;
};
typedef std::forward_list<ClientRpcContext*> context_list;

vjpai's avatar
vjpai committed
template <class StubType, class RequestType>
Vijay Pai's avatar
Vijay Pai committed
class AsyncClient : public ClientImpl<StubType, RequestType> {
  // Specify which protected members we are using since there is no
  // member name resolution until the template types are fully resolved
Craig Tiller's avatar
Craig Tiller committed
 public:
Vijay Pai's avatar
Vijay Pai committed
  using Client::SetupLoadTest;
  using Client::closed_loop_;
  using Client::NextIssuer;
  using ClientImpl<StubType, RequestType>::cores_;
  using ClientImpl<StubType, RequestType>::channels_;
  using ClientImpl<StubType, RequestType>::request_;
  AsyncClient(const ClientConfig& config,
              std::function<ClientRpcContext*(
                  StubType*, std::function<gpr_timespec()> next_issue,
Jan Tattermusch's avatar
Jan Tattermusch committed
                  const RequestType&)>
                  setup_ctx,
              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                  create_stub)
      : ClientImpl<StubType, RequestType>(config, create_stub),
        num_async_threads_(NumThreads(config)) {
Alistair Veitch's avatar
Alistair Veitch committed
    SetupLoadTest(config, num_async_threads_);
Alistair Veitch's avatar
Alistair Veitch committed
    for (int i = 0; i < num_async_threads_; i++) {
Craig Tiller's avatar
Craig Tiller committed
      cli_cqs_.emplace_back(new CompletionQueue);
      next_issuers_.emplace_back(NextIssuer(i));
      shutdown_state_.emplace_back(new PerThreadShutdownState());
Craig Tiller's avatar
Craig Tiller committed
    }
    using namespace std::placeholders;
vjpai's avatar
vjpai committed
    int t = 0;
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        auto* cq = cli_cqs_[t].get();
        auto ctx =
            setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
Craig Tiller's avatar
Craig Tiller committed
      }
      t = (t + 1) % cli_cqs_.size();
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      void* got_tag;
      bool ok;
      while ((*cq)->Next(&got_tag, &ok)) {
        delete ClientRpcContext::detag(got_tag);
      }
    }
Craig Tiller's avatar
Craig Tiller committed
  }
 protected:
  const int num_async_threads_;

 private:
  struct PerThreadShutdownState {
    mutable std::mutex mutex;
    bool shutdown;
    PerThreadShutdownState() : shutdown(false) {}
  };

  int NumThreads(const ClientConfig& config) {
    int num_threads = config.async_client_threads();
    if (num_threads <= 0) {  // Use dynamic sizing
      num_threads = cores_;
      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
    }
    return num_threads;
  }
  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
      std::lock_guard<std::mutex> lock((*ss)->mutex);
      (*ss)->shutdown = true;
    }
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      (*cq)->Shutdown();
    }
    this->EndThreads(); // this needed for resolution
  }
Craig Tiller's avatar
Craig Tiller committed

  bool ThreadFunc(HistogramEntry* entry,
                  size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
Yang Gao's avatar
Yang Gao committed
    void* got_tag;
Craig Tiller's avatar
Craig Tiller committed
    bool ok;
    if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
      // Got a regular event, so process it
      ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
      // Proceed while holding a lock to make sure that
      // this thread isn't supposed to shut down
      std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
      if (shutdown_state_[thread_idx]->shutdown) {
        return true;
      } else if (!ctx->RunNextState(ok, entry)) {
        // The RPC and callback are done, so clone the ctx
        // and kickstart the new one
        auto clone = ctx->StartNewClone();
        clone->Start(cli_cqs_[thread_idx].get());
        // delete the old version
        delete ctx;
      }
      return true;
    } else {  // queue is shutting down, so we must be done
      return true;
Vijay Pai's avatar
Vijay Pai committed
    }
Craig Tiller's avatar
Craig Tiller committed
  }
Craig Tiller's avatar
Craig Tiller committed
  std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
  std::vector<std::function<gpr_timespec()>> next_issuers_;
  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
    std::shared_ptr<Channel> ch) {
  return BenchmarkService::NewStub(ch);
}

class AsyncUnaryClient GRPC_FINAL
    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
  explicit AsyncUnaryClient(const ClientConfig& config)
      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
            config, SetupCtx, BenchmarkStubCreator) {
Alistair Veitch's avatar
Alistair Veitch committed
    StartThreads(num_async_threads_);
  ~AsyncUnaryClient() GRPC_OVERRIDE {}
Vijay Pai's avatar
Vijay Pai committed

 private:
  static void CheckDone(grpc::Status s, SimpleResponse* response) {}
  static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
  StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
Vijay Pai's avatar
Vijay Pai committed
           const SimpleRequest& request, CompletionQueue* cq) {
    return stub->AsyncUnaryCall(ctx, request, cq);
  };
  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const SimpleRequest& req) {
Vijay Pai's avatar
Vijay Pai committed
    return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
        stub, req, next_issue, AsyncUnaryClient::StartReq,
Vijay Pai's avatar
Vijay Pai committed
        AsyncUnaryClient::CheckDone);
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
 public:
Vijay Pai's avatar
Vijay Pai committed
  ClientRpcContextStreamingImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
Vijay Pai's avatar
Vijay Pai committed
      std::function<std::unique_ptr<
          grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
Jan Tattermusch's avatar
Jan Tattermusch committed
          void*)>
          start_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(next_issue),
vjpai's avatar
vjpai committed
        start_req_(start_req) {}
  ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
    cq_ = cq;
    stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
    next_state_ = State::STREAM_IDLE;
  }
  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
          alarm_.reset(
              new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
          next_state_ = State::READY_TO_WRITE;
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
vjpai's avatar
vjpai committed
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          stream_->Write(req_, ClientRpcContext::tag(this));
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
          break;
        case State::READ_DONE:
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
    return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
Vijay Pai's avatar
Vijay Pai committed

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
  RequestType req_;
  ResponseType response_;
  enum State {
    INVALID,
    STREAM_IDLE,
    WAIT,
    READY_TO_WRITE,
    WRITE_DONE,
    READ_DONE
  };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;
Craig Tiller's avatar
Craig Tiller committed
  std::function<std::unique_ptr<
      grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
      start_req_;
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
      stream_;
class AsyncStreamingClient GRPC_FINAL
    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
  explicit AsyncStreamingClient(const ClientConfig& config)
      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
            config, SetupCtx, BenchmarkStubCreator) {
Alistair Veitch's avatar
Alistair Veitch committed
    StartThreads(num_async_threads_);
  ~AsyncStreamingClient() GRPC_OVERRIDE {}
Vijay Pai's avatar
Vijay Pai committed

 private:
  static void CheckDone(grpc::Status s, SimpleResponse* response) {}
Vijay Pai's avatar
Vijay Pai committed
  static std::unique_ptr<
      grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
  StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
Vijay Pai's avatar
Vijay Pai committed
           CompletionQueue* cq, void* tag) {
    auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
    return stream;
  };
  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const SimpleRequest& req) {
    return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
        stub, req, next_issue, AsyncStreamingClient::StartReq,
Vijay Pai's avatar
Vijay Pai committed
        AsyncStreamingClient::CheckDone);
class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
vjpai's avatar
vjpai committed
 public:
  ClientRpcContextGenericStreamingImpl(
      grpc::GenericStub* stub, const ByteBuffer& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
          grpc::GenericStub*, grpc::ClientContext*,
Jan Tattermusch's avatar
Jan Tattermusch committed
          const grpc::string& method_name, CompletionQueue*, void*)>
          start_req,
vjpai's avatar
vjpai committed
      std::function<void(grpc::Status, ByteBuffer*)> on_done)
vjpai's avatar
vjpai committed
        stub_(stub),
vjpai's avatar
vjpai committed
        req_(req),
        response_(),
        next_state_(State::INVALID),
vjpai's avatar
vjpai committed
        callback_(on_done),
        next_issue_(next_issue),
vjpai's avatar
vjpai committed
        start_req_(start_req) {}
  ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
vjpai's avatar
vjpai committed
  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
    const grpc::string kMethodName(
        "/grpc.testing.BenchmarkService/StreamingCall");
    stream_ = start_req_(stub_, &context_, kMethodName, cq,
                         ClientRpcContext::tag(this));
    next_state_ = State::STREAM_IDLE;
vjpai's avatar
vjpai committed
  }
  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
          alarm_.reset(
              new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
          next_state_ = State::READY_TO_WRITE;
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
vjpai's avatar
vjpai committed
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          stream_->Write(req_, ClientRpcContext::tag(this));
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
          break;
        case State::READ_DONE:
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        default:
          GPR_ASSERT(false);
          return false;
      }
vjpai's avatar
vjpai committed
    }
  }
  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
    return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
                                                    start_req_, callback_);
vjpai's avatar
vjpai committed
  }
vjpai's avatar
vjpai committed
  grpc::ClientContext context_;
  grpc::GenericStub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
vjpai's avatar
vjpai committed
  ByteBuffer req_;
  ByteBuffer response_;
  enum State {
    INVALID,
    STREAM_IDLE,
    WAIT,
    READY_TO_WRITE,
    WRITE_DONE,
    READ_DONE
  };
  State next_state_;
vjpai's avatar
vjpai committed
  std::function<void(grpc::Status, ByteBuffer*)> callback_;
  std::function<gpr_timespec()> next_issue_;
  std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
      grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
Craig Tiller's avatar
Craig Tiller committed
      CompletionQueue*, void*)>
      start_req_;
vjpai's avatar
vjpai committed
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
};

static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
    std::shared_ptr<Channel> ch) {
  return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));
}

class GenericAsyncStreamingClient GRPC_FINAL
    : public AsyncClient<grpc::GenericStub, ByteBuffer> {
vjpai's avatar
vjpai committed
 public:
  explicit GenericAsyncStreamingClient(const ClientConfig& config)
      : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,
                                                   GenericStubCreator) {
Alistair Veitch's avatar
Alistair Veitch committed
    StartThreads(num_async_threads_);
vjpai's avatar
vjpai committed
  }

  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
vjpai's avatar
vjpai committed

 private:
  static void CheckDone(grpc::Status s, ByteBuffer* response) {}
  static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq(
      grpc::GenericStub* stub, grpc::ClientContext* ctx,
      const grpc::string& method_name, CompletionQueue* cq, void* tag) {
vjpai's avatar
vjpai committed
    auto stream = stub->Call(ctx, method_name, cq, tag);
    return stream;
  };
  static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
                                    std::function<gpr_timespec()> next_issue,
Vijay Pai's avatar
Vijay Pai committed
                                    const ByteBuffer& req) {
    return new ClientRpcContextGenericStreamingImpl(
        stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
Vijay Pai's avatar
Vijay Pai committed
        GenericAsyncStreamingClient::CheckDone);
vjpai's avatar
vjpai committed
  }
};

std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
  return std::unique_ptr<Client>(new AsyncUnaryClient(args));
}
std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
  return std::unique_ptr<Client>(new AsyncStreamingClient(args));
Craig Tiller's avatar
Craig Tiller committed
}
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
    const ClientConfig& args) {
vjpai's avatar
vjpai committed
  return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));
}
Craig Tiller's avatar
Craig Tiller committed
}  // namespace testing
}  // namespace grpc