#!/usr/bin/env python
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError
from urlparse import urlparse
from subprocess import Popen, PIPE
import boto3
import sys
import os
import argparse
import logging
import logging.config
import Queue
from threading import Thread
# make python <3 utf-8
reload(sys)
sys.setdefaultencoding('utf8')
# logging configuration is read from logging.ini in the current working directory
logging.config.fileConfig('logging.ini')
parser = argparse.ArgumentParser()
parser.add_argument('--data', help='The data directory. Either a local directory [/path/to/data] or an S3 bucket [s3://bucket/path/to/data].', required=True)
parser.add_argument('--host', help='The elasticsearch host name [ex: localhost]', required=True)
parser.add_argument('--protocol', help='The elasticsearch protocol [default: http]', default='http')
parser.add_argument('--port', help='The elasticsearch port [default: 9200]', type=int, default=9200)
parser.add_argument('--index', help='The elasticsearch index [ex: my-index]', required=True)
parser.add_argument('--type', help='The elasticsearch type [ex: my-type]', required=True)
parser.add_argument('--mapping', help='A file containing the elasticsearch mappings and settings to create the index. Either a local file [/path/to/mapping.json] or an S3 object [s3://bucket/path/to/mapping.json].', default=None)
args = parser.parse_args()
es_url = args.protocol + '://' + args.host + ':' + str(args.port) + '/' + args.index + '/' + args.type
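# Example invocation (hypothetical bucket, index, and type names), assuming this
# script is saved as load_es.py:
#   python load_es.py --data s3://my-bucket/path/to/data --host localhost \
#     --index my-index --type my-type --mapping s3://my-bucket/path/to/mapping.json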
# Number of parallel jobs to run
num_worker_threads = 8
# Create a Queue to hold jobs, Threads will pull from the queue and execute them
q = Queue.Queue()
# decompress gzip string
def decompress_gzip(data):
    return Popen(['zcat'], stdout=PIPE, stdin=PIPE).communicate(input=data)[0]
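# A portable, in-process alternative (a sketch, not used by this script) could use
# the standard library instead of shelling out to zcat:
#   import gzip, StringIO
#   def decompress_gzip_inprocess(data):
#       return gzip.GzipFile(fileobj=StringIO.StringIO(data)).read()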
# parse an s3 path into a bucket and key 's3://my-bucket/path/to/data' -> ('my-bucket', 'path/to/data')
def parse_s3_path(s3_path):
    _, _, bucket, key = s3_path.split('/', 3)
    return (bucket, key)
es = Elasticsearch(host=args.host, port=args.port, timeout=120)
# create a new elasticsearch index with settings and mappings
if args.mapping:
    logging.info('creating index %s with mapping %s', es_url, args.mapping)
    if args.mapping.startswith('s3://'):
        # S3 - https://boto3.readthedocs.org/en/latest/reference/services/s3.html#object
        s3 = boto3.resource('s3')
        s3_bucket, s3_key = parse_s3_path(args.mapping)
        file_handle = s3.Object(s3_bucket, s3_key).get()
        mapping = file_handle['Body'].read()
    else:
        # local file
        with open(args.mapping, 'r') as file_handle:
            mapping = file_handle.read()
    try:
        es.indices.create(index=args.index, body=mapping)
    except RequestError as e:
        # you will end up here if the index already exists
        logging.exception(e)
        sys.exit(1)
# The worker function will pull a job off the queue and execute it.
def worker():
    while True:
        load_file_job = q.get()
        load_file_job.run()
        q.task_done()
# Setup worker threads to pull jobs off queue.
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()
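# Note: as written, nothing below puts jobs on the queue, so the bulk loads run
# synchronously in the main thread. To use the workers, each load would be wrapped
# in a job object exposing run(), enqueued with q.put(job), and awaited with
# q.join() before re-enabling the refresh interval.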
# load data
logging.info('starting to load %s to %s', args.data, es_url)
# disable index refresh during the bulk load to speed up indexing
es.indices.put_settings({'index': {'refresh_interval': '-1'}}, index=args.index)
if args.data.startswith('s3://'):
    # S3 - https://boto3.readthedocs.org/en/latest/reference/services/s3.html#bucket
    s3 = boto3.resource('s3')
    s3_bucket, s3_key = parse_s3_path(args.data)
    for file_summary in s3.Bucket(s3_bucket).objects.all():
        # filter down to files that start with the specified path
        if file_summary.key.startswith(s3_key):
            file_handle = file_summary.get()
            file_contents = file_handle['Body'].read()
            if file_contents:
                logging.info('loading %s/%s to %s', args.data, file_summary.key, es_url)
                if file_summary.key.endswith('.gz'):
                    file_contents = decompress_gzip(file_contents)
                es.bulk(body=file_contents, index=args.index, doc_type=args.type, timeout=120)
else:
    # local directory
    for root, subdirs, files in os.walk(args.data):
        for file_name in files:
            if not file_name.startswith('.'):
                fq_file_name = os.path.join(root, file_name)
                with open(fq_file_name, 'r') as file_handle:
                    file_contents = file_handle.read()
                if file_contents:
                    logging.info('loading %s to %s', fq_file_name, es_url)
                    if file_name.endswith('.gz'):
                        file_contents = decompress_gzip(file_contents)
                    es.bulk(body=file_contents, index=args.index, doc_type=args.type, timeout=120)
# re-enable index refresh now that the bulk load is complete
es.indices.put_settings({'index': {'refresh_interval': '1s'}}, index=args.index)
logging.info('finished loading %s to %s', args.data, es_url)