Created
October 22, 2014 04:55
-
-
Save yoshi0309/e43f0b35a4bd7141b818 to your computer and use it in GitHub Desktop.
ElasticsearchRecordWriter.java (using JSON over HTTP not API)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.yoslab.process.writer; | |
import java.io.BufferedWriter; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStreamWriter; | |
import java.io.PrintWriter; | |
import java.net.URISyntaxException; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import com.yoslab.exception.CommonException; | |
import com.yoslab.model.ItemRecord; | |
import com.yoslab.model.MapItemRecord; | |
import com.yoslab.process.ItemWriter; | |
import org.apache.commons.io.IOUtils; | |
import org.apache.http.Header; | |
import org.apache.http.HttpHost; | |
import org.apache.http.HttpResponse; | |
import org.apache.http.client.HttpClient; | |
import org.apache.http.client.config.RequestConfig; | |
import org.apache.http.client.methods.HttpPost; | |
import org.apache.http.client.utils.URIBuilder; | |
import org.apache.http.entity.ByteArrayEntity; | |
import org.apache.http.impl.client.HttpClientBuilder; | |
import org.apache.http.message.BasicHeader; | |
import com.fasterxml.jackson.core.JsonProcessingException; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
public class ElasticsearchRecordWriter implements ItemWriter<ItemRecord> { | |
private String elasticsearchHost; | |
private String elasticsearchPort; | |
private String index; | |
private String type; | |
public String getElasticsearchHost() { | |
return elasticsearchHost; | |
} | |
public void setElasticsearchHost(String elasticsearchHost) { | |
this.elasticsearchHost = elasticsearchHost; | |
} | |
public String getElasticsearchPort() { | |
return elasticsearchPort; | |
} | |
public void setElasticsearchPort(String elasticsearchPort) { | |
this.elasticsearchPort = elasticsearchPort; | |
} | |
public String getIndex() { | |
return index; | |
} | |
public void setIndex(String index) { | |
this.index = index; | |
} | |
public String getType() { | |
return type; | |
} | |
public void setType(String type) { | |
this.type = type; | |
} | |
@Override | |
public void write(List<ItemRecord> items) throws CommonException { | |
if(items.size() == 0) return; | |
StringBuffer idxData = new StringBuffer(); | |
for(ItemRecord item : items){ | |
MapItemRecord m = (MapItemRecord)item; | |
ObjectMapper mapper = new ObjectMapper(); | |
String json = null; | |
try { | |
json = mapper.writeValueAsString(m.getMap()); | |
json = json.replaceAll("\r", " "); | |
json = json.replaceAll("\n", " "); | |
json = json.replaceAll("\r\n", " "); | |
} catch (JsonProcessingException e) { | |
throw new CommonException(null,"json変換時にエラーが発生しました。",e); | |
} | |
BulkActionBean bean = new BulkActionBean(); | |
String id = (String)m.getValue("id"); | |
bean.set(this.index, this.type, id); | |
String action = null; | |
try { | |
action = mapper.writeValueAsString(bean); | |
} catch (JsonProcessingException e) { | |
throw new CommonException(null,"json変換時にエラーが発生しました。",e); | |
} | |
idxData.append(action); | |
idxData.append("\r\n"); | |
idxData.append(json); | |
idxData.append("\r\n"); | |
} | |
Map b = ((MapItemRecord)items.get(items.size() - 1)).getMap(); | |
String data = idxData.toString(); | |
postData(data); | |
// dump(data); | |
} | |
private void dump(String string) { | |
File file = new File("c:/temp/out.json"); | |
PrintWriter writer = null; | |
try { | |
writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file),"UTF-8"))); | |
writer.write(string); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} finally { | |
writer.close(); | |
} | |
} | |
private void postData(String data) throws CommonException { | |
try { | |
String url = "http://" + elasticsearchHost + ":" + elasticsearchPort + "/_bulk"; | |
URIBuilder builder = new URIBuilder(url); | |
HttpHost proxy = null; | |
String proxyHost = System.getProperty("http.proxyHost"); | |
String proxyPortStr = System.getProperty("http.proxyPort"); | |
if(proxyHost != null || proxyHost.equalsIgnoreCase("")){ | |
if(proxyPortStr != null || proxyHost.equalsIgnoreCase("")){ | |
int proxyPort = Integer.parseInt(proxyPortStr); | |
proxy = new HttpHost(proxyHost, proxyPort); | |
} else { | |
proxy = new HttpHost(proxyHost); | |
} | |
} | |
RequestConfig requestConfig = null; | |
if(proxy != null) { | |
requestConfig = RequestConfig.custom() | |
// .setConnectTimeout(connectionTimeout) | |
// .setSocketTimeout(socketTimeout) | |
.setProxy(proxy) | |
.build(); | |
} else { | |
requestConfig = RequestConfig.custom().build(); | |
} | |
List<Header> headers = new ArrayList<Header>(); | |
headers.add(new BasicHeader("Accept-Charset","utf-8")); | |
headers.add(new BasicHeader("Accept-Language","ja, en;q=0.8")); | |
headers.add(new BasicHeader("User-Agent","My Http Client 0.1")); | |
HttpClient httpClient = HttpClientBuilder.create() | |
.setDefaultRequestConfig(requestConfig) | |
.setDefaultHeaders(headers) | |
.build(); | |
HttpPost post = new HttpPost(builder.build()); | |
ByteArrayEntity entity = new ByteArrayEntity(data.getBytes()); | |
post.setEntity(entity); | |
HttpResponse response = httpClient.execute(post); | |
String responseBody = IOUtils.toString(response.getEntity().getContent()); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
throw new CommonException(null,"HTTPによるデータの送信時ににエラーが発生しました。",e); | |
} catch (URISyntaxException e) { | |
e.printStackTrace(); | |
throw new CommonException(null,"HTTPによるデータの送信時ににエラーが発生しました。",e); | |
} | |
} | |
@Override | |
public void postProcess() throws CommonException { | |
//NOOP | |
} | |
@Override | |
public void commit() throws CommonException { | |
//NOOP | |
} | |
@Override | |
public void close() { | |
//NOOP | |
} | |
@Override | |
public void init() throws CommonException { | |
//NOOP | |
} | |
private class BulkActionBean { | |
private Map create = new HashMap(); | |
public void set(String indexName, String typeName, String id) { | |
create.put("_index", indexName); | |
create.put("_type", typeName); | |
create.put("_id", id); | |
} | |
public Map getCreate() { | |
return create; | |
} | |
public void setCreate(Map create) { | |
this.create = create; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment