Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
mongo-connector with ES 2.2.0 and Mongo 2.6

Before starting, please have a MongoDB replica set initiated. I have MongoDB running at localhost:27017 and Elasticsearch at localhost:9200.

The test dataset I used is available on the mongodb website.

I am using MongoDB 2.6 and Elasticsearch 2.2.0 on a Windows machine.

Here I have tested three cases:

  • An index already exists, mc-test-index in this case
  • A template for an index exists. mc-template in this case
  • No destination index is specified

Expectations:

  • The documents should be indexed directly according to the settings/mappings given in new_template.json for mc-test-index.
  • The index named new_test_template should be created according to the template given in new_template.json and normal indexing should occur.
  • For test_default, the index should have the default mappings/settings

Files:

  • create_template_and_index.py creates the index and template
  • new_template.json contains the template and settings/mappings of the indices being considered
  • upsert_to_mongo.py reads data from the dataset and inserts into mongo collections

TODO:

  • This tool has not been tested as a service
  • Source and destination namespaces, when given as comma-separated lists, do not work

mongo-connector -m localhost:27017 -t localhost:9200 -o E:/mongo-connector.oplog -d elastic2_doc_manager -n test_template.dataset,test_created_index.dataset,test_default.dataset -g new_test_template.dataset,mc-test-index.dataset,test_default.dataset seems to be unstable.

from elasticsearch import Elasticsearch
from pprint import pprint
import json
# Create the "mc-template" index template and the "mc-test-index" index in
# Elasticsearch, both from the settings/mappings stored in new_template.json.

# Connection settings for the local Elasticsearch node. The key must be
# "port"; it was previously misspelled as "post", so the intended port was
# never actually passed to the client.
es_host = {"host": "localhost", "port": 9200}
es_client = Elasticsearch(hosts=[es_host], timeout=180)

# Load the template definition; the context manager closes the file handle.
with open("new_template.json", "rb") as template_file:
    template_body = json.load(template_file)

# Register the index template under the name "mc-template".
response = es_client.indices.put_template(name="mc-template", body=template_body)
pprint(response)

# Drop the "template" key (the index-name pattern) so that only the
# settings/mappings remain, then reuse them to create a concrete index.
template_body.pop("template")
response = es_client.indices.create(index="mc-test-index", body=template_body)
pprint(response)
mongo-connector -m localhost:27017 -t localhost:9200 -o E:/mongo-connector.oplog -d elastic2_doc_manager -n test_created_index.dataset -g mc-test-index.dataset
mongo-connector -m localhost:27017 -t localhost:9200 -o E:/mongo-connector.oplog -d elastic2_doc_manager -n test_template.dataset -g new_test_template.dataset
mongo-connector -m localhost:27017 -t localhost:9200 -o E:/mongo-connector.oplog -d elastic2_doc_manager -n test_default.dataset
{
"template": "new*",
"settings": {
"index": {
"number_of_shards": 6,
"number_of_replicas": 1,
"store": {
"type": "mmapfs"
},
"similarity.default.type": "BM25"
},
"analysis": {
"filter": {
"english_stop": {
"type": "stop",
"stopwords": "_english_"
}
},
"analyzer": {
"custom_text": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "english_stop"]
}
}
}
},
"mappings": {
"dataset": {
"properties": {
"address": {
"properties": {
"building": {
"type": "string",
"index": "not_analyzed",
"store": true
},
"coord": {
"type": "double",
"index": "not_analyzed",
"store": true
},
"street": {
"type": "string",
"index": "not_analyzed",
"store": true
},
"zipcode": {
"type": "string",
"index": "not_analyzed",
"store": true
}
}
},
"borough": {
"type": "string",
"index": "not_analyzed",
"store": true
},
"cuisine": {
"type": "string",
"index": "not_analyzed",
"store": true
},
"grades": {
"properties": {
"date": {
"properties": {
"iso_date": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ssZ",
"store": true
}
}
},
"grade": {
"type": "string",
"index": "not_analyzed",
"store": true
},
"score": {
"type": "long",
"index": "not_analyzed",
"store": true
}
}
},
"name": {
"type": "string",
"analyzer": "custom_text",
"store": true
},
"restaurant_id": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
from pymongo import MongoClient
import json
from datetime import datetime
import pytz
# Load dataset.json (one JSON document per line) and insert every document
# into the "dataset" collection of three MongoDB databases.

# Connect to MongoDB using pymongo's default connection settings.
client = MongoClient()

# Target collections: the same documents are written to each of them.
db1 = client["test_template"]
collection1 = db1["dataset"]
db2 = client["test_created_index"]
collection2 = db2["dataset"]
db3 = client["test_default"]
collection3 = db3["dataset"]

with open("dataset.json", "rb") as input_file:
    # Iterate the file lazily rather than materialising it with readlines().
    for line in input_file:
        line = line.strip()
        if not line:
            # Skip blank lines (e.g. a trailing newline) instead of letting
            # json.loads fail on an empty string.
            continue
        document = json.loads(line)

        # Rewrite each grade's date sub-document: the "$date" value is
        # treated as milliseconds since the epoch and is replaced by an
        # "iso_date" string holding the UTC ISO-8601 representation.
        for grade in document["grades"]:
            grade_date = grade["date"]
            millis = grade_date.pop("$date")
            grade_date["iso_date"] = datetime.fromtimestamp(
                millis / 1000.0, tz=pytz.utc
            ).isoformat()

        # Progress output; the parenthesised form works on Python 2 and 3.
        print(document["restaurant_id"])

        collection1.insert_one(document=document)
        collection2.insert_one(document=document)
        collection3.insert_one(document=document)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.