diff --git a/examples/tips/subscriber.cc b/examples/tips/subscriber.cc index a482ad6263d81a46a81dae08555a7ce95d9e0730..2e2370ee2dac7f260c43f2db756886609be51919 100644 --- a/examples/tips/subscriber.cc +++ b/examples/tips/subscriber.cc @@ -81,6 +81,29 @@ Status Subscriber::GetSubscription(const grpc::string& name, return s; } +Status Subscriber::Pull(const grpc::string& name, + grpc::string* data) { + tech::pubsub::PullRequest request; + tech::pubsub::PullResponse response; + ClientContext context; + + request.set_subscription(name); + Status s = stub_->Pull(&context, request, &response); + if (s.IsOk()) { + tech::pubsub::PubsubEvent event = response.pubsub_event(); + if (event.has_message()) { + *data = event.message().data(); + } + tech::pubsub::AcknowledgeRequest ack; + proto2::Empty empty; + ClientContext ack_context; + ack.set_subscription(name); + ack.add_ack_id(response.ack_id()); + stub_->Acknowledge(&ack_context, ack, &empty); + } + return s; +} + } // namespace tips } // namespace examples } // namespace grpc diff --git a/examples/tips/subscriber.h b/examples/tips/subscriber.h index e0491ff919a6ad244c16d624b73cde4d82b00456..38345c0c5a984c52a9c7b09046a03fa3cf29a1e2 100644 --- a/examples/tips/subscriber.h +++ b/examples/tips/subscriber.h @@ -53,6 +53,8 @@ class Subscriber { Status GetSubscription(const grpc::string& name, grpc::string* topic); + Status Pull(const grpc::string& name, grpc::string* data); + private: std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_; }; diff --git a/examples/tips/subscriber_test.cc b/examples/tips/subscriber_test.cc index 4894814252edc017213a8636c7293f3c983d5660..4ff93643ae4fa752b59c08386d336c519c49a79e 100644 --- a/examples/tips/subscriber_test.cc +++ b/examples/tips/subscriber_test.cc @@ -53,6 +53,7 @@ namespace { const char kTopic[] = "test topic"; const char kSubscriptionName[] = "subscription name"; +const char kData[] = "Message data"; class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service { public: @@ -72,6 +73,21 @@ class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service { return Status::OK; } + Status Pull(ServerContext* context, + const tech::pubsub::PullRequest* request, + tech::pubsub::PullResponse* response) override { + EXPECT_EQ(request->subscription(), kSubscriptionName); + response->set_ack_id("1"); + response->mutable_pubsub_event()->mutable_message()->set_data(kData); + return Status::OK; + } + + Status Acknowledge(ServerContext* context, + const tech::pubsub::AcknowledgeRequest* request, + proto2::Empty* response) override { + return Status::OK; + } + }; class SubscriberTest : public ::testing::Test { @@ -108,10 +124,15 @@ TEST_F(SubscriberTest, TestSubscriber) { EXPECT_TRUE(subscriber_->CreateSubscription(kTopic, kSubscriptionName).IsOk()); + grpc::string topic; EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName, &topic).IsOk()); EXPECT_EQ(topic, kTopic); + + grpc::string data; + EXPECT_TRUE(subscriber_->Pull(kSubscriptionName, + &data).IsOk()); } } // namespace