-
-
Save anonymous/5abc61c70114a127cd13 to your computer and use it in GitHub Desktop.
LoggerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.edmunds.training.real_time_logger; | |
import java.io.IOException; | |
import java.util.LinkedList; | |
import org.elasticsearch.ElasticSearchException; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.transport.TransportClient; | |
import org.elasticsearch.common.transport.InetSocketTransportAddress; | |
import org.elasticsearch.common.xcontent.XContentFactory; | |
import org.json.JSONObject; | |
public class ESMessageHandler { | |
private ElasticSearchServer server; | |
private Client client; | |
private LinkedList<JSONObject> queue; | |
public ESMessageHandler(ElasticSearchServer server) { | |
queue = new LinkedList<JSONObject>(); | |
this.server = server; | |
this.client = new TransportClient().addTransportAddress(new InetSocketTransportAddress(server.getHost(), 9300)); | |
initialize(); | |
} | |
private void initialize() { | |
try { | |
String mappings = XContentFactory.safeJsonBuilder().startObject().startObject(server.getType()).startObject("properties") | |
.startObject("sourceIP").field("type", "string").field("store", "yes").endObject() | |
.startObject("url").field("type", "string").field("index", "not_analyzed").endObject() | |
.startObject("cookieID").field("type", "string").field("store", "yes").endObject() | |
.startObject("timestamp").field("type", "string").field("store", "yes").endObject() | |
.startObject("make").field("type", "string").field("store", "yes").endObject() | |
.startObject("model").field("type", "string").field("store", "yes").endObject() | |
.startObject("year").field("type", "string").field("store", "yes").endObject() | |
.startObject("category").field("type", "string").field("index", "not_analyzed").endObject() | |
.endObject().endObject().string(); | |
client.admin().indices().preparePutMapping(server.getIndex()).setType(server.getType()) | |
.setSource(mappings).execute().actionGet(); | |
} catch (ElasticSearchException e) { | |
e.printStackTrace(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
public synchronized void increaseBulk(JSONObject object) { | |
queue.add(object); | |
} | |
public String getIndex() { | |
return server.getIndex(); | |
} | |
public String getType() { | |
return server.getType(); | |
} | |
public JSONObject getObject() { | |
return queue.remove(); | |
} | |
public int getBulkSize() { | |
return server.getBulkSize(); | |
} | |
public Client getClient() { | |
return this.client; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.edmunds.training.real_time_logger; | |
import org.apache.commons.io.input.Tailer; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.IOException; | |
import java.util.Properties; | |
public class LoggerTest { | |
/** | |
* @param args | |
*/ | |
public static void main(String[] args) { | |
String pfile = "files.properties"; | |
if (args.length > 0) { | |
pfile = "C:\\files.properties"; | |
} | |
Properties properties = new Properties(); | |
try { | |
properties.load(new FileInputStream(pfile)); | |
} catch(IOException e) { | |
System.out.println("Failed to read from files.properties: " + e.getMessage()); | |
} | |
String filename = properties.getProperty("tailer.filename"); | |
long delay = Long.parseLong(properties.getProperty("tailer.delay")); | |
String host = properties.getProperty("elasticsearch.host"); | |
String index = properties.getProperty("elasticsearch.index"); | |
String type = properties.getProperty("elasticsearch.type"); | |
int bulkSize = Integer.parseInt(properties.getProperty("elasticsearch.bulkSize")); | |
ElasticSearchServer server = new ElasticSearchServer(host, index, type, bulkSize); | |
ESMessageHandler handler = new ESMessageHandler(server); | |
MyTailerListener listener = new MyTailerListener(handler); | |
File logfile = new File(filename); | |
Tailer tailer = new Tailer(logfile, listener, delay); | |
Thread processor = new Thread(new LogProcessor(handler)); | |
Thread thread = new Thread(tailer); | |
thread.start(); | |
processor.start(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.edmunds.training.real_time_logger; | |
import java.io.IOException; | |
import org.elasticsearch.action.ActionListener; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.client.Requests; | |
import org.elasticsearch.client.action.bulk.BulkRequestBuilder; | |
import org.elasticsearch.common.xcontent.XContentBuilder; | |
import org.elasticsearch.common.xcontent.XContentFactory; | |
import org.json.JSONException; | |
import org.json.JSONObject; | |
public class LogProcessor implements Runnable { | |
ESMessageHandler handler; | |
BulkRequestBuilder request; | |
public LogProcessor(ESMessageHandler handler) { | |
this.handler = handler; | |
this.request = handler.getClient().prepareBulk(); | |
} | |
private void reinitRequest() { | |
request = handler.getClient().prepareBulk(); | |
} | |
public void run() { | |
while(true) { | |
try { | |
JSONObject object = handler.getObject(); | |
XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); | |
builder.field("sourceIP", object.getString("sourceIP")); | |
builder.field("url", object.getString("url")); | |
builder.field("cookieID", object.getString("cookieID")); | |
builder.field("timestamp", object.getString("timestamp")); | |
builder.field("make", object.getString("make")); | |
builder.field("model", object.getString("model")); | |
builder.field("year", object.getString("year")); | |
builder.field("category", object.getString("category")); | |
builder.endObject(); | |
request.add(Requests.indexRequest(handler.getIndex()).type(handler.getType()) | |
.source(builder)); | |
if (request.numberOfActions() == 0) { | |
reinitRequest(); | |
} | |
} catch(IOException e) { | |
e.printStackTrace(); | |
} catch(JSONException e) { | |
e.printStackTrace(); | |
} catch(Exception e) { | |
System.out.println("Failed to process bulk - " + e.getMessage()); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.edmunds.training.real_time_logger; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.Calendar; | |
import java.util.Date; | |
import org.apache.commons.io.input.TailerListenerAdapter; | |
import org.apache.commons.lang.StringUtils; | |
import org.elasticsearch.ElasticSearchException; | |
import org.json.JSONException; | |
import org.json.JSONObject; | |
public class MyTailerListener extends TailerListenerAdapter { | |
private ESMessageHandler handler; | |
public MyTailerListener(ESMessageHandler handler) { | |
this.handler = handler; | |
} | |
public void handle(String line) { | |
try { | |
String sourceIP = StringUtils.substringBefore(line, "- -").trim(); | |
String timestamp = StringUtils.substringBetween(line, "[", "-").trim(); | |
String cookieID = StringUtils.substringBetween(line, "&edwck=", "&"); | |
cookieID = (cookieID == null) ? "" : cookieID.trim(); | |
String url = StringUtils.substringBetween(line, "&edwurl=", "&"); | |
url = (url == null)? "" : url.trim(); | |
String category = StringUtils.substringBetween(line, "&edwcat=", "&"); | |
category = (category == null)? "" : category.trim(); | |
String make = StringUtils.substringBetween(line, "&edwmk=", "&"); | |
make = (make == null)? "" : make.trim(); | |
String model = StringUtils.substringBetween(line, "&edwmdl=", "&"); | |
model = (model == null)? "" : model.trim(); | |
String year = StringUtils.substringBetween(line, "&edwyr=", "&"); | |
year = (year == null)? "" : year.trim(); | |
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:hh:mm:ss"); | |
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); | |
Date date = sdf.parse(timestamp); | |
Calendar cal = Calendar.getInstance(); | |
cal.setTime(date); | |
JSONObject object = new JSONObject(); | |
object.put("sourceIP", sourceIP); | |
object.put("url", url); | |
object.put("cookieID", cookieID); | |
object.put("timestamp", sdf2.format(cal.getTime())); | |
object.put("make", make); | |
object.put("model", model); | |
object.put("year", year); | |
object.put("category", category); | |
handler.increaseBulk(object); | |
} catch(NullPointerException e) { | |
e.printStackTrace(); | |
}catch (ParseException e1) { | |
e1.printStackTrace(); | |
} catch (ElasticSearchException e) { | |
e.printStackTrace(); | |
} catch (JSONException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment