General refactor of the log generator so that it will finally create indexes for all of the events it creates. It should also perform a smidge better, but the real bottleneck is the event creation.
This commit is contained in:
@ -20,7 +20,6 @@ var argv = require('optimist')
|
||||
.argv;
|
||||
|
||||
var es = require('../../../src/elasticsearch');
|
||||
var _ = require('../../../src/lib/utils');
|
||||
var async = require('async');
|
||||
var path = require('path');
|
||||
var moment = require('moment');
|
||||
@ -30,11 +29,11 @@ var startingMoment = moment().utc().startOf('day').subtract('days', argv.days);
|
||||
var endingMoment = moment().utc().endOf('day').add('days', argv.days);
|
||||
|
||||
var clientConfig = {
|
||||
log: {
|
||||
level: 'trace',
|
||||
type: 'file',
|
||||
path: path.join(__dirname, '../../../log')
|
||||
}
|
||||
// log: {
|
||||
// level: 'trace',
|
||||
// type: 'file',
|
||||
// path: path.join(__dirname, '../../../log')
|
||||
// }
|
||||
};
|
||||
|
||||
if (argv.host) {
|
||||
@ -48,70 +47,14 @@ var samples = makeSamples(startingMoment, endingMoment);
|
||||
|
||||
console.log('Generating', argv.count, 'events across ±', argv.days, 'days');
|
||||
|
||||
fillIndecies(function () {
|
||||
var actions = [];
|
||||
var indices = {};
|
||||
var events = [];
|
||||
var doneEventing = false;
|
||||
var eventsPerBulk = 3500;
|
||||
var eventElementsPerBulk = eventsPerBulk * 2; // events are stored next to their headers, so each event has two elements;
|
||||
|
||||
async.timesSeries(argv.count, function (i, done) {
|
||||
// random date, plus less random time
|
||||
var date = moment(samples.randomMsInDayRange())
|
||||
.utc()
|
||||
.startOf('day')
|
||||
.add('milliseconds', samples.lessRandomMsInDay());
|
||||
|
||||
var event = {
|
||||
index: date.format('[logstash-]YYYY.MM.DD'),
|
||||
'@timestamp': date.toISOString(),
|
||||
ip: samples.ips(),
|
||||
extension: samples.extensions(),
|
||||
response: samples.responseCodes(),
|
||||
country: samples.countries(),
|
||||
point: samples.airports(),
|
||||
'@tags': [samples.tags(), samples.tags2()],
|
||||
utc_time: date.toISOString(),
|
||||
referer: 'http://' + samples.referrers() + '/' + samples.tags() + '/' + samples.astronauts(),
|
||||
agent: samples.userAgents(),
|
||||
};
|
||||
|
||||
event.clientip = event.ip;
|
||||
event.bytes = event.response < 500 ? samples.lessRandomRespSize() : 0;
|
||||
event.request = '/' + samples.astronauts() + '.' + event.extension;
|
||||
event.memory = event.extension === 'php' ? event.bytes * 40 : 0;
|
||||
if (event.memory) {
|
||||
event.phpmemory = event.memory;
|
||||
}
|
||||
|
||||
event['@message'] = event.ip + ' - - [' + date.toISOString() + '] "GET ' + event.request + ' HTTP/1.1" ' +
|
||||
event.response + ' ' + event.bytes + ' "-" "' + event.agent + '"';
|
||||
|
||||
actions.push({
|
||||
index: {
|
||||
_index: event.index,
|
||||
_type: samples.types(),
|
||||
_id: i
|
||||
}
|
||||
});
|
||||
actions.push(event);
|
||||
|
||||
if (actions.length === 3000 || i === argv.count - 1) {
|
||||
console.info('writing', actions.length / 2, 'documents');
|
||||
client.bulk({
|
||||
body: actions
|
||||
}, done);
|
||||
actions = [];
|
||||
} else {
|
||||
done();
|
||||
}
|
||||
}, function (err) {
|
||||
if (err) {
|
||||
throw err;
|
||||
} else {
|
||||
console.log('Done!');
|
||||
process.exit();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
function fillIndecies(cb) {
|
||||
function createIndex(indexName, done) {
|
||||
console.log('made index', indexName);
|
||||
var indexBody = {
|
||||
mappings: {
|
||||
_default_: {
|
||||
@ -142,6 +85,9 @@ function fillIndecies(cb) {
|
||||
}
|
||||
}
|
||||
},
|
||||
clientip: {
|
||||
type: 'ip'
|
||||
},
|
||||
ip: {
|
||||
type: 'ip'
|
||||
},
|
||||
@ -153,35 +99,91 @@ function fillIndecies(cb) {
|
||||
}
|
||||
};
|
||||
|
||||
function createDateIndex(indexName) {
|
||||
return function (done) {
|
||||
client.indices.create({
|
||||
ignore: 400,
|
||||
index: indexName,
|
||||
body: indexBody
|
||||
}, function (err, resp) {
|
||||
if (err) {
|
||||
done(err);
|
||||
} else {
|
||||
done(null, resp.error ? 'existed' : 'created');
|
||||
}
|
||||
});
|
||||
};
|
||||
client.indices.create({
|
||||
ignore: 400,
|
||||
index: indexName,
|
||||
body: indexBody
|
||||
}, done);
|
||||
}
|
||||
|
||||
var bulk = async.queue(function (chunk, done) {
|
||||
if (typeof chunk === 'string') {
|
||||
return createIndex(chunk, done);
|
||||
}
|
||||
|
||||
var indexPushActions = samples.days.map(function (moment) {
|
||||
return createDateIndex(moment.format('[logstash-]YYYY.MM.DD'));
|
||||
});
|
||||
console.info('writing', chunk.length / 2, 'documents');
|
||||
client.bulk({
|
||||
body: chunk
|
||||
}, done);
|
||||
}, 3);
|
||||
|
||||
async.parallel(indexPushActions, function (err, responses) {
|
||||
if (err) {
|
||||
console.error(err.message = 'Unable to create indicies: ' + err.message);
|
||||
console.error(err.stack);
|
||||
} else {
|
||||
_.each(_.groupBy(responses), function (list, did) {
|
||||
console.info(list.length, 'indicies', did);
|
||||
});
|
||||
cb();
|
||||
}
|
||||
});
|
||||
}
|
||||
bulk.drain = function () {
|
||||
if (!doneEventing) {
|
||||
// console.log('indexed faster than the events were created');
|
||||
return;
|
||||
}
|
||||
|
||||
client.close();
|
||||
console.log('done');
|
||||
};
|
||||
|
||||
async.timesSeries(argv.count, function (i, done) {
|
||||
|
||||
// random date, plus less random time
|
||||
var date = moment(samples.randomMsInDayRange())
|
||||
.utc()
|
||||
.startOf('day')
|
||||
.add('milliseconds', samples.lessRandomMsInDay());
|
||||
|
||||
var event = {};
|
||||
|
||||
event.index = date.format('[logstash-]YYYY.MM.DD');
|
||||
event['@timestamp'] = date.toISOString();
|
||||
event.ip = samples.ips();
|
||||
event.extension = samples.extensions();
|
||||
event.response = samples.responseCodes();
|
||||
event.country = samples.countries();
|
||||
event.point = samples.airports();
|
||||
event['@tags'] = [
|
||||
samples.tags(),
|
||||
samples.tags2()
|
||||
];
|
||||
event.utc_time = date.toISOString();
|
||||
event.referer = 'http://' + samples.referrers() + '/' + samples.tags() + '/' + samples.astronauts();
|
||||
event.agent = samples.userAgents();
|
||||
event.clientip = event.ip;
|
||||
event.bytes = event.response < 500 ? samples.lessRandomRespSize() : 0;
|
||||
event.request = '/' + samples.astronauts() + '.' + event.extension;
|
||||
if (event.extension === 'php') {
|
||||
event.phpmemory = event.memory = event.bytes * 40;
|
||||
}
|
||||
event['@message'] = event.ip + ' - - [' + date.toISOString() + '] "GET ' + event.request + ' HTTP/1.1" ' +
|
||||
event.response + ' ' + event.bytes + ' "-" "' + event.agent + '"';
|
||||
|
||||
|
||||
if (indices[event.index] !== true) {
|
||||
bulk.push(event.index); // when it receives a string it handles that immediately
|
||||
indices[event.index] = true;
|
||||
}
|
||||
|
||||
events.push(
|
||||
{
|
||||
index: {
|
||||
_index: event.index,
|
||||
_type: samples.types(),
|
||||
_id: i
|
||||
}
|
||||
},
|
||||
event
|
||||
);
|
||||
|
||||
// eventsPerBulk must be multiplied by 2 because each event is two elements long
|
||||
if (events.length === eventElementsPerBulk || i === argv.count - 1) {
|
||||
bulk.push([events.splice(0, eventElementsPerBulk)]);
|
||||
}
|
||||
|
||||
setImmediate(done);
|
||||
}, function () {
|
||||
console.log('done creating events');
|
||||
doneEventing = true;
|
||||
});
|
||||
@ -9,18 +9,6 @@ exports.make = function (startingMoment, endingMoment) {
|
||||
|
||||
var sets = {};
|
||||
|
||||
sets.days = (function () {
|
||||
var days = [];
|
||||
var moving = startingMoment.clone();
|
||||
|
||||
while (moving <= endingMoment) {
|
||||
days.push(moving.clone());
|
||||
moving.add('day', 1);
|
||||
}
|
||||
|
||||
return days;
|
||||
}());
|
||||
|
||||
sets.randomMsInDayRange = new Stochator({
|
||||
min: startingMoment.toDate().getTime(),
|
||||
max: endingMoment.toDate().getTime()
|
||||
|
||||
Reference in New Issue
Block a user