Skip to content

Instantly share code, notes, and snippets.

@hakanbaysal
Last active January 5, 2020 21:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hakanbaysal/92db82f368bda8ad0782ab4ccce30138 to your computer and use it in GitHub Desktop.
Save hakanbaysal/92db82f368bda8ad0782ab4ccce30138 to your computer and use it in GitHub Desktop.
mongo2elasticsearch
class Main:
    """One-shot copier from a MongoDB collection into an Elasticsearch index.

    Instantiating the class runs the whole transfer: the constructor
    connects to both stores and immediately starts streaming documents.
    """

    def __init__(self):
        """Connect to MongoDB and Elasticsearch, then run the transfer."""
        print("Transfer is started")
        self.col = self.mongo_connect()
        self.es = self.elastic_connect()
        self.get_mongo_data()

    @staticmethod
    def mongo_connect():
        """Return the ``transitions`` collection of the ``hgs`` database.

        The client is pointed at ``mongodb://localhost:37017/``.
        """
        client = pymongo.MongoClient("mongodb://localhost:37017/")
        return client["hgs"]["transitions"]

    @staticmethod
    def elastic_connect():
        """Return an Elasticsearch client for ``localhost:9200``."""
        return Elasticsearch([{'host': 'localhost', 'port': 9200}])

    def get_mongo_data(self):
        """Read documents from MongoDB and index each one on its own thread.

        Prints the collection size first (best effort), then iterates over
        every document whose ``_id`` is at or after the hard-coded resume
        point, printing a numbered progress line for each and handing the
        document to ``create_elastic`` on a new thread.

        Raises:
            SystemExit: With status 1 when querying MongoDB or starting a
                worker fails. ``"ERROR"``, the last progress line and the
                exception are printed first.
        """
        try:
            # count_documents() replaces Collection.count(), which was
            # deprecated in PyMongo 3.7 and removed in 4.0.
            print(self.col.count_documents({}))
        except Exception as e:
            # The count is informational only; carry on without it.
            print(e)

        message = ''
        try:
            # Resume point: only documents with _id >= this ObjectId are read.
            query = {'_id': {'$gte': ObjectId('5d97543c2af2153e57635330')}}
            # Number documents from 1 so the first progress line reads "1: ...".
            for counter, item in enumerate(self.col.find(query), start=1):
                message = str(counter) + ": " + str(item)
                print(message)
                Thread(target=self.create_elastic, args=(item,)).start()
                # Short pause between spawns to throttle thread creation.
                sleep(0.01)
        except Exception as e:
            print("ERROR")
            print(message)
            print(e)
            # Non-zero status so callers and shells can tell the run failed.
            sys.exit(1)

    def create_elastic(self, data):
        """Index one document into the ``test`` Elasticsearch index.

        The Elasticsearch id is ``<referenceNo>_<fee>_<totalFee>``; each part
        is passed through ``str()`` so non-string values are accepted.

        Args:
            data: Mapping holding at least the ``referenceNo``, ``fee`` and
                ``totalFee`` keys. It is not modified; an ``_id`` entry, if
                present, is left out of the indexed body.

        Raises:
            KeyError: If one of the three id fields is missing.
        """
        doc_id = '_'.join(
            str(data[key]) for key in ('referenceNo', 'fee', 'totalFee')
        )
        # Index a copy without Mongo's `_id` instead of deleting it from the
        # caller's dict (presumably dropped because it is not a valid body
        # field for Elasticsearch -- confirm against the target cluster).
        body = {key: value for key, value in data.items() if key != '_id'}
        res = self.es.index(
            index='test', doc_type='test_type', id=doc_id, body=body
        )
        result = res['result']
        if result == 'updated':
            print('Updated: True')
        elif result == 'created':
            print('Created: True')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment