Skip to content
Snippets Groups Projects
Commit 8c70eb10 authored by Tim Emiola's avatar Tim Emiola
Browse files

Merge pull request #1298 from tbetbetbe/grpc_nodejs_remove_pubsub

Removes nodejs pubsub example
parents 45617040 ff1ae501
No related branches found
No related tags found
No related merge requests found
// This file will be moved to a new location.
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto2";
package proto2;
// An empty message that you can re-use to avoid defining duplicated empty
// messages in your project. A typical example is to use it as argument or the
// return value of a service API. For instance:
//
// service Foo {
// rpc Bar (proto2.Empty) returns (proto2.Empty) { };
// };
//
message Empty {}
// This file will be moved to a new location.
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Labels provide a way to associate user-defined metadata with various
// objects. Labels may be used to organize objects into non-hierarchical
// groups; think metadata tags attached to mp3s.
syntax = "proto2";
package tech.label;
// A key-value pair applied to a given object.
message Label {
// The key of a label is a syntactically valid URL (as per RFC 1738) with
// the "scheme" and initial slashes omitted and with the additional
// restrictions noted below. Each key should be globally unique. The
// "host" portion is called the "namespace" and is not necessarily
// resolvable to a network endpoint. Instead, the namespace indicates what
// system or entity defines the semantics of the label. Namespaces do not
// restrict the set of objects to which a label may be associated.
//
// Keys are defined by the following grammar:
//
// key = hostname "/" kpath
// kpath = ksegment *[ "/" ksegment ]
// ksegment = alphadigit | *[ alphadigit | "-" | "_" | "." ]
//
// where "hostname" and "alphadigit" are defined as in RFC 1738.
//
// Example key:
// spanner.google.com/universe
required string key = 1;
// The value of the label.
oneof value {
// A string value.
string str_value = 2;
// An integer value.
int64 num_value = 3;
}
}
// A collection of labels, such as the set of all labels attached to an
// object. Each label in the set must have a different key.
//
// Users should prefer to embed "repeated Label" directly when possible.
// This message should only be used in cases where that isn't possible (e.g.
// with oneof).
message Labels {
repeated Label label = 1;
}
This diff is collapsed.
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var async = require('async');
var fs = require('fs');
var GoogleAuth = require('google-auth-library');
var parseArgs = require('minimist');
var strftime = require('strftime');
var _ = require('underscore');
var grpc = require('../..');
var PROTO_PATH = __dirname + '/pubsub.proto';
var pubsub = grpc.load(PROTO_PATH).tech.pubsub;
function PubsubRunner(pub, sub, args) {
this.pub = pub;
this.sub = sub;
this.args = args;
}
PubsubRunner.prototype.getTestTopicName = function() {
var base_name = '/topics/' + this.args.project_id + '/';
if (this.args.topic_name) {
return base_name + this.args.topic_name;
}
var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text;
};
PubsubRunner.prototype.getTestSubName = function() {
var base_name = '/subscriptions/' + this.args.project_id + '/';
if (this.args.sub_name) {
return base_name + this.args.sub_name;
}
var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text;
};
PubsubRunner.prototype.listProjectTopics = function(callback) {
var q = ('cloud.googleapis.com/project in (/projects/' +
this.args.project_id + ')');
this.pub.listTopics({query: q}, callback);
};
PubsubRunner.prototype.topicExists = function(name, callback) {
this.listProjectTopics(function(err, response) {
if (err) {
callback(err);
} else {
callback(null, _.some(response.topic, function(t) {
return t.name === name;
}));
}
});
};
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
var self = this;
this.topicExists(name, function(err, exists) {
if (err) {
callback(err);
} else{
if (exists) {
callback(null);
} else {
self.pub.createTopic({name: name}, callback);
}
}
});
};
PubsubRunner.prototype.removeTopic = function(callback) {
var name = this.getTestTopicName();
console.log('... removing Topic', name);
this.pub.deleteTopic({topic: name}, function(err, value) {
if (err) {
console.log('Could not delete a topic: rpc failed with', err);
callback(err);
} else {
console.log('removed Topic', name, 'OK');
callback(null);
}
});
};
PubsubRunner.prototype.createTopic = function(callback) {
var name = this.getTestTopicName();
console.log('... creating Topic', name);
this.pub.createTopic({name: name}, function(err, value) {
if (err) {
console.log('Could not create a topic: rpc failed with', err);
callback(err);
} else {
console.log('created Topic', name, 'OK');
callback(null);
}
});
};
PubsubRunner.prototype.listSomeTopics = function(callback) {
console.log('Listing topics');
console.log('-------------_');
this.listProjectTopics(function(err, response) {
if (err) {
console.log('Could not list topic: rpc failed with', err);
callback(err);
} else {
_.each(response.topic, function(t) {
console.log(t.name);
});
callback(null);
}
});
};
PubsubRunner.prototype.checkExists = function(callback) {
var name = this.getTestTopicName();
console.log('... checking for topic', name);
this.topicExists(name, function(err, exists) {
if (err) {
console.log('Could not check for a topics: rpc failed with', err);
callback(err);
} else {
if (exists) {
console.log(name, 'is a topic');
} else {
console.log(name, 'is not a topic');
}
callback(null);
}
});
};
PubsubRunner.prototype.randomPubSub = function(callback) {
var self = this;
var topic_name = this.getTestTopicName();
var sub_name = this.getTestSubName();
var subscription = {name: sub_name, topic: topic_name};
async.waterfall([
_.bind(this.createTopicIfNeeded, this, topic_name),
_.bind(this.sub.createSubscription, this.sub, subscription),
function(resp, cb) {
var msg_count = _.random(10, 30);
// Set up msg_count messages to publish
var message_senders = _.times(msg_count, function(n) {
return _.bind(self.pub.publish, self.pub, {
topic: topic_name,
message: {data: new Buffer('message ' + n)}
});
});
async.parallel(message_senders, function(err, result) {
cb(err, result, msg_count);
});
},
function(result, msg_count, cb) {
console.log('Sent', msg_count, 'messages to', topic_name + ',',
'checking for them now.');
var batch_request = {
subscription: sub_name,
max_events: msg_count
};
self.sub.pullBatch(batch_request, cb);
},
function(batch, cb) {
var ack_id = _.pluck(batch.pull_responses, 'ack_id');
console.log('Got', ack_id.length, 'messages, acknowledging them...');
var ack_request = {
subscription: sub_name,
ack_id: ack_id
};
self.sub.acknowledge(ack_request, cb);
},
function(result, cb) {
console.log(
'Test messages were acknowledged OK, deleting the subscription');
self.sub.deleteSubscription({subscription: sub_name}, cb);
}
], function (err, result) {
if (err) {
console.log('Could not do random pub sub: rpc failed with', err);
}
callback(err, result);
});
};
function main(callback) {
var argv = parseArgs(process.argv, {
string: [
'host',
'oauth_scope',
'port',
'action',
'project_id',
'topic_name',
'sub_name'
],
default: {
host: 'pubsub-staging.googleapis.com',
oauth_scope: 'https://www.googleapis.com/auth/pubsub',
port: 443,
action: 'listSomeTopics',
project_id: 'stoked-keyword-656'
}
});
var valid_actions = [
'createTopic',
'removeTopic',
'listSomeTopics',
'checkExists',
'randomPubSub'
];
if (_.some(valid_actions, function(action) {
return action === argv.action;
})) {
callback(new Error('Action was not valid'));
}
var address = argv.host + ':' + argv.port;
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
if (err) {
callback(err);
return;
}
if (credential.createScopedRequired()) {
credential = credential.createScoped(argv.oauth_scope);
}
var updateMetadata = grpc.getGoogleAuthDelegate(credential);
var ca_path = process.env.SSL_CERT_FILE;
fs.readFile(ca_path, function(err, ca_data) {
if (err) {
callback(err);
return;
}
var ssl_creds = grpc.Credentials.createSsl(ca_data);
var options = {
credentials: ssl_creds,
'grpc.ssl_target_name_override': argv.host
};
var pub = new pubsub.PublisherService(address, options, updateMetadata);
var sub = new pubsub.SubscriberService(address, options, updateMetadata);
var runner = new PubsubRunner(pub, sub, argv);
runner[argv.action](callback);
});
});
}
if (require.main === module) {
main(function(err) {
if (err) {
throw err;
}
});
}
module.exports = PubsubRunner;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment