Skip to content

Instantly share code, notes, and snippets.

@muhammadyaseen
Created December 16, 2017 10:12
Show Gist options
  • Save muhammadyaseen/c097552fe196e07ec70b4c2bc90eca27 to your computer and use it in GitHub Desktop.
Save muhammadyaseen/c097552fe196e07ec70b4c2bc90eca27 to your computer and use it in GitHub Desktop.
Code for MyTestEvent and MyTestEventHandler files
package io.myaseen.myflumeapp;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flume.Event;
public class MyTestEvent implements Event, Serializable {
private static final long serialVersionUID = 1L;
// custom keys of your json schema
private String my_event_property1;
private String my_event_property2;
private transient String charset = "UTF-8";
private Map<String, String> headers = new HashMap<String, String>();
@Override
public Map<String, String> getHeaders() {
return this.headers;
}
@Override
public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
@Override
public byte[] getBody() {
try {
return getByteRepresentation();
} catch (Exception e) {
return new byte[0];
}
}
@Override
public void setBody(byte[] body) {
}
public void setCharset(String charset) {
this.charset = charset;
}
private byte[] getByteRepresentation() {
return SerializationUtils.serialize(this);
}
}
package io.myaseen.myflumeapp;
import java.io.BufferedReader;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
public class MyTestEventHandler implements HTTPSourceHandler {
private static final Logger LOG = LoggerFactory.getLogger(MyTestEventHandler.class);
private final Type listType = new TypeToken<List<MyTestEvent>>(){}.getType();
private final Gson gson;
public TrackingEventHandler() {
gson = new GsonBuilder().disableHtmlEscaping().create();
}
@Override
public void configure(Context context) {
}
@Override
public List<Event> getEvents(HttpServletRequest request) throws Exception {
BufferedReader reader = request.getReader();
String charset = request.getCharacterEncoding();
List<Event> eventList = new ArrayList<Event>(0);
try {
eventList = gson.fromJson(reader, listType);
} catch (JsonSyntaxException ex) {
throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex);
}
for (Event e : eventList) {
((MyTestEvent) e).setCharset(charset);
}
return eventList;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment