-
-
Save markharwood/7bca8a15a6b0c36b6b412971ee192037 to your computer and use it in GitHub Desktop.
Merge documents from 2 indices with a common field into a new index
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from elasticsearch import helpers | |
from elasticsearch.client import Elasticsearch | |
import argparse | |
parser = argparse.ArgumentParser() | |
parser.add_argument("in1", help="Input index name 1") | |
parser.add_argument("in2", help="Input index name 2") | |
parser.add_argument("commonField", help="The field name common to both in1 and in2") | |
parser.add_argument("out", help="The index where fused results are written") | |
parser.add_argument("-host", help="The elasticsearch host", default="localhost:9200") | |
parser.add_argument("-writesPerBulk", help="The number of records to write in each bulk request", type=int, | |
default=1000) | |
parser.add_argument("-readsPerBulk", help="The number of input records per scroll page ", type=int, | |
default=1000) | |
parser.add_argument("-maxTimeToProcessScrollPage", help="The max time to process page of events", default="1m") | |
args = parser.parse_args() | |
def get_merged_records(es, in1, in2, commonField): | |
lastEntityId="" | |
mergedDoc=None | |
mergeQuery={ | |
"query": { | |
"match_all": { } | |
}, | |
"sort": [ | |
{ | |
commonField: { | |
"order": "asc" | |
} | |
} | |
] | |
} | |
for doc in helpers.scan(es, | |
index=in1+","+in2, | |
query=mergeQuery, | |
size=args.readsPerBulk, | |
scroll=args.maxTimeToProcessScrollPage, | |
preserve_order=True): | |
docSrc=doc["_source"] | |
thisEntityId = docSrc[commonField] | |
if thisEntityId == lastEntityId: | |
# Copy all fields from docSrc to merged doc | |
mergedDoc.update(docSrc) | |
else: | |
if mergedDoc is not None: | |
yield { | |
'_op_type': 'index', | |
"doc": mergedDoc | |
} | |
mergedDoc = docSrc | |
lastEntityId = thisEntityId | |
if mergedDoc is not None: | |
yield { | |
'_op_type': 'index', | |
"doc": mergedDoc | |
} | |
es = Elasticsearch(hosts=args.host) | |
helpers.bulk(es, get_merged_records(es, args.in1, args.in2, args.commonField), | |
index=args.out, | |
# Required for < 7.0 elasticsearch | |
# doc_type="_doc", | |
chunk_size=args.writesPerBulk) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DELETE test1 | |
DELETE test2 | |
DELETE test3 | |
POST test1/_doc/2 | |
{ | |
"myId":2, | |
"name":"barName" | |
} | |
POST test1/_doc/1 | |
{ | |
"myId":1, | |
"name":"fooName" | |
} | |
POST test2/_doc/1 | |
{ | |
"myId":1, | |
"address":"fooAddress" | |
} | |
POST test2/_doc/2 | |
{ | |
"myId":2, | |
"address":"barAddress" | |
} | |
python MergeDocs.py test1 test2 myId test3 | |
GET test3/_search |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment