udpated the log generator to pause when there is a bulk queue overflow

This commit is contained in:
Spencer Alger
2014-05-19 10:12:00 -07:00
parent 37d7877f7a
commit 512516c73f
2 changed files with 109 additions and 87 deletions

View File

@ -32,7 +32,7 @@
},
"devDependencies": {
"mocha": "^1.18.2",
"async": "~0.2.9",
"async": "~0.8.0",
"moment": "~2.4.0",
"js-yaml": "~2.1.3",
"optimist": "~0.6.0",

View File

@ -105,10 +105,6 @@ function createIndex(indexName) {
index: 'not_analyzed',
include_in_all: false
},
country: {
type: 'string',
index: 'not_analyzed'
},
agent: {
type: 'multi_field',
fields: {
@ -196,9 +192,9 @@ function createIndex(indexName) {
var queue = async.queue(function (events, done) {
var body = [];
var deps = [];
var esBulkQueueOverflow = 0;
events.forEach(function (event) {
var header = event.header;
@ -224,23 +220,31 @@ var queue = async.queue(function (events, done) {
})
.then(function (resp) {
if (resp.errors) {
var errors = [];
resp.items.forEach(function (item, i) {
if (item.index.error) {
errors.push(item.index.error);
if (item.index.error.match(/^EsRejectedExecutionException/)) {
esBulkQueueOverflow ++;
eventBuffer.push(events[i]);
}
}
});
console.log('\n - errors - \n' + errors.join('\n') + '\n');
}
})
.finally(function () {
if (esBulkQueueOverflow) {
process.stdout.write('w' + esBulkQueueOverflow + '-');
// pause for 10ms per queue overage
queue.pause();
setTimeout(function () {
queue.resume();
}, 10 * esBulkQueueOverflow);
} else {
process.stdout.write('.');
}
})
.nodeify(done);
}, 1);
var eventBuffer = [];
@ -265,8 +269,22 @@ queue.drain = function () {
}
};
async.timesSeries(total, function (i, done) {
async.series([
function (done) {
client.cluster.putSettings({
body: {
transient: {
threadpool: {
bulk: {
queue_size: -1
}
}
}
}
}, done);
},
function (done) {
async.timesSeries(total, function (i, done) {
// random date, plus less random time
var date = new Date(samples.randomMsInDayRange());
@ -288,7 +306,8 @@ async.timesSeries(total, function (i, done) {
date.setUTCHours(hours, minutes, seconds, ms);
var dateAsIso = date.toISOString();
var indexName = 'logstash-' + dateAsIso.substr(0, 4) + '.' + dateAsIso.substr(5, 2) + '.' + dateAsIso.substr(8, 2);
var indexName = 'logstash-' +
dateAsIso.substr(0, 4) + '.' + dateAsIso.substr(5, 2) + '.' + dateAsIso.substr(8, 2);
var event = {};
event.index = indexName;
@ -347,7 +366,10 @@ async.timesSeries(total, function (i, done) {
eventBuffer.flush();
setImmediate(done);
}, function () {
}, done);
}
], function (err) {
if (err) throw err;
doneCreatingEvents = true;
eventBuffer.flush();
});