Skip to content

Instantly share code, notes, and snippets.

@varontron
Created October 27, 2020 14:10
Show Gist options
  • Save varontron/ebcf52544e50157a3d4c59681838fa1b to your computer and use it in GitHub Desktop.
Diskover deduping
#!/usr/bin/env node
'use strict'
import { createRequire } from 'module';
const require = createRequire(import.meta.url);
const fs = require('fs')
const { Client } = require('@elastic/elasticsearch')
// Local Elasticsearch node holding the diskover index.
const client = new Client({ node: 'http://localhost:9200' })
// Shared partition cursor consumed by partitionSearch() below.
let i = 0
// Max buckets returned per terms-aggregation request.
const size = 1000
// Split the ~4M-key aggregation space so each partition fits in `size` buckets.
const num_partitions = (4*10**6)/size
// Build one partition's search request: filter to docs that have both a
// filename and a parent path, then terms-aggregate on the composite key
// "<filehash>|<path_parent>/<filename>", keeping only keys seen 2+ times.
function getParams (partition) {
  const query = {
    bool: {
      filter: [
        { exists: { field: 'filename' } },
        { exists: { field: 'path_parent' } }
      ]
    }
  };
  const aggs = {
    path: {
      terms: {
        // Walk the key space one partition at a time to stay under `size`.
        include: { partition, num_partitions },
        script: {
          source: "doc['filehash'].value + '|' + doc['path_parent'].value + '/' + doc['filename'].value"
        },
        size,
        min_doc_count: 2
      }
    }
  };
  return {
    index: 'diskover-index',
    size: 1,
    _source: ['filehash', 'path_parent', 'filename', 'worker_name', 'indexing_date'],
    body: { query, aggs }
  };
}
// Async generator yielding every duplicate-key bucket across all partitions.
// Fixes: replaces the module-level mutable counter `i` (which made the
// generator single-use and its state invisible), drops `var`, and removes
// the unused `sourceHits` local.
async function * partitionSearch () {
  for (let partition = 0; partition < num_partitions; partition++) {
    const response = await client.search(getParams(partition));
    yield * response.body.aggregations.path.buckets;
  }
}
// Record the filehash of every duplicate bucket, one per line.
// Fixes: the original threw from inside a fs.appendFile callback, which
// escapes run()'s promise and crashes as an uncaught exception instead of
// reaching the .catch below; awaiting fs.promises.appendFile routes write
// errors through the normal rejection path (and keeps output ordered).
async function run () {
  for await (const agg of partitionSearch()) {
    // agg.key is "<filehash>|<path_parent>/<filename>"; keep only the hash.
    const hash = agg.key.split(/\|/)[0];
    await fs.promises.appendFile('./hashes.txt', `${hash}\n`);
  }
}
run().catch(e => console.dir(e, { depth: null }))
#!/usr/bin/env node
'use strict'
import { createRequire } from 'module';
const require = createRequire(import.meta.url);
const readline = require('readline');
const fs = require('fs')
const { Client } = require('@elastic/elasticsearch')
// Local Elasticsearch node holding the diskover index.
const client = new Client({ node: 'http://localhost:9200' })
// Stream of "<hash-or-path>|<max indexing_date>" lines produced by the
// companion max-date script.
const readInterface = readline.createInterface({
input: fs.createReadStream('./hashesDates.txt')
});
// Build an update-by-query request that flags (ignore=true) every doc
// matching the given filehash and indexing_date.
function getHashParams (hash, date) {
  const markIgnored = {
    lang: 'painless',
    source: 'ctx._source.ignore=true'
  };
  const query = {
    bool: {
      must: [
        { match: { filehash: hash } },
        { match: { indexing_date: date } }
      ]
    }
  };
  return {
    index: 'diskover-index',
    conflicts: 'proceed',
    body: { script: markIgnored, query }
  };
}
// Build an update-by-query request that flags (ignore=true) every doc whose
// parent dir, filename, and indexing_date all match the given full path
// and date.
function getPathParams (path, date) {
  const segments = path.split('/');
  const filename = segments.pop();
  const path_parent = segments.join('/');
  return {
    index: 'diskover-index',
    conflicts: 'proceed',
    body: {
      script: {
        lang: 'painless',
        source: 'ctx._source.ignore=true'
      },
      query: {
        bool: {
          must: [
            { match: { path_parent } },
            { match: { filename } },
            { match: { indexing_date: date } }
          ]
        }
      }
    }
  };
}
// Mark one line's duplicates ignored. Lines are "<value>|<date>"; a 32-char
// value not starting with '/' is treated as an md5 filehash, anything else
// as a full file path.
// Fixes: the original catch block dereferenced `response.body` while
// `response` is always undefined when the request throws, so the resulting
// TypeError masked the real error and escaped the catch; it then returned
// `response.body` unconditionally, crashing again after a caught error.
async function updateByQuery (line) {
  const [val, date] = line.split(/\|/);
  try {
    const params = (val.length === 32 && !/^\//.test(val))
      ? getHashParams(val, date)
      : getPathParams(val, date);
    const response = await client.updateByQuery(params);
    return response.body;
  }
  catch (err) {
    console.dir(err, { depth: null });
    return undefined;
  }
}
// Apply one update-by-query per input line, logging each ES response.
// Fix: the original appended JSON.stringify(result) with no separator,
// concatenating every entry into one unparseable blob; newline-delimit so
// updates.log is one JSON document per line.
async function run () {
  readInterface.on('line', async function (line) {
    const result = await updateByQuery(line);
    fs.appendFile('./updates.log', `${JSON.stringify(result)}\n`, (err) => {
      if (err) throw err;
    });
  });
}
run()
#!/usr/bin/env node
'use strict'
import { createRequire } from 'module';
const require = createRequire(import.meta.url);
const readline = require('readline');
const fs = require('fs')
const { Client } = require('@elastic/elasticsearch')
// Local Elasticsearch node holding the diskover index.
const client = new Client({ node: 'http://localhost:9200' })
// Stream of duplicate filehashes (or full paths) emitted by the partition
// aggregation script.
const readInterface = readline.createInterface({
input: fs.createReadStream('./hashes.txt'),
});
// Build a search for all docs carrying the given filehash, with a max
// aggregation that yields the most recent indexing_date among them.
function getHashParams (hash) {
  const body = {
    query: {
      term: { filehash: { value: hash } }
    },
    aggs: {
      max_date: { max: { field: 'indexing_date' } }
    }
  };
  return {
    index: 'diskover-index',
    _source: ['filehash', 'path_parent', 'filename', 'worker_name', 'indexing_date'],
    body
  };
}
// Build a search matching a full path (split into parent dir + filename),
// with a max aggregation yielding the most recent indexing_date among the
// matching docs.
function getPathParams (path) {
  const parts = path.split('/');
  const filename = parts.pop();
  const path_parent = parts.join('/');
  return {
    index: 'diskover-index',
    _source: ['filehash', 'path_parent', 'filename', 'worker_name', 'indexing_date'],
    body: {
      query: {
        bool: {
          must: [
            { match: { path_parent } },
            { match: { filename } }
          ]
        }
      },
      aggs: {
        max_date: { max: { field: 'indexing_date' } }
      }
    }
  };
}
// Look up the newest indexing_date for one duplicate value. A 32-char value
// not starting with '/' is treated as an md5 filehash, anything else as a
// full file path.
// Fixes: the original catch block dereferenced `response.body` while
// `response` is always undefined when the search throws, so a TypeError
// masked the real error; it then read `response.body.aggregations`
// unconditionally, crashing again after a caught error.
async function termSearch (val) {
  try {
    const params = (val.length === 32 && !/^\//.test(val))
      ? getHashParams(val)
      : getPathParams(val);
    const response = await client.search(params);
    return response.body.aggregations.max_date.value;
  }
  catch (err) {
    console.dir(err, { depth: null });
    return null;
  }
}
// For each duplicate value read from hashes.txt, fetch its newest
// indexing_date and append "<value>|<date>" to hashesDates.txt.
async function run () {
  readInterface.on('line', async (line) => {
    const max_date = await termSearch(line);
    fs.appendFile('./hashesDates.txt', `${line}|${max_date}\n`, (err) => {
      if (err) throw err;
    });
  });
}
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment