#!/usr/bin/env python
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError
from urlparse import urlparse
from subprocess import Popen, PIPE
from threading import Thread
import boto3
import sys
import os
import argparse
import logging
import logging.config
import Queue

# make python <3 utf-8 (this script targets Python 2: urlparse, Queue,
# and reload(sys) are all Python 2-only)
reload(sys)
sys.setdefaultencoding('utf8')
logging.config.fileConfig('logging.ini')

parser = argparse.ArgumentParser()
parser.add_argument('--data', help='The data directory. Either a local directory [/path/to/data] or an S3 bucket [s3://bucket/path/to/data].', required=True)
parser.add_argument('--host', help='The elasticsearch host name [ex: localhost]', required=True)
parser.add_argument('--protocol', help='The elasticsearch protocol [default: http]', default='http')
parser.add_argument('--port', help='The elasticsearch port [default: 9200]', type=int, default=9200)
parser.add_argument('--index', help='The elasticsearch index [ex: my-index]', required=True)
parser.add_argument('--type', help='The elasticsearch type [ex: my-type]', required=True)
parser.add_argument('--mapping', help='A file containing the elasticsearch mappings and settings to create the index. Either a local file [/path/to/mapping.json] or an S3 object [s3://bucket/path/to/mapping.json].', default=None)
args = parser.parse_args()

# used only in log messages
es_url = args.protocol + '://' + args.host + ':' + str(args.port) + '/' + args.index + '/' + args.type

# Number of parallel bulk-load jobs to run
num_worker_threads = 8

# Create a Queue to hold payloads; worker threads will pull from the queue and load them
q = Queue.Queue()

# decompress a gzip'd string by piping it through zcat
def decompress_gzip(data):
    return Popen(['zcat'], stdout=PIPE, stdin=PIPE).communicate(input=data)[0]
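
# Note: decompress_gzip shells out to zcat, which must be on PATH. A
# pure-Python sketch that avoids the subprocess dependency (an untested
# alternative, not part of the original script):
#
#   import gzip, StringIO
#   def decompress_gzip(data):
#       return gzip.GzipFile(fileobj=StringIO.StringIO(data)).read()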

# parse an s3 path into a bucket and key: 's3://my-bucket/path/to/data' -> ('my-bucket', 'path/to/data')
def parse_s3_path(path):
    _, _, bucket, key = path.split('/', 3)
    return (bucket, key)

es = Elasticsearch(host=args.host, port=args.port, timeout=120)

# create a new elasticsearch index with settings and mappings
if args.mapping:
    logging.info('creating index %s with mapping %s', es_url, args.mapping)
    if args.mapping.startswith('s3://'):
        # S3 - https://boto3.readthedocs.org/en/latest/reference/services/s3.html#object
        s3 = boto3.resource('s3')
        s3_bucket, s3_key = parse_s3_path(args.mapping)
        # Object.get() returns a dict whose 'Body' is a streaming handle
        mapping = s3.Object(s3_bucket, s3_key).get()['Body'].read()
    else:
        # local file
        with open(args.mapping, 'r') as file_handle:
            mapping = file_handle.read()
    try:
        es.indices.create(index=args.index, body=mapping)
    except RequestError as e:
        # you will end up here if the index already exists
        logging.exception(e)
        sys.exit(1)
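
# For reference, a minimal mapping/settings file for --mapping might look
# like the sketch below (field names are hypothetical; the syntax matches
# the Elasticsearch 1.x era this script targets):
#
#   {
#     "settings": {"number_of_shards": 1, "number_of_replicas": 0},
#     "mappings": {
#       "my-type": {
#         "properties": {"title": {"type": "string"}}
#       }
#     }
#   }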

# Each worker pulls a bulk payload off the queue, indexes it, and marks the
# job done; failures are logged so one bad file does not kill the thread.
def worker():
    while True:
        file_contents = q.get()
        try:
            es.bulk(body=file_contents, index=args.index, doc_type=args.type, timeout=120)
        except Exception:
            logging.exception('bulk load failed')
        q.task_done()

# Set up daemon worker threads to pull payloads off the queue.
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()
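
# The data files are handed to es.bulk() verbatim, so each file must already
# be in bulk API format: newline-delimited action/source pairs. For example
# (index and type come from the request, so the action metadata can be empty):
#
#   {"index": {}}
#   {"field1": "value1"}
#   {"index": {}}
#   {"field1": "value2"}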

# load data
logging.info('starting to load %s to %s', args.data, es_url)
# disable index refreshes while bulk loading; restored once loading finishes
es.indices.put_settings({'index': {'refresh_interval': '-1'}}, index=args.index)
if args.data.startswith('s3://'):
    # S3 - https://boto3.readthedocs.org/en/latest/reference/services/s3.html#bucket
    s3 = boto3.resource('s3')
    s3_bucket, s3_key = parse_s3_path(args.data)
    for file_summary in s3.Bucket(s3_bucket).objects.all():
        # filter down to objects whose keys start with the specified path
        if file_summary.key.startswith(s3_key):
            file_handle = file_summary.get()
            file_contents = file_handle['Body'].read()
            if file_contents:
                logging.info('loading %s/%s to %s', args.data, file_summary.key, es_url)
                if file_summary.key.endswith('.gz'):
                    file_contents = decompress_gzip(file_contents)
                q.put(file_contents)
else:
    # local directory
    for root, subdirs, files in os.walk(args.data):
        for file_name in files:
            if not file_name.startswith('.'):
                fq_file_name = os.path.join(root, file_name)
                with open(fq_file_name, 'r') as file_handle:
                    file_contents = file_handle.read()
                    if file_contents:
                        logging.info('loading %s to %s', fq_file_name, es_url)
                        if file_name.endswith('.gz'):
                            file_contents = decompress_gzip(file_contents)
                        q.put(file_contents)

# wait for all queued payloads to be indexed, then re-enable refreshes
q.join()
es.indices.put_settings({'index': {'refresh_interval': '1s'}}, index=args.index)
logging.info('finished loading %s to %s', args.data, es_url)
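
# Example invocations, assuming the script is saved as load.py (the file
# name plus the bucket, index, and type names here are all hypothetical):
#
#   python load.py --data /path/to/data --host localhost \
#       --index my-index --type my-type --mapping /path/to/mapping.json
#
#   python load.py --data s3://my-bucket/path/to/data --host localhost \
#       --index my-index --type my-type --mapping s3://my-bucket/mapping.json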