Skip to content

Instantly share code, notes, and snippets.

@wires
Created October 26, 2015 15:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wires/1a58806720fee874d419 to your computer and use it in GitHub Desktop.
Save wires/1a58806720fee874d419 to your computer and use it in GitHub Desktop.
Elasticsearch + Highland streams

Elasticsearch and Highland streams

An example of how to use Elasticsearch with Highland streams.

module.exports = {
"settings": {
"number_of_shards": 5,
"number_of_replicas": 0
},
"mappings": {
"_default_": {
"properties": {
"geometry": {
"precision": "1m",
"tree": "quadtree",
"type": "geo_shape"
},
"uri": {
"index": "not_analyzed",
"type": "string"
},
"id": {
"index": "not_analyzed",
"store": true,
"type": "string"
},
"type": {
"index": "not_analyzed",
"type": "string"
},
"name": {
"fields": {
"analyzed": {
"index": "analyzed",
"store": true,
"type": "string"
},
"exact": {
//"analyzer": "lowercase",
"store": true,
"type": "string"
}
},
"type": "string"
},
"dataset": {
"index": "not_analyzed",
"type": "string"
},
"validSince": {
"format": "date_optional_time",
"type": "date"
},
"validUntil": {
"format": "date_optional_time",
"type": "date"
}
}
}
}
}
// Example script: index a few sample documents into Elasticsearch through
// Highland streams. Every index named by the documents is created first
// (using the default mapping); the documents are then indexed one at a time.
const H = require('highland');
const ES = require('elasticsearch');
const defaultMapping = require('./default-mapping');

const client = new ES.Client({
  host: 'localhost:9200',
  //log: 'trace'
});

// Turn the client's callback-style methods into functions returning streams.
const createIndex = H.wrapCallback(client.indices.create.bind(client.indices));
const indexDocument = H.wrapCallback(client.index.bind(client));

// Sample index requests; each entry is passed as-is to `client.index`.
const docs = [
  { index: 'test', type: 'test', id: 'xyz123', body: { fuck: 'you' } },
  { index: 'test2', type: 'test', id: 'xyz125', body: { flappy: 'flappy' } },
];

// Build the `indices.create` request for a single index name.
const toCreateRequest = (indexName) => ({
  index: indexName,
  body: defaultMapping,
});

// Stage 1: create each distinct index sequentially. Failures are logged and
// dropped so that the stream still reaches `collect()`.
const createdIndices = H(docs)
  .map((doc) => doc.index)
  .uniq()
  .map(toCreateRequest)
  .flatMap(createIndex)
  .errors((err) => {
    console.log("ignored", err && err.message);
  })
  .collect();

// Stage 2: once all create calls have finished, index the documents one after
// another, printing each response and reporting any error on stderr.
createdIndices
  .flatMap((results) => {
    console.log("Results =>", results);
    return H(docs);
  })
  .flatMap(indexDocument)
  .errors(console.error.bind(console))
  .each(H.log);
{
"name": "es",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "Jelle 'wires' Herold",
"license": "ISC",
"dependencies": {
"elasticsearch": "^8.2.0",
"highland": "^2.5.1"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment