Last active
January 5, 2020 21:11
-
-
Save hakanbaysal/92db82f368bda8ad0782ab4ccce30138 to your computer and use it in GitHub Desktop.
mongo2elasticsearch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Main:
    """Copy documents from a MongoDB collection into an Elasticsearch index.

    Instantiating the class connects to both stores and immediately runs
    the transfer, so ``Main()`` is the whole program.
    """

    # Connection and target settings, gathered here instead of being
    # scattered through the method bodies.
    MONGO_URI = "mongodb://localhost:37017/"
    MONGO_DB = "hgs"
    MONGO_COLLECTION = "transitions"
    ES_HOSTS = [{'host': 'localhost', 'port': 9200}]
    ES_INDEX = 'test'
    # NOTE(review): ``doc_type`` is deprecated in newer Elasticsearch
    # clients -- confirm against the client/cluster version in use.
    ES_DOC_TYPE = 'test_type'
    # Only documents whose ``_id`` is >= this ObjectId are transferred.
    START_OBJECT_ID = '5d97543c2af2153e57635330'

    def __init__(self):
        """Connect to MongoDB and Elasticsearch, then start the transfer."""
        print("Transfer is started")
        self.col = self.mongo_connect()
        self.es = self.elastic_connect()
        self.get_mongo_data()

    @staticmethod
    def mongo_connect():
        """Return the source MongoDB collection."""
        client = pymongo.MongoClient(Main.MONGO_URI)
        db = client[Main.MONGO_DB]
        return db[Main.MONGO_COLLECTION]

    @staticmethod
    def elastic_connect():
        """Return an Elasticsearch client for the configured hosts."""
        return Elasticsearch(Main.ES_HOSTS)

    def get_mongo_data(self):
        """Read the source documents and index each one on its own thread.

        Prints the collection size, then walks every document whose ``_id``
        is at or after ``START_OBJECT_ID`` and hands it to
        ``create_elastic`` in a new thread. Exceptions raised inside those
        worker threads are not caught here.

        Raises:
            SystemExit: with status 1 if reading from MongoDB or starting a
                worker fails; the last progress message and the error are
                printed first.
        """
        try:
            # Collection.count() is deprecated and was removed in PyMongo 4;
            # count_documents({}) is the supported replacement.
            count = self.col.count_documents({})
            print(count)
        except Exception as e:
            # The count is informational only, so a failure here is
            # reported but does not stop the transfer.
            print(e)

        # NOTE(review): the counter starts at 1 and is incremented before
        # use, so the first document is labelled 2. Kept as in the original;
        # confirm whether that numbering is intended.
        counter = 1
        message = ''
        try:
            query = {'_id': {'$gte': ObjectId(self.START_OBJECT_ID)}}
            for item in self.col.find(query):
                counter += 1
                message = '{}: {}'.format(counter, item)
                print(message)
                Thread(target=self.create_elastic, args=(item,)).start()
                # Short pause between thread starts, as in the original.
                sleep(0.01)
        except Exception as e:
            print("ERROR")
            print(message)
            print(e)
            # Exit with a non-zero status so callers can detect the failure.
            sys.exit(1)

    def create_elastic(self, data):
        """Index one MongoDB document into Elasticsearch.

        The Elasticsearch id is ``<referenceNo>_<fee>_<totalFee>``, so a
        document with the same three values overwrites the earlier one.
        The MongoDB ``_id`` field is left out of the indexed body. ``data``
        itself is not modified.

        Args:
            data: Mapping with at least ``referenceNo``, ``fee`` and
                ``totalFee`` keys.

        Raises:
            KeyError: if one of the three id fields is missing.
        """
        doc_id = '{}_{}_{}'.format(
            data['referenceNo'], data['fee'], data['totalFee']
        )
        # Build a copy without ``_id`` rather than deleting from the
        # caller's dict; this also tolerates documents that have no ``_id``.
        body = {key: value for key, value in data.items() if key != '_id'}
        res = self.es.index(
            index=self.ES_INDEX,
            doc_type=self.ES_DOC_TYPE,
            id=doc_id,
            body=body,
        )
        result = res['result']
        if result == 'updated':
            print('Updated: True')
        elif result == 'created':
            print('Created: True')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment