Skip to content

Instantly share code, notes, and snippets.

@bitwjg
Last active December 14, 2015 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bitwjg/5081967 to your computer and use it in GitHub Desktop.
Save bitwjg/5081967 to your computer and use it in GitHub Desktop.
/**
 * Converts a {@link StreamItem} into the JSON document that is sent to Elasticsearch.
 *
 * <p>The document has the shape
 * {@code {stream_id, source, doc_id, stream_time: {epoch_ticks, zulu_timestamp},
 * body: {cleansed, ner}}}. The raw {@code body.cleansed} / {@code body.ner} byte buffers
 * are decoded with the item's declared {@code body.encoding}. When that encoding is
 * missing, empty, illegal or unsupported, UTF-8 is used instead. Malformed or unmappable
 * bytes are dropped rather than failing the whole document.
 *
 * <p>Field settings such as {@code type}, {@code store} and {@code index} are mapping
 * properties, not document values. They are deliberately NOT written here. Writing them
 * as fields produced duplicate keys in the document; configure them with a put-mapping
 * request on the target index/type instead.
 *
 * @param si the stream item to convert; its body byte buffers are read but not consumed
 * @return a builder holding the finished JSON object
 * @throws IOException if the builder fails to write JSON or decoding fails
 */
public static XContentBuilder makeJson(StreamItem si) throws IOException
{
    String cleansedText = "";
    String nerText = "";
    ContentItem body = si.body;
    if (body != null)
    {
        Charset charset = resolveCharset(body.encoding);
        cleansedText = decodeLenient(body.cleansed, charset);
        nerText = decodeLenient(body.ner, charset);
    }

    StreamTime streamTime = si.stream_time;
    Double epochTicks = (streamTime == null) ? null : streamTime.epoch_ticks;
    String zuluTimestamp = (streamTime == null) ? null : streamTime.zulu_timestamp;

    return jsonBuilder()
        .startObject()
            .field("stream_id", si.stream_id)
            .field("source", si.source)
            .field("doc_id", si.doc_id)
            .startObject("stream_time")
                .field("epoch_ticks", epochTicks)
                .field("zulu_timestamp", zuluTimestamp)
            .endObject()
            .startObject("body")
                .field("cleansed", cleansedText)
                .field("ner", nerText)
            .endObject()
        .endObject();
}

/**
 * Returns the charset named by {@code encoding}. Falls back to UTF-8 when the name is
 * null, empty, syntactically illegal, or not supported by this JVM. Only the
 * illegal/unsupported cases print a warning.
 */
private static Charset resolveCharset(String encoding)
{
    if (encoding != null && !encoding.isEmpty())
    {
        try
        {
            if (Charset.isSupported(encoding))
            {
                return Charset.forName(encoding);
            }
        }
        catch (IllegalArgumentException ignored)
        {
            // IllegalCharsetNameException is a subclass: the name itself is malformed.
            // Treat it like an unsupported charset and fall through to the UTF-8 fallback.
        }
        System.err.println("Unsupported charset '" + encoding + "', falling back to UTF-8");
    }
    return Charset.forName("UTF-8");
}

/**
 * Decodes {@code bytes} with {@code charset}, silently dropping malformed or unmappable
 * input. Returns "" for a null buffer. Decodes a duplicate so the caller's buffer
 * position is left untouched.
 */
private static String decodeLenient(ByteBuffer bytes, Charset charset) throws IOException
{
    if (bytes == null)
    {
        return "";
    }
    CharsetDecoder decoder = charset.newDecoder()
        .onMalformedInput(CodingErrorAction.IGNORE)
        .onUnmappableCharacter(CodingErrorAction.IGNORE);
    return decoder.decode(bytes.duplicate()).toString();
}
// Join cluster "MSRA-KM" as a client node and bulk-index the stream items found in each
// sub-folder of child_folder into index "test", type "kba". One bulk request is sent per
// folder.
// NOTE(review): child_folder is declared outside this fragment; presumably the
// sub-directories of the corpus root. Confirm against the enclosing code.
Node node = nodeBuilder().clusterName("MSRA-KM").client(true).node();
try
{
    Client client = node.client();
    for (File folder : child_folder)
    {
        if (!folder.isDirectory())
        {
            continue;
        }
        System.out.println(folder.getName());
        File[] files = folder.listFiles();
        if (files == null)
        {
            // listFiles() returns null when the directory cannot be read.
            System.err.println("Cannot list folder " + folder.getPath() + ", skipping");
            continue;
        }

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (File file : files)
        {
            if (!file.isFile())
            {
                continue;
            }
            System.out.println(file.getName());
            // File.getPath() already joins parent path and name with the platform
            // separator; a hard-coded "\\" only works on Windows.
            StreamItem stream_item = XZCompressorInputStream.streamItem(file.getPath());
            if (stream_item != null && stream_item.isSetStream_id())
            {
                XContentBuilder xb = BulkCreateIndex.makeJson(stream_item);
                bulkRequest.add(client.prepareIndex("test", "kba").setSource(xb));
            }
        }

        // An empty bulk request fails validation ("no requests added"), so skip folders
        // that produced nothing to index.
        if (bulkRequest.numberOfActions() == 0)
        {
            continue;
        }
        BulkResponse response = bulkRequest.execute().actionGet();
        if (response.hasFailures())
        {
            System.err.println("Bulk indexing of folder " + folder.getName() + " reported failures:");
            System.err.println(response.buildFailureMessage());
        }
    }
}
finally
{
    // Always release the client node, even if reading or indexing throws.
    node.close();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment