Last active
December 14, 2015 11:49
-
-
Save bitwjg/5081967 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static XContentBuilder makeJson(StreamItem si) throws IOException | |
{ | |
String stream_id = si.stream_id; | |
String source = si.source; | |
String doc_id = si.doc_id; | |
//System.out.println(doc_id); | |
ContentItem body = si.body; | |
ByteBuffer body_cleansed = body.cleansed; | |
String body_clean_text = ""; | |
String body_encode = body.encoding; | |
ByteBuffer body_ner = body.ner; | |
String body_ner_text = ""; | |
StreamTime stream_time = si.stream_time; | |
Double epoch_ticks = stream_time.epoch_ticks; | |
String zulu_timestamp = stream_time.zulu_timestamp; | |
if(body_encode == null) | |
body_encode = ""; | |
if(!body_encode.isEmpty()) | |
{ | |
boolean charset_support = true; | |
try{ | |
charset_support = Charset.isSupported(body_encode); | |
} | |
catch(Exception e) | |
{ | |
System.out.println("There exists an exception: " + e); | |
charset_support = false; | |
} | |
if(charset_support) | |
{ | |
Charset charset = Charset.forName(body_encode); | |
CharsetDecoder decoder = charset.newDecoder(); | |
decoder.onMalformedInput(CodingErrorAction.IGNORE); | |
decoder.onUnmappableCharacter(CodingErrorAction.IGNORE); | |
body_clean_text = decoder.decode(body_cleansed).toString(); | |
body_ner_text = decoder.decode(body_ner).toString(); | |
} | |
else | |
{ | |
Charset charset = Charset.forName("UTF-8"); | |
CharsetDecoder decoder = charset.newDecoder(); | |
decoder.onMalformedInput(CodingErrorAction.IGNORE); | |
decoder.onUnmappableCharacter(CodingErrorAction.IGNORE); | |
body_clean_text = decoder.decode(body_cleansed).toString(); | |
body_ner_text = decoder.decode(body_ner).toString(); | |
System.out.println(body_clean_text); | |
} | |
} | |
XContentBuilder xb = jsonBuilder() | |
.startObject() | |
.field("stream_id", stream_id).field("type", "string").field("store", "yes").field("index","no") | |
.field("source", source).field("type", "string").field("store","yes").field("index", "no") | |
.field("doc_id", doc_id).field("type", "string").field("store","yes").field("index","no") | |
.startObject("stream_time") | |
.field("epoch_ticks", epoch_ticks).field("type", "double").field("index", "no").field("store","no") | |
.field("zulu_timestamp", zulu_timestamp).field("type", "string").field("index","no").field("store","yes") | |
.endObject() | |
.startObject("body") | |
.field("cleansed", body_clean_text).field("type","string").field("index", "yes").field("store","yes") | |
.field("ner", body_ner_text).field("type","string").field("index","no").field("store", "yes") | |
.endObject() | |
.endObject(); | |
return xb; | |
} | |
// Index every regular file found directly inside each directory of child_folder.
// One bulk request is sent per directory; all documents go to index "test", type "kba".
Node node = nodeBuilder().clusterName("MSRA-KM").client(true).node();
Client client = node.client();
try
{
    for (File folder : child_folder)
    {
        if (!folder.isDirectory())
        {
            continue;
        }
        System.out.println(folder.getName());
        // listFiles() returns null on an I/O error; treat that like an empty directory.
        File[] files = folder.listFiles();
        if (files == null)
        {
            continue;
        }
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (File file : files)
        {
            if (!file.isFile())
            {
                continue;
            }
            System.out.println(file.getName());
            // getPath() joins parent and name with the platform separator; the previous
            // hard-coded "\\" only worked on Windows.
            StreamItem stream_item = XZCompressorInputStream.streamItem(file.getPath());
            // NOTE(review): null guard assumes streamItem may return null for unreadable files — confirm.
            if (stream_item != null && stream_item.isSetStream_id())
            {
                XContentBuilder xb = BulkCreateIndex.makeJson(stream_item);
                bulkRequest.add(client.prepareIndex("test", "kba").setSource(xb));
            }
        }
        // Elasticsearch rejects a bulk request with no actions, so skip empty batches.
        if (bulkRequest.numberOfActions() == 0)
        {
            continue;
        }
        BulkResponse response = bulkRequest.execute().actionGet();
        if (response.hasFailures())
        {
            System.out.println("Creating index exist error!");
            System.out.println(response.buildFailureMessage());
        }
    }
}
finally
{
    // Always release the client node, even if indexing throws.
    node.close();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment