Skip to content

Instantly share code, notes, and snippets.

Created April 13, 2011 22:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anonymous/5abc61c70114a127cd13 to your computer and use it in GitHub Desktop.
Save anonymous/5abc61c70114a127cd13 to your computer and use it in GitHub Desktop.
LoggerTest.java
package com.edmunds.training.real_time_logger;
import java.io.IOException;
import java.util.LinkedList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.json.JSONObject;
public class ESMessageHandler {
private ElasticSearchServer server;
private Client client;
private LinkedList<JSONObject> queue;
public ESMessageHandler(ElasticSearchServer server) {
queue = new LinkedList<JSONObject>();
this.server = server;
this.client = new TransportClient().addTransportAddress(new InetSocketTransportAddress(server.getHost(), 9300));
initialize();
}
private void initialize() {
try {
String mappings = XContentFactory.safeJsonBuilder().startObject().startObject(server.getType()).startObject("properties")
.startObject("sourceIP").field("type", "string").field("store", "yes").endObject()
.startObject("url").field("type", "string").field("index", "not_analyzed").endObject()
.startObject("cookieID").field("type", "string").field("store", "yes").endObject()
.startObject("timestamp").field("type", "string").field("store", "yes").endObject()
.startObject("make").field("type", "string").field("store", "yes").endObject()
.startObject("model").field("type", "string").field("store", "yes").endObject()
.startObject("year").field("type", "string").field("store", "yes").endObject()
.startObject("category").field("type", "string").field("index", "not_analyzed").endObject()
.endObject().endObject().string();
client.admin().indices().preparePutMapping(server.getIndex()).setType(server.getType())
.setSource(mappings).execute().actionGet();
} catch (ElasticSearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public synchronized void increaseBulk(JSONObject object) {
queue.add(object);
}
public String getIndex() {
return server.getIndex();
}
public String getType() {
return server.getType();
}
public JSONObject getObject() {
return queue.remove();
}
public int getBulkSize() {
return server.getBulkSize();
}
public Client getClient() {
return this.client;
}
}
package com.edmunds.training.real_time_logger;
import org.apache.commons.io.input.Tailer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class LoggerTest {
/**
* @param args
*/
public static void main(String[] args) {
String pfile = "files.properties";
if (args.length > 0) {
pfile = "C:\\files.properties";
}
Properties properties = new Properties();
try {
properties.load(new FileInputStream(pfile));
} catch(IOException e) {
System.out.println("Failed to read from files.properties: " + e.getMessage());
}
String filename = properties.getProperty("tailer.filename");
long delay = Long.parseLong(properties.getProperty("tailer.delay"));
String host = properties.getProperty("elasticsearch.host");
String index = properties.getProperty("elasticsearch.index");
String type = properties.getProperty("elasticsearch.type");
int bulkSize = Integer.parseInt(properties.getProperty("elasticsearch.bulkSize"));
ElasticSearchServer server = new ElasticSearchServer(host, index, type, bulkSize);
ESMessageHandler handler = new ESMessageHandler(server);
MyTailerListener listener = new MyTailerListener(handler);
File logfile = new File(filename);
Tailer tailer = new Tailer(logfile, listener, delay);
Thread processor = new Thread(new LogProcessor(handler));
Thread thread = new Thread(tailer);
thread.start();
processor.start();
}
}
package com.edmunds.training.real_time_logger;
import java.io.IOException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.json.JSONException;
import org.json.JSONObject;
public class LogProcessor implements Runnable {
ESMessageHandler handler;
BulkRequestBuilder request;
public LogProcessor(ESMessageHandler handler) {
this.handler = handler;
this.request = handler.getClient().prepareBulk();
}
private void reinitRequest() {
request = handler.getClient().prepareBulk();
}
public void run() {
while(true) {
try {
JSONObject object = handler.getObject();
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("sourceIP", object.getString("sourceIP"));
builder.field("url", object.getString("url"));
builder.field("cookieID", object.getString("cookieID"));
builder.field("timestamp", object.getString("timestamp"));
builder.field("make", object.getString("make"));
builder.field("model", object.getString("model"));
builder.field("year", object.getString("year"));
builder.field("category", object.getString("category"));
builder.endObject();
request.add(Requests.indexRequest(handler.getIndex()).type(handler.getType())
.source(builder));
if (request.numberOfActions() == 0) {
reinitRequest();
}
} catch(IOException e) {
e.printStackTrace();
} catch(JSONException e) {
e.printStackTrace();
} catch(Exception e) {
System.out.println("Failed to process bulk - " + e.getMessage());
}
}
}
}
package com.edmunds.training.real_time_logger;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.input.TailerListenerAdapter;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.ElasticSearchException;
import org.json.JSONException;
import org.json.JSONObject;
public class MyTailerListener extends TailerListenerAdapter {
private ESMessageHandler handler;
public MyTailerListener(ESMessageHandler handler) {
this.handler = handler;
}
public void handle(String line) {
try {
String sourceIP = StringUtils.substringBefore(line, "- -").trim();
String timestamp = StringUtils.substringBetween(line, "[", "-").trim();
String cookieID = StringUtils.substringBetween(line, "&edwck=", "&");
cookieID = (cookieID == null) ? "" : cookieID.trim();
String url = StringUtils.substringBetween(line, "&edwurl=", "&");
url = (url == null)? "" : url.trim();
String category = StringUtils.substringBetween(line, "&edwcat=", "&");
category = (category == null)? "" : category.trim();
String make = StringUtils.substringBetween(line, "&edwmk=", "&");
make = (make == null)? "" : make.trim();
String model = StringUtils.substringBetween(line, "&edwmdl=", "&");
model = (model == null)? "" : model.trim();
String year = StringUtils.substringBetween(line, "&edwyr=", "&");
year = (year == null)? "" : year.trim();
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:hh:mm:ss");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
Date date = sdf.parse(timestamp);
Calendar cal = Calendar.getInstance();
cal.setTime(date);
JSONObject object = new JSONObject();
object.put("sourceIP", sourceIP);
object.put("url", url);
object.put("cookieID", cookieID);
object.put("timestamp", sdf2.format(cal.getTime()));
object.put("make", make);
object.put("model", model);
object.put("year", year);
object.put("category", category);
handler.increaseBulk(object);
} catch(NullPointerException e) {
e.printStackTrace();
}catch (ParseException e1) {
e1.printStackTrace();
} catch (ElasticSearchException e) {
e.printStackTrace();
} catch (JSONException e) {
e.printStackTrace();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment