Skip to content

Instantly share code, notes, and snippets.

@michellebeard
Forked from stevehanson/es-attach-full.py
Last active January 11, 2017 21:25
Show Gist options
  • Save michellebeard/bbcbeaabe0295aa4266fead0403c4007 to your computer and use it in GitHub Desktop.
Save michellebeard/bbcbeaabe0295aa4266fead0403c4007 to your computer and use it in GitHub Desktop.
Interactive Python script that recursively indexes the files in a directory tree into Elasticsearch, using the elasticsearch-mapper-attachments (https://github.com/elasticsearch/elasticsearch-mapper-attachments) plugin to index files (pdf, docx, html, etc.).
# Updated to support Elasticsearch 5.1.1 with the ingest-attachment plugin
# Also included support for HTTP Basic Authentication
# Fixed a bug in the base64 encoding where a newline was introduced
import os
# Constants

# Elasticsearch endpoint, plus the index and document type that are written to.
HOST = "http://localhost:9200"
INDEX = "demo"
TYPE = "attachment"

# HTTP Basic Authentication credentials (default X-Pack username and password).
USER = "elastic"
PSWD = "changeme"

# Scratch file that holds the JSON request body of the file being indexed.
TMP_FILE_NAME = "tmp.json"

# Lower-case file extensions that are eligible for indexing.
INDEX_FILE_TYPES = [
    "html",
    "pdf",
    "doc",
    "docx",
    "xls",
    "xlsx",
    "xml",
]
def ingest():
    """Interactively index either a whole directory tree or a single file.

    Asks whether an entire directory should be indexed; an empty reply counts
    as 'y'. A 'y' answer (in any letter case) prompts for a directory and
    passes it to indexDir(). Any other answer prompts for one file, makes
    sure the index exists and passes the file to indexFile().
    """
    answer = raw_input('Index entire directory [Y/n]: ') or 'y'

    if answer.lower() != 'y':
        # Single-file mode: indexFile() does not create the index itself.
        fname = raw_input('File to index (relative to script): ')
        createIndexIfDoesntExist()
        indexFile(fname)
        return

    target_dir = raw_input('Directory to index (relative to script): ')
    indexDir(target_dir)
def indexFile(fname):
    """Index a single file into Elasticsearch.

    Encodes fname into the temporary JSON request file, posts that file to
    the index and deletes the temporary file afterwards.

    Args:
        fname: Path of the file to index.
    """
    print('\nIndexing ' + fname)
    try:
        createEncodedTempFile(fname)
        postFileToTheIndex()
    finally:
        # Clean up even when encoding or posting raises, so the encoded
        # payload is not left on disk. The file may not exist yet if the
        # failure happened before it was created.
        if os.path.exists(TMP_FILE_NAME):
            os.remove(TMP_FILE_NAME)
    print('\n-----------')
def indexDir(dir):
    """Recursively index every supported file found under a directory.

    Makes sure the index exists, walks the tree rooted at dir and indexes
    each file whose extension (compared case-insensitively) is listed in
    INDEX_FILE_TYPES. All other files are reported and skipped.

    Args:
        dir: Root directory of the tree to index.
    """
    print('Indexing dir ' + dir)
    createIndexIfDoesntExist()
    for path, _subdirs, filenames in os.walk(dir):
        for filename in filenames:
            fname = os.path.join(path, filename)
            # rpartition never raises: a name without any dot yields an
            # empty separator, which is treated as "no extension" instead
            # of aborting the whole walk.
            _base, dot, extension = filename.rpartition('.')
            if not dot:
                extension = ''
            if extension.lower() in INDEX_FILE_TYPES:
                indexFile(fname)
            else:
                print('Skipping {}, not approved file type: {}'.format(
                    fname, extension))
def postFileToTheIndex():
    """POST the temporary JSON file to the index via the attachment pipeline.

    Runs curl against HOST/INDEX/TYPE?pipeline=attachment using HTTP Basic
    Authentication (USER/PSWD) and sends the contents of TMP_FILE_NAME as
    the request body.
    """
    # Local import: the module itself only imports os at the top.
    import subprocess

    url = '{}/{}/{}?pipeline=attachment'.format(HOST, INDEX, TYPE)
    # An argument list (no shell) keeps credentials or host names that
    # contain quotes, spaces or other shell metacharacters intact.
    cmd = [
        'curl', '-XPOST',
        '-u', '{}:{}'.format(USER, PSWD),
        # curl's default content type for posted data is form-encoded;
        # declare the JSON body explicitly.
        '-H', 'Content-Type: application/json',
        url,
        # --data-binary sends the file untouched (-d strips newlines).
        '--data-binary', '@' + TMP_FILE_NAME,
    ]
    # Echo the request for the user without revealing the password.
    print('curl -XPOST -u {}:**** "{}" --data-binary @{}'.format(
        USER, url, TMP_FILE_NAME))
    subprocess.call(cmd)
def createEncodedTempFile(fname):
    """Write the JSON request body for fname to the temporary file.

    The body is a JSON object with two keys: 'file', holding the
    base64-encoded contents of fname, and 'title', holding fname itself.

    Args:
        fname: Path of the file to encode.
    """
    import base64
    import json

    # Context managers close both files even if reading or dumping fails.
    with open(fname, 'rb') as source:
        encoded = base64.b64encode(source.read())
    # b64encode may return bytes, which json cannot serialize; the base64
    # alphabet is pure ASCII, so decoding to text is lossless.
    encoded = encoded.decode('ascii')

    print('writing JSON with base64 encoded file to temp file {}'.format(
        TMP_FILE_NAME))
    with open(TMP_FILE_NAME, 'w') as out:
        json.dump({'file': encoded, 'title': fname}, out)
def createPipeline():
    """Register the 'attachment' ingest pipeline by shelling out to curl.

    The pipeline has two processors: an 'attachment' processor that reads
    the 'file' field and writes to 'fields' (with indexed_chars set to -1),
    followed by a 'remove' processor that drops the 'file' field.
    """
    print('Creating attachment pipeline....')

    # Pipeline definition, one entry per line of the request body.
    pipeline_body = '\n'.join([
        '{',
        '"description": "Extract attachment information",',
        '"processors":',
        '[',
        '{ "attachment": {',
        '"field": "file",',
        '"target_field": "fields",',
        '"indexed_chars": -1',
        '}',
        '},',
        '{ "remove" : { "field" : "file" } }',
        ']',
        '}',
    ])

    curl_prefix = 'curl -XPUT -u {}:{} "{}/_ingest/pipeline/attachment" -d'.format(
        USER, PSWD, HOST)
    # The body is passed as a single-quoted shell argument.
    os.system(curl_prefix + " '" + pipeline_body + "' ")
    print('\n')
def createIndexIfDoesntExist():
    """Create the ingest pipeline and the index mapping if the type is missing.

    Sends an authenticated HEAD request to HOST/INDEX/TYPE. A 404 response
    triggers createPipeline() followed by a curl PUT that creates INDEX with
    the attachment mapping. Any other HTTP error is only reported. Nothing
    is done when the request succeeds.
    """
    import base64
    import urllib2

    class HeadRequest(urllib2.Request):
        """urllib2 request that is sent with the HEAD method."""

        def get_method(self):
            return "HEAD"

    # Check whether the type exists by sending a HEAD request to the index.
    try:
        credentials = base64.b64encode('%s:%s' % (USER, PSWD))
        request = HeadRequest(HOST + '/' + INDEX + '/' + TYPE)
        request.add_header("Authorization", "Basic %s" % credentials)
        urllib2.urlopen(request)
    except urllib2.HTTPError as e:
        if e.code != 404:
            print('Failed to retrieve index with error code - %s.' % e.code)
            return

        createPipeline()
        print('Index doesn\'t exist, creating...')

        # Mapping definition, one entry per line of the request body.
        mapping_body = '\n'.join([
            '{',
            '"mappings": {',
            '"attachment": {',
            '"properties": {',
            '"fields" : {',
            '"properties": {',
            '"date": {',
            '"type": "date",',
            '"format": "strict_date_optional_time",',
            '"store": true',
            '},',
            '"content": {',
            '"type": "text",',
            '"term_vector": "with_positions_offsets",',
            '"store": true',
            '},',
            '"content_type": {',
            '"type": "keyword",',
            '"store": true',
            '},',
            '"title": {',
            '"type": "keyword",',
            '"store": true',
            '},',
            '"keywords": {',
            '"type": "object"',
            '}',
            '}',
            '}',
            '}',
            '}',
            '}',
            '}',
        ])

        curl_prefix = 'curl -XPUT -u {}:{} "{}/{}" -d'.format(
            USER, PSWD, HOST, INDEX)
        # The body is passed as a single-quoted shell argument.
        os.system(curl_prefix + " '" + mapping_body + "' ")
# Start the interactive prompt only when run as a script, not on import.
if __name__ == "__main__":
    ingest()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment