Skip to content
Snippets Groups Projects
Commit 405392c2 authored by Chen Wang's avatar Chen Wang
Browse files

fixed typos and add ListTopics implementation.

parent b532ef89
No related branches found
No related tags found
No related merge requests found
...@@ -69,7 +69,7 @@ const char kMessageData[] = "Test Data"; ...@@ -69,7 +69,7 @@ const char kMessageData[] = "Test Data";
} // namespace } // namespace
grpc::string GetServiceAccountJsonKey() { grpc::string GetServiceAccountJsonKey() {
static grpc::string json_key; grpc::string json_key;
if (json_key.empty()) { if (json_key.empty()) {
std::ifstream json_key_file(FLAGS_service_account_key_file); std::ifstream json_key_file(FLAGS_service_account_key_file);
std::stringstream key_stream; std::stringstream key_stream;
...@@ -116,7 +116,7 @@ int main(int argc, char** argv) { ...@@ -116,7 +116,7 @@ int main(int argc, char** argv) {
ss << FLAGS_project_id << "/" << kSubscriptionName; ss << FLAGS_project_id << "/" << kSubscriptionName;
grpc::string subscription_name = ss.str(); grpc::string subscription_name = ss.str();
// Clean up test topic and subcription. // Clean up test topic and subcription if they exist before.
grpc::string subscription_topic; grpc::string subscription_topic;
if (subscriber.GetSubscription( if (subscriber.GetSubscription(
subscription_name, &subscription_topic).IsOk()) { subscription_name, &subscription_topic).IsOk()) {
...@@ -134,6 +134,18 @@ int main(int argc, char** argv) { ...@@ -134,6 +134,18 @@ int main(int argc, char** argv) {
s.code(), s.details().c_str()); s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk()); GPR_ASSERT(s.IsOk());
std::vector<grpc::string> topics;
s = publisher.ListTopics(FLAGS_project_id, &topics);
gpr_log(GPR_INFO, "List topic returns code %d, %s",
s.code(), s.details().c_str());
bool topic_found = false;
for (unsigned int i = 0; i < topics.size(); i++) {
if (topics[i] == topic) topic_found = true;
gpr_log(GPR_INFO, "topic: %s", topics[i].c_str());
}
GPR_ASSERT(s.IsOk());
GPR_ASSERT(topic_found);
s = subscriber.CreateSubscription(topic, subscription_name); s = subscriber.CreateSubscription(topic, subscription_name);
gpr_log(GPR_INFO, "create subscrption returns code %d, %s", gpr_log(GPR_INFO, "create subscrption returns code %d, %s",
s.code(), s.details().c_str()); s.code(), s.details().c_str());
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
* *
*/ */
#include <sstream>
#include <grpc++/client_context.h> #include <grpc++/client_context.h>
#include "examples/tips/publisher.h" #include "examples/tips/publisher.h"
...@@ -56,7 +58,7 @@ void Publisher::Shutdown() { ...@@ -56,7 +58,7 @@ void Publisher::Shutdown() {
stub_.reset(); stub_.reset();
} }
Status Publisher::CreateTopic(const string& topic) { Status Publisher::CreateTopic(const grpc::string& topic) {
Topic request; Topic request;
Topic response; Topic response;
request.set_name(topic); request.set_name(topic);
...@@ -65,15 +67,28 @@ Status Publisher::CreateTopic(const string& topic) { ...@@ -65,15 +67,28 @@ Status Publisher::CreateTopic(const string& topic) {
return stub_->CreateTopic(&context, request, &response); return stub_->CreateTopic(&context, request, &response);
} }
Status Publisher::ListTopics() { Status Publisher::ListTopics(const grpc::string& project_id,
std::vector<grpc::string>* topics) {
ListTopicsRequest request; ListTopicsRequest request;
ListTopicsResponse response; ListTopicsResponse response;
ClientContext context; ClientContext context;
return stub_->ListTopics(&context, request, &response); std::stringstream ss;
ss << "cloud.googleapis.com/project in (/projects/" << project_id << ")";
request.set_query(ss.str());
Status s = stub_->ListTopics(&context, request, &response);
tech::pubsub::Topic topic;
for (int i = 0; i < response.topic_size(); i++) {
topic = response.topic(i);
topics->push_back(topic.name());
}
return s;
} }
Status Publisher::GetTopic(const string& topic) { Status Publisher::GetTopic(const grpc::string& topic) {
GetTopicRequest request; GetTopicRequest request;
Topic response; Topic response;
ClientContext context; ClientContext context;
...@@ -83,7 +98,7 @@ Status Publisher::GetTopic(const string& topic) { ...@@ -83,7 +98,7 @@ Status Publisher::GetTopic(const string& topic) {
return stub_->GetTopic(&context, request, &response); return stub_->GetTopic(&context, request, &response);
} }
Status Publisher::DeleteTopic(const string& topic) { Status Publisher::DeleteTopic(const grpc::string& topic) {
DeleteTopicRequest request; DeleteTopicRequest request;
proto2::Empty response; proto2::Empty response;
ClientContext context; ClientContext context;
...@@ -93,7 +108,7 @@ Status Publisher::DeleteTopic(const string& topic) { ...@@ -93,7 +108,7 @@ Status Publisher::DeleteTopic(const string& topic) {
return stub_->DeleteTopic(&context, request, &response); return stub_->DeleteTopic(&context, request, &response);
} }
Status Publisher::Publish(const string& topic, const string& data) { Status Publisher::Publish(const grpc::string& topic, const grpc::string& data) {
PublishRequest request; PublishRequest request;
proto2::Empty response; proto2::Empty response;
ClientContext context; ClientContext context;
......
...@@ -48,12 +48,13 @@ class Publisher { ...@@ -48,12 +48,13 @@ class Publisher {
Publisher(std::shared_ptr<ChannelInterface> channel); Publisher(std::shared_ptr<ChannelInterface> channel);
void Shutdown(); void Shutdown();
Status CreateTopic(const string& topic); Status CreateTopic(const grpc::string& topic);
Status GetTopic(const string& topic); Status GetTopic(const grpc::string& topic);
Status DeleteTopic(const string& topic); Status DeleteTopic(const grpc::string& topic);
Status ListTopics(); Status ListTopics(const grpc::string& project_id,
std::vector<grpc::string>* topics);
Status Publish(const string& topic, const string& data); Status Publish(const grpc::string& topic, const grpc::string& data);
private: private:
std::unique_ptr<tech::pubsub::PublisherService::Stub> stub_; std::unique_ptr<tech::pubsub::PublisherService::Stub> stub_;
......
...@@ -51,6 +51,7 @@ namespace grpc { ...@@ -51,6 +51,7 @@ namespace grpc {
namespace testing { namespace testing {
namespace { namespace {
const char kProjectId[] = "project id";
const char kTopic[] = "test topic"; const char kTopic[] = "test topic";
const char kMessageData[] = "test message data"; const char kMessageData[] = "test message data";
...@@ -80,6 +81,10 @@ class PublisherServiceImpl : public tech::pubsub::PublisherService::Service { ...@@ -80,6 +81,10 @@ class PublisherServiceImpl : public tech::pubsub::PublisherService::Service {
Status ListTopics(ServerContext* context, Status ListTopics(ServerContext* context,
const ::tech::pubsub::ListTopicsRequest* request, const ::tech::pubsub::ListTopicsRequest* request,
::tech::pubsub::ListTopicsResponse* response) override { ::tech::pubsub::ListTopicsResponse* response) override {
std::stringstream ss;
ss << "cloud.googleapis.com/project in (/projects/" << kProjectId << ")";
EXPECT_EQ(request->query(), ss.str());
response->add_topic()->set_name(kTopic);
return Status::OK; return Status::OK;
} }
...@@ -124,9 +129,15 @@ class PublisherTest : public ::testing::Test { ...@@ -124,9 +129,15 @@ class PublisherTest : public ::testing::Test {
TEST_F(PublisherTest, TestPublisher) { TEST_F(PublisherTest, TestPublisher) {
EXPECT_TRUE(publisher_->CreateTopic(kTopic).IsOk()); EXPECT_TRUE(publisher_->CreateTopic(kTopic).IsOk());
EXPECT_TRUE(publisher_->Publish(kTopic, kMessageData).IsOk()); EXPECT_TRUE(publisher_->Publish(kTopic, kMessageData).IsOk());
EXPECT_TRUE(publisher_->GetTopic(kTopic).IsOk()); EXPECT_TRUE(publisher_->GetTopic(kTopic).IsOk());
EXPECT_TRUE(publisher_->ListTopics().IsOk());
std::vector<grpc::string> topics;
EXPECT_TRUE(publisher_->ListTopics(kProjectId, &topics).IsOk());
EXPECT_EQ(topics.size(), 1);
EXPECT_EQ(topics[0], kTopic);
} }
} // namespace } // namespace
......
// This file will be moved to new location. // This file will be moved to a new location.
// Specification of the Pubsub API. // Specification of the Pubsub API.
......
...@@ -56,7 +56,8 @@ void Subscriber::Shutdown() { ...@@ -56,7 +56,8 @@ void Subscriber::Shutdown() {
stub_.reset(); stub_.reset();
} }
Status Subscriber::CreateSubscription(const string& topic, const string& name) { Status Subscriber::CreateSubscription(const grpc::string& topic,
const grpc::string& name) {
tech::pubsub::Subscription request; tech::pubsub::Subscription request;
tech::pubsub::Subscription response; tech::pubsub::Subscription response;
ClientContext context; ClientContext context;
...@@ -67,7 +68,8 @@ Status Subscriber::CreateSubscription(const string& topic, const string& name) { ...@@ -67,7 +68,8 @@ Status Subscriber::CreateSubscription(const string& topic, const string& name) {
return stub_->CreateSubscription(&context, request, &response); return stub_->CreateSubscription(&context, request, &response);
} }
Status Subscriber::GetSubscription(const string& name, string* topic) { Status Subscriber::GetSubscription(const grpc::string& name,
grpc::string* topic) {
tech::pubsub::GetSubscriptionRequest request; tech::pubsub::GetSubscriptionRequest request;
tech::pubsub::Subscription response; tech::pubsub::Subscription response;
ClientContext context; ClientContext context;
...@@ -79,7 +81,7 @@ Status Subscriber::GetSubscription(const string& name, string* topic) { ...@@ -79,7 +81,7 @@ Status Subscriber::GetSubscription(const string& name, string* topic) {
return s; return s;
} }
Status Subscriber::DeleteSubscription(const string& name) { Status Subscriber::DeleteSubscription(const grpc::string& name) {
tech::pubsub::DeleteSubscriptionRequest request; tech::pubsub::DeleteSubscriptionRequest request;
proto2::Empty response; proto2::Empty response;
ClientContext context; ClientContext context;
...@@ -89,7 +91,7 @@ Status Subscriber::DeleteSubscription(const string& name) { ...@@ -89,7 +91,7 @@ Status Subscriber::DeleteSubscription(const string& name) {
return stub_->DeleteSubscription(&context, request, &response); return stub_->DeleteSubscription(&context, request, &response);
} }
Status Subscriber::Pull(const string& name, string* data) { Status Subscriber::Pull(const grpc::string& name, grpc::string* data) {
tech::pubsub::PullRequest request; tech::pubsub::PullRequest request;
tech::pubsub::PullResponse response; tech::pubsub::PullResponse response;
ClientContext context; ClientContext context;
......
...@@ -48,14 +48,14 @@ class Subscriber { ...@@ -48,14 +48,14 @@ class Subscriber {
Subscriber(std::shared_ptr<ChannelInterface> channel); Subscriber(std::shared_ptr<ChannelInterface> channel);
void Shutdown(); void Shutdown();
Status CreateSubscription(const string& topic, Status CreateSubscription(const grpc::string& topic,
const string& name); const grpc::string& name);
Status GetSubscription(const string& name, string* topic); Status GetSubscription(const grpc::string& name, grpc::string* topic);
Status DeleteSubscription(const string& name); Status DeleteSubscription(const grpc::string& name);
Status Pull(const string& name, string* data); Status Pull(const grpc::string& name, grpc::string* data);
private: private:
std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_; std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_;
......
...@@ -45,8 +45,6 @@ ...@@ -45,8 +45,6 @@
#include "test/core/util/port.h" #include "test/core/util/port.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
using grpc::ChannelInterface;
namespace grpc { namespace grpc {
namespace testing { namespace testing {
namespace { namespace {
...@@ -132,7 +130,6 @@ TEST_F(SubscriberTest, TestSubscriber) { ...@@ -132,7 +130,6 @@ TEST_F(SubscriberTest, TestSubscriber) {
EXPECT_TRUE(subscriber_->CreateSubscription(kTopic, EXPECT_TRUE(subscriber_->CreateSubscription(kTopic,
kSubscriptionName).IsOk()); kSubscriptionName).IsOk());
grpc::string topic; grpc::string topic;
EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName, EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName,
&topic).IsOk()); &topic).IsOk());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment