Skip to content

Instantly share code, notes, and snippets.

@yoshi0309
Created October 22, 2014 04:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yoshi0309/e43f0b35a4bd7141b818 to your computer and use it in GitHub Desktop.
Save yoshi0309/e43f0b35a4bd7141b818 to your computer and use it in GitHub Desktop.
ElasticsearchRecordWriter.java (uses JSON over HTTP, not the Elasticsearch Java API)
package com.yoslab.process.writer;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yoslab.exception.CommonException;
import com.yoslab.model.ItemRecord;
import com.yoslab.model.MapItemRecord;
import com.yoslab.process.ItemWriter;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
public class ElasticsearchRecordWriter implements ItemWriter<ItemRecord> {
private String elasticsearchHost;
private String elasticsearchPort;
private String index;
private String type;
public String getElasticsearchHost() {
return elasticsearchHost;
}
public void setElasticsearchHost(String elasticsearchHost) {
this.elasticsearchHost = elasticsearchHost;
}
public String getElasticsearchPort() {
return elasticsearchPort;
}
public void setElasticsearchPort(String elasticsearchPort) {
this.elasticsearchPort = elasticsearchPort;
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public void write(List<ItemRecord> items) throws CommonException {
if(items.size() == 0) return;
StringBuffer idxData = new StringBuffer();
for(ItemRecord item : items){
MapItemRecord m = (MapItemRecord)item;
ObjectMapper mapper = new ObjectMapper();
String json = null;
try {
json = mapper.writeValueAsString(m.getMap());
json = json.replaceAll("\r", " ");
json = json.replaceAll("\n", " ");
json = json.replaceAll("\r\n", " ");
} catch (JsonProcessingException e) {
throw new CommonException(null,"json変換時にエラーが発生しました。",e);
}
BulkActionBean bean = new BulkActionBean();
String id = (String)m.getValue("id");
bean.set(this.index, this.type, id);
String action = null;
try {
action = mapper.writeValueAsString(bean);
} catch (JsonProcessingException e) {
throw new CommonException(null,"json変換時にエラーが発生しました。",e);
}
idxData.append(action);
idxData.append("\r\n");
idxData.append(json);
idxData.append("\r\n");
}
Map b = ((MapItemRecord)items.get(items.size() - 1)).getMap();
String data = idxData.toString();
postData(data);
// dump(data);
}
private void dump(String string) {
File file = new File("c:/temp/out.json");
PrintWriter writer = null;
try {
writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file),"UTF-8")));
writer.write(string);
} catch (IOException e) {
e.printStackTrace();
} finally {
writer.close();
}
}
private void postData(String data) throws CommonException {
try {
String url = "http://" + elasticsearchHost + ":" + elasticsearchPort + "/_bulk";
URIBuilder builder = new URIBuilder(url);
HttpHost proxy = null;
String proxyHost = System.getProperty("http.proxyHost");
String proxyPortStr = System.getProperty("http.proxyPort");
if(proxyHost != null || proxyHost.equalsIgnoreCase("")){
if(proxyPortStr != null || proxyHost.equalsIgnoreCase("")){
int proxyPort = Integer.parseInt(proxyPortStr);
proxy = new HttpHost(proxyHost, proxyPort);
} else {
proxy = new HttpHost(proxyHost);
}
}
RequestConfig requestConfig = null;
if(proxy != null) {
requestConfig = RequestConfig.custom()
// .setConnectTimeout(connectionTimeout)
// .setSocketTimeout(socketTimeout)
.setProxy(proxy)
.build();
} else {
requestConfig = RequestConfig.custom().build();
}
List<Header> headers = new ArrayList<Header>();
headers.add(new BasicHeader("Accept-Charset","utf-8"));
headers.add(new BasicHeader("Accept-Language","ja, en;q=0.8"));
headers.add(new BasicHeader("User-Agent","My Http Client 0.1"));
HttpClient httpClient = HttpClientBuilder.create()
.setDefaultRequestConfig(requestConfig)
.setDefaultHeaders(headers)
.build();
HttpPost post = new HttpPost(builder.build());
ByteArrayEntity entity = new ByteArrayEntity(data.getBytes());
post.setEntity(entity);
HttpResponse response = httpClient.execute(post);
String responseBody = IOUtils.toString(response.getEntity().getContent());
} catch (IOException e) {
e.printStackTrace();
throw new CommonException(null,"HTTPによるデータの送信時ににエラーが発生しました。",e);
} catch (URISyntaxException e) {
e.printStackTrace();
throw new CommonException(null,"HTTPによるデータの送信時ににエラーが発生しました。",e);
}
}
@Override
public void postProcess() throws CommonException {
//NOOP
}
@Override
public void commit() throws CommonException {
//NOOP
}
@Override
public void close() {
//NOOP
}
@Override
public void init() throws CommonException {
//NOOP
}
private class BulkActionBean {
private Map create = new HashMap();
public void set(String indexName, String typeName, String id) {
create.put("_index", indexName);
create.put("_type", typeName);
create.put("_id", id);
}
public Map getCreate() {
return create;
}
public void setCreate(Map create) {
this.create = create;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment