Skip to content
Snippets Groups Projects
Commit 2822e9ad authored by murgatroid99's avatar murgatroid99
Browse files

Added pubsub demo client

parent 9c360477
No related branches found
No related tags found
No related merge requests found
......@@ -34,8 +34,8 @@
syntax = "proto2";
import "examples/pubsub/empty.proto";
import "examples/pubsub/label.proto";
import "empty.proto";
import "label.proto";
package tech.pubsub;
......
......@@ -28,7 +28,10 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
var async = require('async');
var fs = require('fs');
var GoogleAuth = require('googleauth');
var parseArgs = require('minimist');
var strftime = require('strftime');
var _ = require('underscore');
var grpc = require('../..');
var PROTO_PATH = __dirname + '/pubsub.proto';
......@@ -45,7 +48,7 @@ PubsubRunner.prototype.getTestTopicName = function() {
if (this.args.topic_name) {
return base_name + this.args.topic_name;
}
var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L');
var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text;
};
......@@ -54,7 +57,7 @@ PubsubRunner.prototype.getTestSubName = function() {
if (this.args.sub_name) {
return base_name + this.args.sub_name;
}
var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L');
var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text;
};
......@@ -77,6 +80,7 @@ PubsubRunner.prototype.topicExists = function(name, callback) {
};
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
var self = this;
this.topicExists(name, function(err, exists) {
if (err) {
callback(err);
......@@ -84,7 +88,7 @@ PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
if (exists) {
callback(null);
} else {
this.pub.createTopic({name: name}, callback);
self.pub.createTopic({name: name}, callback);
}
}
});
......@@ -153,45 +157,48 @@ PubsubRunner.prototype.checkExists = function(callback) {
};
PubsubRunner.prototype.randomPubSub = function(callback) {
var self = this;
var topic_name = this.getTestTopicName();
var sub_name = this.getTestSubName();
var subscription = {name: sub_name, topic: topic_name};
async.waterfall([
_.bind(this.createTopicIfNeeded, this, topic_name),
_.bind(this.sub.createSubscription, this, subscription),
_.bind(this.sub.createSubscription, this.sub, subscription),
function(resp, cb) {
var msg_count = _.random(10, 30);
// Set up msg_count messages to publish
var message_senders = _.times(msg_count, function(n) {
return _.bind(this.pub.publish, this.pub, {
return _.bind(self.pub.publish, self.pub, {
topic: topic_name,
message: {data: 'message ' + n}
message: {data: new Buffer('message ' + n)}
});
});
async.parallel(message_senders, cb);
async.parallel(message_senders, function(err, result) {
cb(err, result, msg_count);
});
},
function(result, cb) {
function(result, msg_count, cb) {
console.log('Sent', msg_count, 'messages to', topic_name + ',',
'checking for them now.');
var batch_request = {
subscription: sub_name,
max_events: msg_count
};
this.sub.pull_batch(batch_request, cb);
self.sub.pullBatch(batch_request, cb);
},
function(batch, cb) {
var ack_ids = _.pluck(batch.pull_responses, 'ack_id');
console.log('Got', ack_ids.length, 'messages, acknowledging them...');
var ack_id = _.pluck(batch.pull_responses, 'ack_id');
console.log('Got', ack_id.length, 'messages, acknowledging them...');
var ack_request = {
subscription: sub_name,
ack_ids: ack_ids
ack_id: ack_id
};
this.sub.acknowledge(ack_request, cb);
self.sub.acknowledge(ack_request, cb);
},
function(result, cb) {
console.log(
'Test messages were acknowledged OK, deleting the subscription');
this.sub.delete({subscription: sub_name}, cb);
self.sub.deleteSubscription({subscription: sub_name}, cb);
}
], function (err, result) {
if (err) {
......@@ -213,23 +220,23 @@ function main(callback) {
'sub_name'
],
default: {
host: 'pubsub-testing.googleapis.com',
host: 'pubsub-staging.googleapis.com',
oauth_scope: 'https://www.googleapis.com/auth/pubsub',
port: 443,
action: 'all',
project: 'stoked-keyword-656'
action: 'listSomeTopics',
project_id: 'stoked-keyword-656'
}
});
var valid_actions = [
'removeTopic',
'createTopic',
'listSomeTopic',
'removeTopic',
'listSomeTopics',
'checkExists',
'randomPubSub'
];
if (!(argv.action === 'all' || _.some(valid_actions, function(action) {
if (_.some(valid_actions, function(action) {
return action === argv.action;
}))) {
})) {
callback(new Error('Action was not valid'));
}
var address = argv.host + ':' + argv.port;
......@@ -249,17 +256,14 @@ function main(callback) {
return;
}
var ssl_creds = grpc.Credentials.createSsl(ca_data);
var options = {credentials: ssl_creds};
var options = {
credentials: ssl_creds,
'grpc.ssl_target_name_override': argv.host
};
var pub = new pubsub.PublisherService(address, options, updateMetadata);
var sub = new pubsub.SubscriberService(address, options, updateMetadata);
var runner = new PubsubRunner(pub, sub, argv);
if (argv.action === 'all') {
async.series(_.map(valid_actions, function(name) {
_.bind(runner[name], runner);
}), callback);
} else {
runner[argv.action](callback);
}
runner[argv.action](callback);
});
});
}
......
......@@ -15,9 +15,10 @@
"underscore.string": "^3.0.0"
},
"devDependencies": {
"mocha": "~1.21.0",
"googleauth": "google/google-auth-library-nodejs",
"minimist": "^1.1.0",
"googleauth": "google/google-auth-library-nodejs"
"mocha": "~1.21.0",
"strftime": "^0.8.2"
},
"files": [
"README.md",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment