Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Elasticsearch Hadoop OutputFormat for bulk indexing
public class ElasticSearchBulkOutputFormat extends OutputFormat<Text, Text> {
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchBulkOutputFormat.class);
protected class ElasticSearchBulkRecordWriter extends RecordWriter<Text, Text> {
private Conf.ElasticSearchAddress[] addresses;
private Node node;
private Client client;
private String indexName;
private int bulkSize;
private String objType;
// Used for bookkeeping purposes
private AtomicLong totalBulkTime = new AtomicLong();
private AtomicLong totalBulkItems = new AtomicLong();
private long runStartTime = System.currentTimeMillis();
private long lastLogTime = 0;
// For hadoop configuration
private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
private volatile BulkRequestBuilder currentRequest;
public ElasticSearchBulkRecordWriter(TaskAttemptContext context) {
Configuration conf = context.getConfiguration();
this.indexName = "piquant";
this.bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE));
this.objType = "bio";
addresses = new Conf.ElasticSearchAddress[conf.getStrings("elasticsearch.cluster.addresses").length];
int i = 0;
for(String esAddr : conf.getStrings("elasticsearch.cluster.addresses")) {
addresses[i] = new Conf.ElasticSearchAddress(esAddr.split(":")[0], esAddr.split(":")[1]);
i++;
}
start_embedded_client();
initialize_index(indexName);
currentRequest = client.prepareBulk();
}
/**
Closes the connection to elasticsearch. Any documents remaining in the bulkRequest object are indexed.
*/
public void close(TaskAttemptContext context) throws IOException {
if (currentRequest.numberOfActions() > 0) {
try {
BulkResponse response = currentRequest.execute().actionGet();
} catch (Exception e) {
LOG.warn("Bulk request failed: " + e.getMessage());
throw new RuntimeException(e);
}
}
LOG.info("Closing record writer");
client.close();
LOG.info("Client is closed");
if (node != null) {
node.close();
}
LOG.info("Record writer closed.");
}
/**
* Writes a single MapWritable record to the bulkRequest object. Once <b>elasticsearch.bulk.size</b> are accumulated the
* records are written to elasticsearch.
*/
public void write(Text key, Text text) throws IOException {
currentRequest.add(Requests.indexRequest(indexName).type(objType)
.id(key.toString()).source(text.toString()));
processBulkIfNeeded();
}
/**
Indexes content to elasticsearch when <b>elasticsearch.bulk.size</b> records have been accumulated.
*/
private void processBulkIfNeeded() {
totalBulkItems.incrementAndGet();
if (currentRequest.numberOfActions() >= bulkSize) {
boolean loggable = (System.currentTimeMillis() - lastLogTime >= 10000);
try {
long startTime = System.currentTimeMillis();
if (loggable){ LOG.info("Sending [" + (currentRequest.numberOfActions()) + "]items"); }
BulkResponse response = currentRequest.execute().actionGet();
totalBulkTime.addAndGet(System.currentTimeMillis() - startTime);
if (loggable) {
LOG.info("Indexed [" + (currentRequest.numberOfActions()) + "]items " +
"in [" + ((System.currentTimeMillis() - startTime)/1000) + "]s; " +
"avg [" + (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "]rec/s" +
"(total [" + totalBulkItems.get() + "]items " +
"indexed in [" + (totalBulkTime.get()/1000) + "]s, " +
"wall clock [" + ((System.currentTimeMillis() - runStartTime)/1000) + "]s)");
lastLogTime = System.currentTimeMillis();
}
} catch (Exception e) {
LOG.warn("Bulk request failed: " + e.getMessage());
throw new RuntimeException(e);
}
currentRequest = client.prepareBulk();
}
}
private void initialize_index(String indexName) {
try {
client.admin().indices().prepareCreate(indexName).execute().actionGet();
} catch (Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
LOG.warn("Index [" + indexName + "] already exists");
}
}
}
//
// Starts an embedded elasticsearch client (ie. data = false)
//
private void start_embedded_client() {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", "piq")
.put("client", true)
.put("data", false).build();
TransportClient client = new TransportClient(settings);
for(Conf.ElasticSearchAddress address : addresses) {
client.addTransportAddress(new InetSocketTransportAddress(address.getHost(), address.getPort()));
}
this.client = client;
}
}
/**
 * Creates the record writer that bulk-indexes this task's output into elasticsearch.
 *
 * @param context task attempt context handed to the writer's constructor
 * @return a new {@link ElasticSearchBulkRecordWriter}
 * @throws IOException declared by the overridden method; not thrown directly here
 * @throws InterruptedException declared by the overridden method; not thrown directly here
 */
@Override
public RecordWriter<Text, Text> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return new ElasticSearchBulkRecordWriter(context);
}
/**
 * Validates the job configuration before any task is started.
 *
 * <p>The record writer cannot be constructed without
 * <b>elasticsearch.cluster.addresses</b>, so a missing or malformed value is rejected
 * here instead of failing later inside every task attempt.
 *
 * @param context job context supplying the Hadoop configuration
 * @throws IOException if the address list is missing or an entry is not {@code host:port}
 * @throws InterruptedException declared by the overridden method; not thrown directly here
 */
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    String[] clusterAddresses = context.getConfiguration().getStrings("elasticsearch.cluster.addresses");
    if (clusterAddresses == null || clusterAddresses.length == 0) {
        throw new IOException("Missing required configuration [elasticsearch.cluster.addresses]; "
                + "expected a comma-separated list of host:port entries");
    }
    for (String clusterAddress : clusterAddresses) {
        // Only entries without any ':' are rejected; those can never yield a port
        if (clusterAddress.indexOf(':') < 0) {
            throw new IOException("Invalid entry [" + clusterAddress
                    + "] in [elasticsearch.cluster.addresses]; expected host:port");
        }
    }
    // TODO Check if the object exists?
}
/**
 * Supplies the committer used for this output format.
 *
 * @param context task attempt context (unused)
 * @return a new {@code ElasticSearchOutputCommitter}
 * @throws IOException declared by the overridden method; not thrown directly here
 * @throws InterruptedException declared by the overridden method; not thrown directly here
 */
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
    final OutputCommitter committer = new ElasticSearchOutputCommitter();
    return committer;
}
private class ElasticSearchOutputCommitter extends OutputCommitter {
@Override
public void setupJob(JobContext jobContext) throws IOException {}
@Override
public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
return false;
}
@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {}
@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.