diff --git a/scripts/generate/logs/index.js b/scripts/generate/logs/index.js index 40d6488c7..1924d740f 100644 --- a/scripts/generate/logs/index.js +++ b/scripts/generate/logs/index.js @@ -1,3 +1,4 @@ +/* jshint curly:false */ // args var argv = require('optimist') .usage('node scripts/generate/logs [-h|--host localhost:9200] [-c|--count 14000] [-d|--days 7]') @@ -23,6 +24,7 @@ var es = require('../../../src/elasticsearch'); var async = require('async'); var moment = require('moment'); var makeSamples = require('./samples').make; +// var Promise = require('bluebird'); var startingMoment = moment().utc().startOf('day').subtract('days', argv.days); var endingMoment = moment().utc().endOf('day').add('days', argv.days); @@ -30,7 +32,7 @@ var clientConfig = { // log: { // level: 'trace', // type: 'file', - // path: path.join(__dirname, '../../../log') + // path: require('path').join(__dirname, '../../../log') // } }; @@ -43,16 +45,15 @@ if (argv.host) { var client = new es.Client(clientConfig); var samples = makeSamples(startingMoment, endingMoment); -console.log('Generating', argv.count, 'events across ±', argv.days, 'days'); - var indices = {}; -var events = []; -var doneEventing = false; -var eventsPerBulk = 3500; -var eventElementsPerBulk = eventsPerBulk * 2; // events are stored next to their headers, so each event has two elements; +var doneCreatingEvents = false; +var total = argv.count; + +console.log('Generating', total, 'events across ±', argv.days, 'days'); function createIndex(indexName, done) { - console.log('made index', indexName); + // console.log('ensuring index "%s" exists', indexName); + var indexBody = { settings: { index: { @@ -64,8 +65,7 @@ function createIndex(indexName, done) { _default_: { properties: { '@timestamp': { - type: 'date', - index: 'not_analyzed' + type: 'date' }, id: { type: 'integer', @@ -107,42 +107,103 @@ function createIndex(indexName, done) { ignore: 400, index: indexName, body: indexBody - }, done); + }, function (err) { + if (err) return done(err); + + client.cluster.health({ + index: indexName, + waitForStatus: 'yellow' + }, done); + }); } -var bulk = async.queue(function (chunk, done) { - if (typeof chunk === 'string') { - return createIndex(chunk, done); +var queue = async.queue(function (events, done) { + + var body = []; + var deps = []; + + events.forEach(function (event) { + var header = event.header; + event = event.body; + + if (indices[event.index] !== true) { + deps.push(async.apply(createIndex, event.index)); + indices[event.index] = true; + } + + body.push({ index: header }, event); + }); + + async.parallel(deps, function (err) { + if (err) return done(err); + + client.bulk({ + body: body + }, function (err, resp) { + if (err) return done(err); + + if (resp.errors) { + console.log(resp); + console.log(JSON.stringify(body, null, ' ')); + console.log(JSON.stringify(resp, null, ' ')); + process.exit(); + } + + process.stdout.write('.'); + done(); + }); + }); + +}, 1); + +queue.drain = function () { + if (doneCreatingEvents) { + client.close(); + process.stdout.write('.\n\ncreated ' + total + ' events\n\n'); } - - console.info('writing', chunk.length / 2, 'documents'); - client.bulk({ - body: chunk - }, done); -}, 3); - -bulk.drain = function () { - if (!doneEventing) { - // console.log('indexed faster than the events were created'); - return; - } - - client.close(); - console.log('done'); }; -async.timesSeries(argv.count, function (i, done) { +var topLevelErrorHandler = function (err) { + if (err) { + console.error(err.resp); + console.error(err.stack); + process.exit(); + } +}; + +var unqueuedEvents = []; +unqueuedEvents.queue = function () { + queue.push([unqueuedEvents.splice(0)], topLevelErrorHandler); +}; + +async.timesSeries(total, function (i, done) { // random date, plus less random time - var date = moment(samples.randomMsInDayRange()) - .utc() - .startOf('day') - .add('milliseconds', samples.lessRandomMsInDay()); + var date = new Date(samples.randomMsInDayRange()); + var ms = samples.lessRandomMsInDay(); + + // extract number of hours from the milliseconds + var hours = Math.floor(ms / 3600000); + ms = ms - hours * 3600000; + + // extract number of minutes from the milliseconds + var minutes = Math.floor(ms / 60000); + ms = ms - minutes * 60000; + + // extract number of seconds from the milliseconds + var seconds = Math.floor(ms / 1000); + ms = ms - seconds * 1000; + + // apply the values found to the date + date.setUTCHours(hours, minutes, seconds, ms); + + var dateAsIso = date.toISOString(); + var indexName = 'logstash-' + dateAsIso.substr(0, 4) + '.' + dateAsIso.substr(5, 2) + '.' + dateAsIso.substr(8, 2); var event = {}; - event.index = date.format('[logstash-]YYYY.MM.DD'); - event['@timestamp'] = date.toISOString(); + event.index = indexName; + event['@timestamp'] = dateAsIso; event.ip = samples.ips(); event.extension = samples.extensions(); event.response = samples.responseCodes(); @@ -152,7 +213,7 @@ async.timesSeries(argv.count, function (i, done) { samples.tags(), samples.tags2() ]; - event.utc_time = date.toISOString(); + event.utc_time = dateAsIso; event.referer = 'http://' + samples.referrers() + '/' + samples.tags() + '/' + samples.astronauts(); event.agent = samples.userAgents(); event.clientip = event.ip; @@ -161,33 +222,21 @@ async.timesSeries(argv.count, function (i, done) { if (event.extension === 'php') { event.phpmemory = event.memory = event.bytes * 40; } - event['@message'] = event.ip + ' - - [' + date.toISOString() + '] "GET ' + event.request + ' HTTP/1.1" ' + + event['@message'] = event.ip + ' - - [' + dateAsIso + '] "GET ' + event.request + ' HTTP/1.1" ' + event.response + ' ' + event.bytes + ' "-" "' + event.agent + '"'; - - if (indices[event.index] !== true) { - bulk.push(event.index); // when it receives a string it handles that immediately - indices[event.index] = true; - } - - events.push( - { - index: { - _index: event.index, - _type: samples.types(), - _id: i - } + unqueuedEvents.push({ + header: { + _index: event.index, + _type: samples.types(), + _id: i, }, - event - ); - - // eventsPerBulk must be multiplied by 2 because each event is two elements long - if (events.length === eventElementsPerBulk || i === argv.count - 1) { - bulk.push([events.splice(0, eventElementsPerBulk)]); - } + body: event + }); + unqueuedEvents.length === 3500 && unqueuedEvents.queue(); setImmediate(done); }, function () { - console.log('done creating events'); - doneEventing = true; + unqueuedEvents.length && unqueuedEvents.queue(); + doneCreatingEvents = true; }); \ No newline at end of file diff --git a/scripts/generate/logs/index_cbs.js b/scripts/generate/logs/index_cbs.js new file mode 100644 index 000000000..1924d740f --- /dev/null +++ b/scripts/generate/logs/index_cbs.js @@ -0,0 +1,242 @@ +/* jshint curly:false */ +// args +var argv = require('optimist') + .usage('node scripts/generate/logs [-h|--host localhost:9200] [-c|--count 14000] [-d|--days 7]') + .options({ + count: { + alias: 'c', + type: 'number', + default: 14000 + }, + days: { + alias: 'd', + type: 'number', + required: true + }, + host: { + alias: 'h', + default: 'localhost:9200' + } + }) + .argv; + +var es = require('../../../src/elasticsearch'); +var async = require('async'); +var moment = require('moment'); +var makeSamples = require('./samples').make; +// var Promise = require('bluebird'); + +var startingMoment = moment().utc().startOf('day').subtract('days', argv.days); +var endingMoment = moment().utc().endOf('day').add('days', argv.days); +var clientConfig = { + // log: { + // level: 'trace', + // type: 'file', + // path: require('path').join(__dirname, '../../../log') + // } +}; + +if (argv.host) { + clientConfig.hosts = argv.host; +} else if (argv.hosts) { + clientConfig.hosts = JSON.parse(argv.hosts); +} + +var client = new es.Client(clientConfig); +var samples = makeSamples(startingMoment, endingMoment); + +var indices = {}; +var doneCreatingEvents = false; +var total = argv.count; + +console.log('Generating', total, 'events across ±', argv.days, 'days'); + +function createIndex(indexName, done) { + // console.log('ensuring index "%s" exists', indexName); + + var indexBody = { + settings: { + index: { + number_of_shards: 1, + number_of_replicas: 0 + } + }, + mappings: { + _default_: { + properties: { + '@timestamp': { + type: 'date' + }, + id: { + type: 'integer', + index: 'not_analyzed', + include_in_all: false + }, + country: { + type: 'string', + index: 'not_analyzed' + }, + agent: { + type: 'multi_field', + fields: { + agent: { + type: 'string', + index: 'analyzed' + }, + raw: { + type: 'string', + index: 'not_analyzed' + } + } + }, + clientip: { + type: 'ip' + }, + ip: { + type: 'ip' + }, + memory: { + type: 'double' + } + } + } + } + }; + + client.indices.create({ + ignore: 400, + index: indexName, + body: indexBody + }, function (err) { + if (err) return done(err); + + client.cluster.health({ + index: indexName, + waitForStatus: 'yellow' + }, done); + }); +} + +var queue = async.queue(function (events, done) { + + var body = []; + var deps = []; + + events.forEach(function (event) { + var header = event.header; + event = event.body; + + if (indices[event.index] !== true) { + deps.push(async.apply(createIndex, event.index)); + indices[event.index] = true; + } + + body.push({ index: header }, event); + }); + + async.parallel(deps, function (err) { + if (err) return done(err); + + client.bulk({ + body: body + }, function (err, resp) { + if (err) return done(err); + + if (resp.errors) { + console.log(resp); + console.log(JSON.stringify(body, null, ' ')); + console.log(JSON.stringify(resp, null, ' ')); + process.exit(); + } + + process.stdout.write('.'); + done(); + }); + }); + +}, 1); + +queue.drain = function () { + if (doneCreatingEvents) { + client.close(); + process.stdout.write('.\n\ncreated ' + total + ' events\n\n'); + } +}; + +var topLevelErrorHandler = function (err) { + if (err) { + console.error(err.resp); + console.error(err.stack); + process.exit(); + } +}; + +var unqueuedEvents = []; +unqueuedEvents.queue = function () { + queue.push([unqueuedEvents.splice(0)], topLevelErrorHandler); +}; + +async.timesSeries(total, function (i, done) { + + // random date, plus less random time + var date = new Date(samples.randomMsInDayRange()); + + var ms = samples.lessRandomMsInDay(); + + // extract number of hours from the milliseconds + var hours = Math.floor(ms / 3600000); + ms = ms - hours * 3600000; + + // extract number of minutes from the milliseconds + var minutes = Math.floor(ms / 60000); + ms = ms - minutes * 60000; + + // extract number of seconds from the milliseconds + var seconds = Math.floor(ms / 1000); + ms = ms - seconds * 1000; + + // apply the values found to the date + date.setUTCHours(hours, minutes, seconds, ms); + + var dateAsIso = date.toISOString(); + var indexName = 'logstash-' + dateAsIso.substr(0, 4) + '.' + dateAsIso.substr(5, 2) + '.' + dateAsIso.substr(8, 2); + var event = {}; + + event.index = indexName; + event['@timestamp'] = dateAsIso; + event.ip = samples.ips(); + event.extension = samples.extensions(); + event.response = samples.responseCodes(); + event.country = samples.countries(); + event.point = samples.airports(); + event['@tags'] = [ + samples.tags(), + samples.tags2() + ]; + event.utc_time = dateAsIso; + event.referer = 'http://' + samples.referrers() + '/' + samples.tags() + '/' + samples.astronauts(); + event.agent = samples.userAgents(); + event.clientip = event.ip; + event.bytes = event.response < 500 ? samples.lessRandomRespSize() : 0; + event.request = '/' + samples.astronauts() + '.' + event.extension; + if (event.extension === 'php') { + event.phpmemory = event.memory = event.bytes * 40; + } + event['@message'] = event.ip + ' - - [' + dateAsIso + '] "GET ' + event.request + ' HTTP/1.1" ' + + event.response + ' ' + event.bytes + ' "-" "' + event.agent + '"'; + + unqueuedEvents.push({ + header: { + _index: event.index, + _type: samples.types(), + _id: i, + }, + body: event + }); + + unqueuedEvents.length === 3500 && unqueuedEvents.queue(); + setImmediate(done); +}, function () { + unqueuedEvents.length && unqueuedEvents.queue(); + doneCreatingEvents = true; +}); \ No newline at end of file diff --git a/scripts/generate/logs/index_promises.js b/scripts/generate/logs/index_promises.js new file mode 100644 index 000000000..a5713b122 --- /dev/null +++ b/scripts/generate/logs/index_promises.js @@ -0,0 +1,253 @@ +/* jshint curly:false, latedef:false */ +// args +var argv = require('optimist') + .usage('node scripts/generate/logs [-h|--host localhost:9200] [-c|--count 14000] [-d|--days 7]') + .options({ + count: { + alias: 'c', + type: 'number', + default: 14000 + }, + days: { + alias: 'd', + type: 'number', + required: true + }, + host: { + alias: 'h', + default: 'localhost:9200' + } + }) + .argv; + +var es = require('../../../src/elasticsearch'); +var async = require('async'); +var moment = require('moment'); +var makeSamples = require('./samples').make; +var Promise = require('bluebird'); + +var startingMoment = moment().utc().startOf('day').subtract('days', argv.days); +var endingMoment = moment().utc().endOf('day').add('days', argv.days); +var clientConfig = { + // log: { + // level: 'trace', + // type: 'file', + // path: require('path').join(__dirname, '../../../log') + // } +}; + +if (argv.host) { + clientConfig.hosts = argv.host; +} else if (argv.hosts) { + clientConfig.hosts = JSON.parse(argv.hosts); +} + +var client = new es.Client(clientConfig); +var samples = makeSamples(startingMoment, endingMoment); + +var indices = {}; +var doneCreatingEvents = false; +var total = argv.count; + +console.log('Generating', total, 'events across ±', argv.days, 'days'); + +function createIndex(indexName) { + // console.log('ensuring index "%s" exists', indexName); + + var indexBody = { + settings: { + index: { + number_of_shards: 1, + number_of_replicas: 0 + } + }, + mappings: { + _default_: { + properties: { + '@timestamp': { + type: 'date' + }, + id: { + type: 'integer', + index: 'not_analyzed', + include_in_all: false + }, + country: { + type: 'string', + index: 'not_analyzed' + }, + agent: { + type: 'multi_field', + fields: { + agent: { + type: 'string', + index: 'analyzed' + }, + raw: { + type: 'string', + index: 'not_analyzed' + } + } + }, + clientip: { + type: 'ip' + }, + ip: { + type: 'ip' + }, + memory: { + type: 'double' + } + } + } + } + }; + + return client.indices.create({ + ignore: 400, + index: indexName, + body: indexBody + }) + .then(function () { + return client.cluster.health({ + index: indexName, + waitForStatus: 'yellow' + }); + }); +} + + +var queue = async.queue(function (events, done) { + + var body = []; + var deps = []; + + events.forEach(function (event) { + var header = event.header; + event = event.body; + + if (indices[event.index] !== true) { + deps.push(createIndex(event.index)); + indices[event.index] = true; + } + + body.push({ index: header }, event); + }); + + Promise.all(deps) + .then(function () { + if (body.length) { + return client.bulk({ + body: body + }); + } else { + return {}; + } + }) + .then(function (resp) { + if (resp.errors) { + var errors = []; + + resp.items.forEach(function (item, i) { + if (item.index.error) { + errors.push(item.index.error); + eventBuffer.push(events[i]); + } + }); + + console.log('\n - errors - \n' + errors.join('\n') + '\n'); + } + }) + .finally(function () { + process.stdout.write('.'); + }) + .nodeify(done); + +}, 1); + +var eventBuffer = []; +eventBuffer.flush = function () { + if (eventBuffer.length === 3500 || doneCreatingEvents) { + queue.push([eventBuffer.splice(0)], function (err) { + if (err) { + console.error(err.resp); + console.error(err.stack); + process.exit(); + } + }); + } +}; + +queue.drain = function () { + if (doneCreatingEvents && eventBuffer.length === 0) { + client.close(); + process.stdout.write('.\n\ncreated ' + total + ' events\n\n'); + } else { + eventBuffer.flush(); + } +}; + +async.timesSeries(total, function (i, done) { + + // random date, plus less random time + var date = new Date(samples.randomMsInDayRange()); + + var ms = samples.lessRandomMsInDay(); + + // extract number of hours from the milliseconds + var hours = Math.floor(ms / 3600000); + ms = ms - hours * 3600000; + + // extract number of minutes from the milliseconds + var minutes = Math.floor(ms / 60000); + ms = ms - minutes * 60000; + + // extract number of seconds from the milliseconds + var seconds = Math.floor(ms / 1000); + ms = ms - seconds * 1000; + + // apply the values found to the date + date.setUTCHours(hours, minutes, seconds, ms); + + var dateAsIso = date.toISOString(); + var indexName = 'logstash-' + dateAsIso.substr(0, 4) + '.' + dateAsIso.substr(5, 2) + '.' + dateAsIso.substr(8, 2); + var event = {}; + + event.index = indexName; + event['@timestamp'] = dateAsIso; + event.ip = samples.ips(); + event.extension = samples.extensions(); + event.response = samples.responseCodes(); + event.country = samples.countries(); + event.point = samples.airports(); + event['@tags'] = [ + samples.tags(), + samples.tags2() + ]; + event.utc_time = dateAsIso; + event.referer = 'http://' + samples.referrers() + '/' + samples.tags() + '/' + samples.astronauts(); + event.agent = samples.userAgents(); + event.clientip = event.ip; + event.bytes = event.response < 500 ? samples.lessRandomRespSize() : 0; + event.request = '/' + samples.astronauts() + '.' + event.extension; + if (event.extension === 'php') { + event.phpmemory = event.memory = event.bytes * 40; + } + event['@message'] = event.ip + ' - - [' + dateAsIso + '] "GET ' + event.request + ' HTTP/1.1" ' + + event.response + ' ' + event.bytes + ' "-" "' + event.agent + '"'; + + eventBuffer.push({ + header: { + _index: event.index, + _type: samples.types(), + _id: i, + }, + body: event + }); + + eventBuffer.flush(); + setImmediate(done); +}, function () { + doneCreatingEvents = true; + eventBuffer.flush(); +}); \ No newline at end of file diff --git a/scripts/generate/logs/samples/index.js b/scripts/generate/logs/samples/index.js index 4df7ad4ee..20b318ef9 100644 --- a/scripts/generate/logs/samples/index.js +++ b/scripts/generate/logs/samples/index.js @@ -8,11 +8,12 @@ var dayMs = 86400000; exports.make = function (startingMoment, endingMoment) { var sets = {}; + var startms = startingMoment.toDate().getTime(); + var endms = endingMoment.toDate().getTime(); - sets.randomMsInDayRange = new Stochator({ - min: startingMoment.toDate().getTime(), - max: endingMoment.toDate().getTime() - }, 'get'); + sets.randomMsInDayRange = function () { + return _.random(startms, endms); + }; sets.lessRandomRespSize = new Stochator({ mean: 4500, @@ -86,11 +87,9 @@ exports.make = function (startingMoment, endingMoment) { 'apache': 4 }); - return _.transform(sets, function (note, set, name) { - if (name === 'days') { - return note[name] = set; - } - - note[name] = _.bindKey(set, 'get'); - }, {}); + return _.mapValues(sets, function (set) { + return (typeof set === 'function') ? set : function () { + return set.get(); + }; + }); }; diff --git a/scripts/generate/logs/samples/weighted_list.js b/scripts/generate/logs/samples/weighted_list.js index c96fa9efd..5f2c73f1c 100644 --- a/scripts/generate/logs/samples/weighted_list.js +++ b/scripts/generate/logs/samples/weighted_list.js @@ -111,7 +111,7 @@ WeightedList.prototype._update = function () { sum = 0, totals = []; - _.each(me, function (item) { + me.forEach(function (item) { sum += item.weight; totals.push(sum); });