Skip to content

Instantly share code, notes, and snippets.

@klacabane
Created December 28, 2021 17:08
Show Gist options
  • Save klacabane/a180c5764dacdde8d181d018aa759672 to your computer and use it in GitHub Desktop.
Save klacabane/a180c5764dacdde8d181d018aa759672 to your computer and use it in GitHub Desktop.
/** Run from a Kibana checkout as packages/kbn-es-archiver/to-datastream.js; the relative require of ./src/lib/archives/parse below depends on that location. **/
const { createReadStream, createWriteStream } = require('fs');
const { rename, rm } = require('fs/promises');
const { Transform } = require('stream');
const { createGzip } = require('zlib');
const { createPromiseFromStreams } = require('@kbn/utils');
const { createParseArchiveStreams } = require('./src/lib/archives/parse');
const homedir = require('os').homedir();
const path = require('path');
// Kibana checkout this script operates on, resolved under the current user's home directory.
const kibanaRoot = path.join(homedir, 'workspace/kibana');

// Folder holding the monitoring es_archives inside that checkout.
const esArchivesDir = path.join(kibanaRoot, 'x-pack/test/functional/es_archives/monitoring');

// Names of the archive folders to convert; add more entries here to process additional archives.
const archiveNames = ['singlecluster_yellow_platinum_mb'];

// Absolute paths of every archive directory that will be rewritten.
const directories = archiveNames.map((archiveName) => path.join(esArchivesDir, archiveName));
/**
 * Reshapes a Kibana monitoring document in place.
 *
 * The fields found under `kibana.kibana` are moved onto `kibana.stats`
 * (with `host` wrapped as `{ name: host }`) and `service.version`, the
 * top-level `elasticsearch` object (minus its `node` entry) is moved to
 * `kibana.elasticsearch`, and `kibana.kibana` / `kibana.stats.kibana` are removed.
 *
 * @param {object} source - Document source that has a `kibana` field. It is
 *   expected to also carry `kibana.kibana`, `kibana.stats`, `elasticsearch`
 *   and `service`; a TypeError is thrown if one of them is missing.
 */
function relocateKibanaFields(source) {
  const { index, name, status, transport_address, host, version, snapshot } =
    source.kibana.kibana;

  delete source.kibana.kibana;
  delete source.kibana.stats.kibana;

  delete source.elasticsearch.node;
  source.kibana.elasticsearch = source.elasticsearch;
  delete source.elasticsearch;

  source.kibana.stats.index = index;
  source.kibana.stats.name = name;
  source.kibana.stats.status = status;
  source.kibana.stats.snapshot = snapshot;
  source.kibana.stats.transport_address = transport_address;
  source.kibana.stats.host = { name: host };
  source.service.version = version;
}

/**
 * Converts one parsed archive record into its serialized replacement.
 *
 * @param {object} item - Record emitted by the archive parse streams; its
 *   document is read from `item.value.source`.
 * @returns {string|null} The pretty-printed record followed by a blank line,
 *   or `null` when the document has no `@timestamp` and must be dropped.
 */
function serializeRecord(item) {
  const { source } = item.value;

  if (!source['@timestamp']) {
    return null;
  }

  if (source.kibana) {
    relocateKibanaFields(source);
  }

  item.value.index = `.monitoring-${source.kibana ? 'kibana' : 'es'}-8-mb`;
  return `${JSON.stringify(item, null, 2)}\n\n`;
}

/**
 * Creates the object-mode Transform that rewrites each archive record.
 *
 * @returns {Transform} Stream that accepts parsed records and emits serialized text.
 */
function createRecordTransform() {
  return new Transform({
    objectMode: true,
    transform(item, _encoding, callback) {
      let serialized;
      try {
        serialized = serializeRecord(item);
      } catch (error) {
        // Report malformed records through the stream instead of throwing
        // synchronously out of the transform, so the pipeline fails cleanly.
        callback(error);
        return;
      }

      if (serialized === null) {
        callback();
      } else {
        callback(null, serialized);
      }
    },
  });
}

/**
 * Rewrites the `data.json.gz` archive of one directory.
 *
 * The converted data is written to `data_ds.json.gz` first and only renamed
 * over `data.json.gz` once the whole stream pipeline has completed; the
 * directory's `mappings.json` is removed afterwards.
 *
 * @param {string} directory - Absolute path of the archive directory.
 * @returns {Promise<void>} Resolves when the archive has been replaced.
 */
async function convertArchive(directory) {
  const sourceFile = path.join(directory, 'data.json.gz');
  const targetFile = path.join(directory, 'data_ds.json.gz');

  await createPromiseFromStreams([
    createReadStream(sourceFile),
    ...createParseArchiveStreams({ gzip: true }),
    createRecordTransform(),
    createGzip(),
    createWriteStream(targetFile),
  ]);

  await rename(targetFile, sourceFile);
  // NOTE(review): presumably the mappings are no longer needed once the
  // documents target the `-8-mb` indices — confirm before reusing this script.
  await rm(path.join(directory, 'mappings.json'));
}

// Entry point: convert every configured archive directory in parallel.
(async () => {
  await Promise.all(directories.map((directory) => convertArchive(directory)));
})().catch((error) => {
  // Never leave the promise floating: report the failure and signal it to the shell.
  console.error(error);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment