Skip to content

Instantly share code, notes, and snippets.

@ambud
Created April 7, 2016 07:10
Show Gist options
  • Save ambud/162f8719bf77c9523abaddd3b5d6d1fc to your computer and use it in GitHub Desktop.
Save ambud/162f8719bf77c9523abaddd3b5d6d1fc to your computer and use it in GitHub Desktop.
Apache Flume Syslog Multiline Patch (https://issues.apache.org/jira/browse/FLUME-1938)
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
index 5a73c88..4b870ef 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java
@@ -56,12 +56,14 @@ public final class SyslogSourceConfigurationConstants {
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String CONFIG_PORT_CHARSET_PREFIX = "charset.port.";
+ public static final String CONFIG_IS_SYSLOG_MULTILINE_BODY = "body.multiline";
public static final int DEFAULT_BATCHSIZE = 100;
public static final String CONFIG_PORT_HEADER = "portHeader";
public static final String DEFAULT_PORT_HEADER = "port";
+ public static final boolean DEFAULT_IS_SYSLOG_MULTILINE_BODY = false;
public static final String CONFIG_READBUF_SIZE = "readBufferBytes";
public static final int DEFAULT_READBUF_SIZE = 1024;
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 96a9e85..29a9561 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -53,13 +53,14 @@ public class SyslogUDPSource extends AbstractSource
private String host = null;
private Channel nettyChannel;
private Map<String, String> formaterProp;
+ private boolean isSyslogMultiLineBody;
private static final Logger logger = LoggerFactory
.getLogger(SyslogUDPSource.class);
private CounterGroup counterGroup = new CounterGroup();
public class syslogHandler extends SimpleChannelHandler {
- private SyslogUtils syslogUtils = new SyslogUtils(true);
+ private SyslogUtils syslogUtils = new SyslogUtils(true, isSyslogMultiLineBody);
public void setFormater(Map<String, String> prop) {
syslogUtils.addFormats(prop);
@@ -130,6 +131,7 @@ public class SyslogUDPSource extends AbstractSource
context, SyslogSourceConfigurationConstants.CONFIG_PORT);
port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
+ isSyslogMultiLineBody = context.getBoolean(SyslogSourceConfigurationConstants.CONFIG_IS_SYSLOG_MULTILINE_BODY, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY);
formaterProp = context.getSubProperties(
SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
}
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index c2a29a1..8849bd2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -78,6 +78,7 @@ public class SyslogUtils {
private final boolean isUdp;
private boolean isBadEvent;
private boolean isIncompleteEvent;
+ private boolean isSyslogMultiLineBody;
private Integer maxSize;
private class SyslogFormatter {
@@ -98,13 +99,18 @@ public class SyslogUtils {
}
public SyslogUtils(boolean isUdp) {
- this(DEFAULT_SIZE, isUdp);
+ this(DEFAULT_SIZE, isUdp, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY);
}
- public SyslogUtils(Integer eventSize, boolean isUdp){
+ public SyslogUtils(boolean isUdp, boolean isMultilineBody) {
+ this(DEFAULT_SIZE, isUdp, isMultilineBody);
+ }
+
+ public SyslogUtils(Integer eventSize, boolean isUdp, boolean isMultilineBody){
this.isUdp = isUdp;
isBadEvent = false;
isIncompleteEvent = false;
+ isSyslogMultiLineBody = isMultilineBody;
maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize;
baos = new ByteArrayOutputStream(eventSize);
initHeaderFormats();
@@ -156,7 +162,13 @@ public class SyslogUtils {
// setup RFC3164 formater
SyslogFormatter fmt2 = new SyslogFormatter();
- fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0);
+ // Multi-line syslog body will cause regular expression extractor below to fail if
+ // Pattern.MULTILINE & Pattern.DOTALL are not activated while parsing the message.
+ // The code below will preserve header extraction logic when multi-line syslog is
+ // enabled.
+ if(!isSyslogMultiLineBody){fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0);
+ }else{fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0, Pattern.MULTILINE|Pattern.DOTALL);
+ }
// the single digit date has two spaces, so trim it
fmt2.searchPattern.add(" ");
fmt2.replacePattern.add(" ");
@@ -340,11 +352,17 @@ public class SyslogUtils {
break;
case DATA:
// TCP syslog entries are separated by '\n'
- if (b == '\n') {
- e = buildEvent();
- doneReading = true;
- } else {
- baos.write(b);
+ // UDP syslog entries with multi-line body separated by '\n' or '\r'
+ // multi-line event processing handled by sink
+ if(!isSyslogMultiLineBody){
+ if (b == '\n') {
+ e = buildEvent();
+ doneReading = true;
+ } else {
+ baos.write(b);
+ }
+ }else{
+ baos.write(b);
}
if(baos.size() == this.maxSize && !doneReading){
isIncompleteEvent = true;
@@ -357,7 +375,12 @@ public class SyslogUtils {
}
// UDP doesn't send a newline, so just use what we received
- if (e == null && isUdp) {
+ if(e == null && !isSyslogMultiLineBody){
+ if (isUdp) {
+ doneReading = true;
+ e = buildEvent();
+ }
+ }else{
doneReading = true;
e = buildEvent();
}
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
index 7208464..bf9d870 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
@@ -397,7 +397,7 @@ public class TestSyslogUtils {
@Test
public void testExtractBadEventLarge() {
String badData1 = "<10> bad bad data bad bad\n";
- SyslogUtils util = new SyslogUtils(5, false);
+ SyslogUtils util = new SyslogUtils(5, false, false);
ChannelBuffer buff = ChannelBuffers.buffer(100);
buff.writeBytes(badData1.getBytes());
Event e = util.extractEvent(buff);
@@ -425,4 +425,27 @@ public class TestSyslogUtils {
}
+ @Test
+ public void testMultiLineEvent() {
+ String body = "bad bad data bad bad\n bad bad bad\r bad bad";
+ String badData1 = "<10> "+body;
+ SyslogUtils util = new SyslogUtils(false, true);
+ ChannelBuffer buff = ChannelBuffers.buffer(200);
+ buff.writeBytes(badData1.getBytes());
+ Event e = util.extractEvent(buff);
+ if(e == null){
+ throw new NullPointerException("Event is null");
+ }
+ Map<String, String> headers = e.getHeaders();
+ //System.out.print(new String(e.getBody()).trim());
+ Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY));
+ Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY));
+ //No bad event status is put due to event format variability
+ //Assert.assertEquals(SyslogUtils.SyslogStatus.OTHER.getSyslogStatus(),
+ // headers.get(SyslogUtils.EVENT_STATUS));
+ Assert.assertEquals(body.trim(), new String(e.getBody()).trim());
+
+ //Addition of more unit tests for multi-line configuration is welcome
+ }
+
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source;
/**
 * Configuration keys and default values shared by the syslog sources.
 *
 * <p>Pure constants holder; it cannot be instantiated.
 */
public final class SyslogSourceConfigurationConstants {

  // ---- listening endpoint --------------------------------------------------

  /** Key for the single port to listen on. */
  public static final String CONFIG_PORT = "port";

  /** Key for the list of ports to listen to. */
  public static final String CONFIG_PORTS = "ports";

  /** Key for the host/address to bind to. */
  public static final String CONFIG_HOST = "host";

  // ---- custom header formats ----------------------------------------------

  /** Prefix of the sub-properties describing a custom header format. */
  public static final String CONFIG_FORMAT_PREFIX = "format.";
  public static final String CONFIG_REGEX = "regex";
  public static final String CONFIG_SEARCH = "search";
  public static final String CONFIG_REPLACE = "replace";
  public static final String CONFIG_DATEFORMAT = "dateFormat";

  // ---- threading, sizing and batching -------------------------------------

  /** Key for the number of processors used to calculate the number of threads to spawn. */
  public static final String CONFIG_NUMPROCESSORS = "numProcessors";

  /** Key for the maximum allowable size of events. */
  public static final String CONFIG_EVENTSIZE = "eventSize";

  public static final String CONFIG_BATCHSIZE = "batchSize";
  public static final int DEFAULT_BATCHSIZE = 100;

  public static final String CONFIG_READBUF_SIZE = "readBufferBytes";
  public static final int DEFAULT_READBUF_SIZE = 1024;

  // ---- character sets ------------------------------------------------------

  public static final String CONFIG_CHARSET = "charset.default";
  public static final String DEFAULT_CHARSET = "UTF-8";
  public static final String CONFIG_PORT_CHARSET_PREFIX = "charset.port.";

  // ---- port header -----------------------------------------------------------

  public static final String CONFIG_PORT_HEADER = "portHeader";
  public static final String DEFAULT_PORT_HEADER = "port";

  // ---- multi-line message bodies ------------------------------------------

  /** Key that enables message bodies spanning several lines. */
  public static final String CONFIG_IS_SYSLOG_MULTILINE_BODY = "body.multiline";

  /** Multi-line bodies are disabled unless explicitly configured. */
  public static final boolean DEFAULT_IS_SYSLOG_MULTILINE_BODY = false;

  private SyslogSourceConfigurationConstants() {
    // Constants holder: never instantiated.
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.SyslogUtils;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Flume source that listens for syslog messages over UDP.
 *
 * <p>A Netty {@link ConnectionlessBootstrap} delivers each received datagram to
 * {@link syslogHandler}, which parses it with {@link SyslogUtils} (in UDP mode)
 * and forwards the resulting event to the channel processor.
 *
 * <p>Configuration read in {@link #configure(Context)}: {@code port} (required),
 * {@code host}, {@code body.multiline}, and the {@code format.*} sub-properties
 * used for custom header formats.
 */
public class SyslogUDPSource extends AbstractSource
    implements EventDrivenSource, Configurable {

  private int port;
  private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426
  private String host = null;
  private Channel nettyChannel;
  private Map<String, String> formaterProp;
  // Set in configure(); picked up when start() constructs the handler and its SyslogUtils.
  private boolean isSyslogMultiLineBody;

  private static final Logger logger = LoggerFactory
      .getLogger(SyslogUDPSource.class);

  private CounterGroup counterGroup = new CounterGroup();

  /** Netty handler that turns each received datagram into a Flume event. */
  public class syslogHandler extends SimpleChannelHandler {
    // Evaluated when the handler is constructed, so it sees the multi-line
    // flag of the enclosing source as configured at that moment.
    private SyslogUtils syslogUtils = new SyslogUtils(true, isSyslogMultiLineBody);

    /** Registers a custom header format built from {@code format.*} properties. */
    public void setFormater(Map<String, String> prop) {
      syslogUtils.addFormats(prop);
    }

    /**
     * Parses the datagram payload and forwards the event, if one was produced.
     * Channel failures are counted as dropped events and logged.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
      try {
        syslogUtils.setEventSize(maxsize);
        Event e = syslogUtils.extractEvent((ChannelBuffer) mEvent.getMessage());
        if (e == null) {
          return;
        }
        getChannelProcessor().processEvent(e);
        counterGroup.incrementAndGet("events.success");
      } catch (ChannelException ex) {
        counterGroup.incrementAndGet("events.dropped");
        logger.error("Error writting to channel", ex);
        return;
      }
    }
  }

  /** Binds the UDP listener on {@code host:port} (all interfaces if no host is set). */
  @Override
  public void start() {
    // setup Netty server
    // NOTE(review): the channel factory and its cached thread pool created here
    // are never released in stop(); consider keeping a reference and calling
    // releaseExternalResources() on shutdown — confirm against the Netty 3 API.
    ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap
        (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
    final syslogHandler handler = new syslogHandler();
    handler.setFormater(formaterProp);
    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() {
        return Channels.pipeline(handler);
      }
    });
    if (host == null) {
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
    } else {
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
    }
    super.start();
  }

  /** Closes the listening channel, waiting up to 60 seconds for it to close. */
  @Override
  public void stop() {
    logger.info("Syslog UDP Source stopping...");
    logger.info("Metrics:{}", counterGroup);
    if (nettyChannel != null) {
      nettyChannel.close();
      try {
        nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        logger.warn("netty server stop interrupted", e);
        // Restore the interrupt flag so code further up the stack still sees it.
        Thread.currentThread().interrupt();
      } finally {
        nettyChannel = null;
      }
    }
    super.stop();
  }

  /**
   * Reads the source configuration.
   *
   * @param context source context; {@code port} must be present
   */
  @Override
  public void configure(Context context) {
    Configurables.ensureRequiredNonNull(
        context, SyslogSourceConfigurationConstants.CONFIG_PORT);
    port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
    host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
    isSyslogMultiLineBody = context.getBoolean(
        SyslogSourceConfigurationConstants.CONFIG_IS_SYSLOG_MULTILINE_BODY,
        SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY);
    formaterProp = context.getSubProperties(
        SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Incremental syslog parser that turns raw bytes into Flume {@link Event}s.
 *
 * <p>Bytes are consumed by a three-state machine ({@link Mode}):
 * <ul>
 *   <li>START waits for the {@code '<'} that opens the priority;</li>
 *   <li>PRIO collects priority digits up to {@code '>'};</li>
 *   <li>DATA accumulates the message.</li>
 * </ul>
 * In DATA mode an event is completed by {@code '\n'}, unless multi-line bodies
 * are enabled, in which case {@code '\n'} is kept as part of the body. An event
 * is also completed when the maximum event size is reached. For UDP, and
 * whenever multi-line bodies are enabled, whatever has been accumulated when
 * the input buffer runs out is emitted as one event.
 *
 * <p>After the priority is parsed, the accumulated text is matched against the
 * header formats to extract timestamp, host name and body. User-supplied
 * formats are tried first, then RFC 5424, then RFC 3164.
 *
 * <p>Per-event parsing state lives in unsynchronized instance fields, so an
 * instance should presumably be confined to a single handler/connection.
 */
public class SyslogUtils {
  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ";
  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S";
  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ";
  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss";
  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss";

  final public static String SYSLOG_MSG_RFC5424_0 =
      "(?:\\d\\s)?" +// version
      // yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp)
      "(?:(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
      "\\s" + // separator
      "(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null)
      "\\s" + // separator
      "(.*)$"; // body

  final public static String SYSLOG_MSG_RFC3164_0 =
      // stamp MMM d HH:mm:ss, single digit date has two spaces
      "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
      "\\s" + // separator
      "([\\w][\\w\\d\\.@-]*)" + // host
      "\\s(.*)$"; // body

  // Capture-group positions shared by the built-in header patterns.
  final public static int SYSLOG_TIMESTAMP_POS = 1;
  final public static int SYSLOG_HOSTNAME_POS = 2;
  final public static int SYSLOG_BODY_POS = 3;

  /** Regex flags that let the body group {@code (.*)} span line breaks. */
  private static final int MULTILINE_BODY_FLAGS = Pattern.MULTILINE | Pattern.DOTALL;

  private Mode m = Mode.START;
  private StringBuilder prio = new StringBuilder();
  private ByteArrayOutputStream baos;
  private static final Logger logger = LoggerFactory
      .getLogger(SyslogUtils.class);

  final public static String SYSLOG_FACILITY = "Facility";
  final public static String SYSLOG_SEVERITY = "Severity";
  final public static String EVENT_STATUS = "flume.syslog.status";

  final public static Integer MIN_SIZE = 10;
  final public static Integer DEFAULT_SIZE = 2500;

  private final boolean isUdp;
  private boolean isBadEvent;
  private boolean isIncompleteEvent;
  private boolean isSyslogMultiLineBody;
  private Integer maxSize;

  /** One header format: a regex plus timestamp clean-up rules and date formats. */
  private static class SyslogFormatter {
    public Pattern regexPattern;
    public ArrayList<String> searchPattern = new ArrayList<String>();
    public ArrayList<String> replacePattern = new ArrayList<String>();
    public ArrayList<SimpleDateFormat> dateFormat = new ArrayList<SimpleDateFormat>();
    public boolean addYear;
  }

  private ArrayList<SyslogFormatter> formats = new ArrayList<SyslogFormatter>();

  // Header values extracted for the event currently being built.
  private String timeStamp = null;
  private String hostName = null;
  private String msgBody = null;

  /** Non-UDP parser with the default event size and single-line bodies. */
  public SyslogUtils() {
    this(false);
  }

  /** Parser with the default event size and single-line bodies. */
  public SyslogUtils(boolean isUdp) {
    this(DEFAULT_SIZE, isUdp, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY);
  }

  /** Parser with the default event size. */
  public SyslogUtils(boolean isUdp, boolean isMultilineBody) {
    this(DEFAULT_SIZE, isUdp, isMultilineBody);
  }

  /**
   * Kept for callers written against the {@code (eventSize, isUdp)} signature;
   * multi-line bodies are disabled.
   */
  public SyslogUtils(Integer eventSize, boolean isUdp) {
    this(eventSize, isUdp, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY);
  }

  /**
   * @param eventSize       maximum event size in bytes; values below
   *                        {@link #MIN_SIZE} are raised to {@code MIN_SIZE}
   * @param isUdp           flush whatever was read at the end of each buffer
   * @param isMultilineBody keep {@code '\n'} inside bodies instead of
   *                        treating it as the event delimiter
   */
  public SyslogUtils(Integer eventSize, boolean isUdp, boolean isMultilineBody) {
    this.isUdp = isUdp;
    isBadEvent = false;
    isIncompleteEvent = false;
    this.isSyslogMultiLineBody = isMultilineBody;
    maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize;
    // Size the buffer from the clamped value: a raw negative eventSize would
    // make the ByteArrayOutputStream constructor throw.
    baos = new ByteArrayOutputStream(maxSize);
    // Must run after isSyslogMultiLineBody is set: pattern flags depend on it.
    initHeaderFormats();
  }

  /**
   * Adds a user-defined header format in front of the built-in ones.
   * Ignored unless {@code formatProp} contains a {@code regex} entry.
   * The regex is compiled without flags; embed {@code (?s)} in it to let the
   * body span lines.
   */
  public void addFormats(Map<String, String> formatProp) {
    if (formatProp.isEmpty() || !formatProp.containsKey(
        SyslogSourceConfigurationConstants.CONFIG_REGEX)) {
      return;
    }
    SyslogFormatter fmt1 = new SyslogFormatter();
    fmt1.regexPattern = Pattern.compile(formatProp.get(
        SyslogSourceConfigurationConstants.CONFIG_REGEX));
    if (formatProp.containsKey(
        SyslogSourceConfigurationConstants.CONFIG_SEARCH)) {
      fmt1.searchPattern.add(formatProp.get(
          SyslogSourceConfigurationConstants.CONFIG_SEARCH));
    }
    if (formatProp.containsKey(
        SyslogSourceConfigurationConstants.CONFIG_REPLACE)) {
      fmt1.replacePattern.add(formatProp.get(
          SyslogSourceConfigurationConstants.CONFIG_REPLACE));
    }
    if (formatProp.containsKey(
        SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)) {
      fmt1.dateFormat.add(new SimpleDateFormat(formatProp.get(
          SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)));
    }
    formats.add(0, fmt1);
  }

  /** Registers the built-in RFC 5424 and RFC 3164 header formats, in that order. */
  private void initHeaderFormats() {
    // RFC 5424. Without DOTALL, (.*) cannot cross a line break, so
    // matches() would fail on a multi-line body and no headers would be
    // extracted. Apply the flags only when multi-line bodies are enabled.
    SyslogFormatter fmt1 = new SyslogFormatter();
    fmt1.regexPattern = isSyslogMultiLineBody
        ? Pattern.compile(SYSLOG_MSG_RFC5424_0, MULTILINE_BODY_FLAGS)
        : Pattern.compile(SYSLOG_MSG_RFC5424_0);
    // 'Z' in timestamp indicates UTC zone, so replace it with '+0000' for date formatting
    fmt1.searchPattern.add("Z");
    fmt1.replacePattern.add("+0000");
    // timezone in RFC5424 is [+-]tt:tt, so remove the ':' for java date formatting
    fmt1.searchPattern.add("([+-])(\\d{2})[:](\\d{2})");
    fmt1.replacePattern.add("$1$2$3");
    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_1));
    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_2));
    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_3));
    fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_4));
    fmt1.addYear = false;

    // RFC 3164 (always compiled with MULTILINE|DOTALL in this version).
    SyslogFormatter fmt2 = new SyslogFormatter();
    fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0, MULTILINE_BODY_FLAGS);
    // A single-digit day is padded with an extra space ("Apr  1"); collapse
    // whitespace runs to one space before date parsing.
    fmt2.searchPattern.add("\\s+");
    fmt2.replacePattern.add(" ");
    fmt2.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC3164_1));
    // RFC 3164 stamps carry no year, so the current year is prepended.
    fmt2.addYear = true;

    formats.add(fmt1);
    formats.add(fmt2);
  }

  /** Parser states: before '<', inside the priority, inside the message. */
  enum Mode {
    START, PRIO, DATA
  }

  /** Values written to the {@link #EVENT_STATUS} header for abnormal events. */
  public enum SyslogStatus {
    OTHER("Unknown"),
    INVALID("Invalid"),
    INCOMPLETE("Incomplete");

    private final String syslogStatus;

    private SyslogStatus(String status) {
      syslogStatus = status;
    }

    public String getSyslogStatus() {
      return this.syslogStatus;
    }
  }

  /**
   * Builds an event from the accumulated priority and data, then resets the
   * parser for the next event.
   *
   * <p>Facility and severity are derived from the priority ({@code pri / 8}
   * and {@code pri % 8}). Invalid events report 0 for both and carry the
   * {@code INVALID} status. The body is the parsed message body when a header
   * format matched, otherwise the raw accumulated bytes.
   */
  Event buildEvent() {
    byte[] body;
    int pri = 0;
    int sev = 0;
    int facility = 0;

    if (!isBadEvent) {
      try {
        pri = Integer.parseInt(prio.toString());
      } catch (NumberFormatException nfe) {
        // An empty ("<>") or out-of-range priority comes from untrusted input.
        // Report it as an invalid event instead of letting it escape the parser.
        isBadEvent = true;
      }
    }
    if (!isBadEvent) {
      sev = pri % 8;
      facility = pri / 8;
      formatHeaders();
    }

    Map<String, String> headers = new HashMap<String, String>();
    headers.put(SYSLOG_FACILITY, String.valueOf(facility));
    headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
    if ((timeStamp != null) && timeStamp.length() > 0) {
      headers.put("timestamp", timeStamp);
    }
    if ((hostName != null) && (hostName.length() > 0)) {
      headers.put("host", hostName);
    }
    if (isBadEvent) {
      logger.warn("Event created from Invalid Syslog data.");
      headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
    } else if (isIncompleteEvent) {
      logger.warn("Event size larger than specified event size: {}. You should " +
          "consider increasing your event size.", maxSize);
      headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
    }

    if ((msgBody != null) && (msgBody.length() > 0)) {
      body = msgBody.getBytes();
    } else {
      body = baos.toByteArray();
    }
    reset();
    return EventBuilder.withBody(body, headers);
  }

  /**
   * Applies the header formats in order and records timestamp, host and body
   * from the first one whose pattern matches the whole accumulated text.
   */
  private void formatHeaders() {
    String eventStr = baos.toString();
    for (SyslogFormatter fmt : formats) {
      Matcher matcher = fmt.regexPattern.matcher(eventStr);
      if (!matcher.matches()) {
        continue;
      }
      MatchResult res = matcher.toMatchResult();
      for (int grp = 1; grp <= res.groupCount(); grp++) {
        String value = res.group(grp);
        if (grp == SYSLOG_TIMESTAMP_POS) {
          // apply available format replacements to timestamp
          if (value != null) {
            for (int sp = 0; sp < fmt.searchPattern.size(); sp++) {
              value = value.replaceAll(fmt.searchPattern.get(sp), fmt.replacePattern.get(sp));
            }
            // Add year to timestamp if needed
            if (fmt.addYear) {
              value = String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + value;
            }
            // Try each date format; the first that parses wins.
            for (int dt = 0; dt < fmt.dateFormat.size(); dt++) {
              try {
                timeStamp = String.valueOf(fmt.dateFormat.get(dt).parse(value).getTime());
                break;
              } catch (ParseException e) {
                // Not this format; try the next one.
                continue;
              }
            }
          }
        } else if (grp == SYSLOG_HOSTNAME_POS) {
          hostName = value;
        } else if (grp == SYSLOG_BODY_POS) {
          msgBody = value;
        }
      }
      break; // we successfully parsed the message using this pattern
    }
  }

  /** Clears all per-event state so the next byte starts a new event. */
  private void reset() {
    baos.reset();
    m = Mode.START;
    prio.delete(0, prio.length());
    isBadEvent = false;
    isIncompleteEvent = false;
    hostName = null;
    timeStamp = null;
    msgBody = null;
  }

  /**
   * Consumes bytes from {@code in} until one event is complete or the buffer
   * is exhausted.
   *
   * @param in bytes received from the network; read position is advanced
   * @return the completed event, or {@code null} if more input is needed
   *     (or, for flushing modes, if nothing was received)
   */
  public Event extractEvent(ChannelBuffer in) {
    Event e = null;
    boolean doneReading = false;

    try {
      while (!doneReading && in.readable()) {
        byte b = in.readByte();
        switch (m) {
          case START:
            if (b == '<') {
              m = Mode.PRIO;
            } else if (b == '\n') {
              //If the character is \n, it was because the last event was exactly
              //as long as the maximum size allowed and
              //the only remaining character was the delimiter - '\n', or
              //multiple delimiters were sent in a row.
              //Just ignore it, and move forward, don't change the mode.
              logger.debug("Delimiter found while in START mode, ignoring..");
            } else {
              isBadEvent = true;
              baos.write(b);
              //Bad event, just dump everything as if it is data.
              m = Mode.DATA;
            }
            break;
          case PRIO:
            if (b == '>') {
              m = Mode.DATA;
            } else {
              char ch = (char) b;
              prio.append(ch);
              if (!Character.isDigit(ch)) {
                isBadEvent = true;
                //Append the priority to baos:
                String badPrio = "<" + prio;
                baos.write(badPrio.getBytes());
                //If we hit a bad priority, just write as if everything is data.
                m = Mode.DATA;
              }
            }
            break;
          case DATA:
            // '\n' ends an event unless multi-line bodies are enabled, in
            // which case it is kept as part of the body.
            if (b == '\n' && !isSyslogMultiLineBody) {
              e = buildEvent();
              doneReading = true;
            } else {
              baos.write(b);
            }
            // '>=' rather than '==': a bad priority is dumped into baos in one
            // write and can jump past maxSize, which '==' would never catch.
            if (!doneReading && baos.size() >= this.maxSize) {
              isIncompleteEvent = true;
              e = buildEvent();
              doneReading = true;
            }
            break;
        }
      }

      // UDP datagrams carry no trailing newline, and multi-line bodies never
      // see a delimiter, so emit whatever was accumulated. In START mode
      // nothing was read for this event, so there is nothing to emit.
      if (e == null && (isUdp || isSyslogMultiLineBody) && m != Mode.START) {
        e = buildEvent();
      }
    } catch (IOException ignored) {
      // ByteArrayOutputStream.write(byte[]) is declared to throw IOException
      // but never does.
    }
    return e;
  }

  public Integer getEventSize() {
    return maxSize;
  }

  /**
   * Sets the maximum event size. Values below {@link #MIN_SIZE} are raised to
   * {@code MIN_SIZE}, as in the constructor.
   */
  public void setEventSize(Integer eventSize) {
    this.maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize;
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source;
import org.apache.flume.Event;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.Assert;
import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Map;
public class TestSyslogUtils {
/** RFC 5424-style line without a version field, zone written as +hh:mm. */
@Test
public void TestHeader0() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "some msg";
  final String message = "<10>" + timestamp + "+08:00" + " " + hostname + " " + payload + "\n";
  // The parser rewrites "+08:00" to "+0800" so SimpleDateFormat's 'Z' can read it.
  checkHeader(message, timestamp + "+0800", "yyyy-MM-dd'T'HH:mm:ssZ", hostname, payload);
}
/** RFC 5424 line with version "1" and a timestamp carrying no zone. */
@Test
public void TestHeader1() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + " " + hostname + " " + payload + "\n";
  checkHeader(message, timestamp, "yyyy-MM-dd'T'HH:mm:ss", hostname, payload);
}
/** RFC 5424 timestamp suffixed with 'Z', which the parser maps to "+0000" (UTC). */
@Test
public void TestHeader2() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + "Z" + " " + hostname + " " + payload + "\n";
  checkHeader(message, timestamp + "+0000", "yyyy-MM-dd'T'HH:mm:ssZ", hostname, payload);
}
/** RFC 5424 line with version "1" and a +hh:mm zone offset. */
@Test
public void TestHeader3() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + "+08:00" + " " + hostname + " " + payload + "\n";
  checkHeader(message, timestamp + "+0800", "yyyy-MM-dd'T'HH:mm:ssZ", hostname, payload);
}
/** A "-" (nil) timestamp must produce no timestamp header. */
@Test
public void TestHeader4() throws ParseException {
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "some msg";
  final String message = "<10>1 - " + hostname + " " + payload + "\n";
  checkHeader(message, null, null, hostname, payload);
}
/** A "-" (nil) host name must produce no host header. */
@Test
public void TestHeader5() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + " - " + payload + "\n";
  checkHeader(message, timestamp, "yyyy-MM-dd'T'HH:mm:ss", null, payload);
}
/** Nil host combined with a 'Z' (UTC) timestamp. */
@Test
public void TestHeader6() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + "Z" + " - " + payload + "\n";
  checkHeader(message, timestamp + "+0000", "yyyy-MM-dd'T'HH:mm:ssZ", null, payload);
}
/** Nil host combined with a +hh:mm zone offset. */
@Test
public void TestHeader7() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + "+08:00" + " - " + payload + "\n";
  checkHeader(message, timestamp + "+0800", "yyyy-MM-dd'T'HH:mm:ssZ", null, payload);
}
/** RFC 5424 timestamp with fractional seconds. */
@Test
public void TestHeader8() throws ParseException {
  final String timestamp = "2012-04-13T11:11:11.999";
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "some msg";
  final String message = "<10>1 " + timestamp + " " + hostname + " " + payload + "\n";
  checkHeader(message, timestamp, "yyyy-MM-dd'T'HH:mm:ss.S", hostname, payload);
}
/**
 * RFC 3164 line ("MMM d HH:mm:ss", two-digit day). The stamp carries no year,
 * so the parser prepends the current year and the expected value is built the
 * same way.
 */
@Test
public void TestHeader9() throws ParseException {
  String stamp1 = "Apr 11 13:14:04";
  String format1 = "yyyyMMM d HH:mm:ss";
  String host1 = "ubuntu-11.cloudera.com";
  String data1 = "some msg";
  // RFC 3164 timestamp: no year and no zone information.
  String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n";
  checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1,
      format1, host1, data1);
}
/**
 * RFC 3164 line with a single-digit day. The stamp carries no year, so the
 * parser prepends the current year and the expected value is built the same
 * way.
 */
@Test
public void TestHeader10() throws ParseException {
  String stamp1 = "Apr 1 13:14:04";
  String format1 = "yyyyMMM d HH:mm:ss";
  String host1 = "ubuntu-11.cloudera.com";
  String data1 = "some msg";
  // RFC 3164 timestamp: no year and no zone information.
  String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n";
  checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1,
      format1, host1, data1);
}
/**
 * RFC 3164 line whose body starts with "- " (as in an Apache access log);
 * the body must come through unchanged with RFC 3164 headers extracted.
 */
@Test
public void TestRfc3164HeaderApacheLogWithNulls() throws ParseException {
  final String timestamp = "Apr 1 13:14:04";
  final String hostname = "ubuntu-11.cloudera.com";
  final String payload = "- hyphen_null_breaks_5424_pattern [07/Jun/2012:14:46:44 -0600]";
  final String message = "<10>" + timestamp + " " + hostname + " " + payload + "\n";
  final String currentYear = String.valueOf(Calendar.getInstance().get(Calendar.YEAR));
  checkHeader(message, currentYear + timestamp, "yyyyMMM d HH:mm:ss", hostname, payload);
}
/**
 * Feeds {@code msg1} to a fresh {@link SyslogUtils} and verifies the extracted event.
 *
 * @param msg1 raw syslog text written into the input buffer
 * @param stamp1 expected timestamp text, or {@code null} if no "timestamp" header is expected
 * @param format1 {@link SimpleDateFormat} pattern used to convert {@code stamp1} to epoch millis
 * @param host1 expected "host" header value, or {@code null} if no "host" header is expected
 * @param data1 expected event body
 * @throws ParseException if {@code stamp1} does not match {@code format1}
 */
public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException {
  SyslogUtils util = new SyslogUtils(false);
  byte[] raw = msg1.getBytes();
  // A fixed-capacity buffer rejects writes past its capacity, so size it to fit the message.
  ChannelBuffer buff = ChannelBuffers.buffer(Math.max(200, raw.length));
  buff.writeBytes(raw);
  Event e = util.extractEvent(buff);
  Assert.assertNotNull("Event is null", e);
  Map<String, String> headers2 = e.getHeaders();
  if (stamp1 == null) {
    Assert.assertFalse(headers2.containsKey("timestamp"));
  } else {
    SimpleDateFormat formater = new SimpleDateFormat(format1);
    Assert.assertEquals(String.valueOf(formater.parse(stamp1).getTime()), headers2.get("timestamp"));
  }
  if (host1 == null) {
    Assert.assertFalse(headers2.containsKey("host"));
  } else {
    // JUnit's argument order is (expected, actual).
    Assert.assertEquals(host1, headers2.get("host"));
  }
  Assert.assertEquals(data1, new String(e.getBody()));
}
/**
 * Bad event format 1: the priority between '<' and '>' is not numeric.
 * The event is expected to come back INVALID, with facility and severity "0"
 * and the original text as body.
 */
@Test
public void testExtractBadEvent1() {
  final String badData1 = "<10F> bad bad data\n";
  final SyslogUtils parser = new SyslogUtils(false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  input.writeBytes(badData1.getBytes());

  final Event parsed = parser.extractEvent(input);
  if (parsed == null) {
    throw new NullPointerException("Event is null");
  }

  final Map<String, String> hdrs = parsed.getHeaders();
  final String invalid = SyslogUtils.SyslogStatus.INVALID.getSyslogStatus();
  Assert.assertEquals("0", hdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", hdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(invalid, hdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(badData1.trim(), new String(parsed.getBody()).trim());
}
/**
 * Bad event format 2: the text does not begin with '<', even though a
 * well-formed "<10>" appears later. The whole line is expected back as a
 * single INVALID event with facility and severity "0".
 */
@Test
public void testExtractBadEvent2() {
  final String badData1 = "hi guys! <10> bad bad data\n";
  final SyslogUtils parser = new SyslogUtils(false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  input.writeBytes(badData1.getBytes());

  final Event parsed = parser.extractEvent(input);
  if (parsed == null) {
    throw new NullPointerException("Event is null");
  }

  final Map<String, String> hdrs = parsed.getHeaders();
  final String invalid = SyslogUtils.SyslogStatus.INVALID.getSyslogStatus();
  Assert.assertEquals("0", hdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", hdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(invalid, hdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(badData1.trim(), new String(parsed.getBody()).trim());
}
/**
 * A well-formed event: PRI 10 is expected to decode to facility 1 (10 / 8)
 * and severity 2 (10 % 8), with no status header set.
 */
@Test
public void testExtractGoodEvent() {
  final String priority = "<10>";
  final String goodData1 = "Good good good data\n";
  final SyslogUtils parser = new SyslogUtils(false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  final String wire = priority + goodData1;
  input.writeBytes(wire.getBytes());

  final Event parsed = parser.extractEvent(input);
  if (parsed == null) {
    throw new NullPointerException("Event is null");
  }

  final Map<String, String> hdrs = parsed.getHeaders();
  Assert.assertEquals("1", hdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("2", hdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(null, hdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(goodData1.trim(), new String(parsed.getBody()).trim());
}
/**
 * A bad event immediately followed by a good one in the same buffer.
 * Two successive extractions are expected to yield an INVALID event and
 * then a correctly decoded one.
 */
@Test
public void testBadEventGoodEvent(){
  final String badData1 = "hi guys! <10F> bad bad data\n";
  final SyslogUtils parser = new SyslogUtils(false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  input.writeBytes(badData1.getBytes());
  final String priority = "<10>";
  final String goodData1 = "Good good good data\n";
  input.writeBytes((priority + goodData1).getBytes());

  // First extraction: the malformed line.
  final Event bad = parser.extractEvent(input);
  if (bad == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> badHdrs = bad.getHeaders();
  final String invalid = SyslogUtils.SyslogStatus.INVALID.getSyslogStatus();
  Assert.assertEquals("0", badHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", badHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(invalid, badHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(badData1.trim(), new String(bad.getBody()).trim());

  // Second extraction: the well-formed line with PRI 10.
  final Event good = parser.extractEvent(input);
  if (good == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> goodHdrs = good.getHeaders();
  Assert.assertEquals("1", goodHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("2", goodHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(null, goodHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(goodData1.trim(), new String(good.getBody()).trim());
}
/**
 * A good event followed by a bad one in the same buffer; extractions are
 * expected to return them in that order.
 */
@Test
public void testGoodEventBadEvent(){
  final String badData1 = "hi guys! <10F> bad bad data\n";
  final String priority = "<10>";
  final String goodData1 = "Good good good data\n";
  final SyslogUtils parser = new SyslogUtils(false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  input.writeBytes((priority + goodData1).getBytes());
  input.writeBytes(badData1.getBytes());

  // First extraction: the well-formed line with PRI 10.
  final Event good = parser.extractEvent(input);
  if (good == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> goodHdrs = good.getHeaders();
  Assert.assertEquals("1", goodHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("2", goodHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(null, goodHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(goodData1.trim(), new String(good.getBody()).trim());

  // Second extraction: the malformed line.
  final Event bad = parser.extractEvent(input);
  if (bad == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> badHdrs = bad.getHeaders();
  final String invalid = SyslogUtils.SyslogStatus.INVALID.getSyslogStatus();
  Assert.assertEquals("0", badHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", badHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(invalid, badHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(badData1.trim(), new String(bad.getBody()).trim());
}
/**
 * Two malformed lines back to back; each is expected to come out as its
 * own INVALID event with facility and severity "0".
 */
@Test
public void testBadEventBadEvent(){
  final String badData1 = "hi guys! <10F> bad bad data\n";
  final SyslogUtils parser = new SyslogUtils(false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  input.writeBytes(badData1.getBytes());
  final String badData2 = "hi guys! <20> bad bad data\n";
  input.writeBytes(badData2.getBytes());

  final Event first = parser.extractEvent(input);
  if (first == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> firstHdrs = first.getHeaders();
  Assert.assertEquals("0", firstHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", firstHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
      firstHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(badData1.trim(), new String(first.getBody()).trim());

  final Event second = parser.extractEvent(input);
  if (second == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> secondHdrs = second.getHeaders();
  Assert.assertEquals("0", secondHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", secondHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
      secondHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(badData2.trim(), new String(second.getBody()).trim());
}
/**
 * Two well-formed events back to back in one buffer.
 * PRI 10 is expected to decode to facility 1 / severity 2, and PRI 20 to
 * facility 2 / severity 4. Neither event should carry a status header.
 */
@Test
public void testGoodEventGoodEvent() {
  String priority = "<10>";
  String goodData1 = "Good good good data\n";
  SyslogUtils util = new SyslogUtils(false);
  ChannelBuffer buff = ChannelBuffers.buffer(100);
  buff.writeBytes((priority+goodData1).getBytes());
  String priority2 = "<20>";
  String goodData2 = "Good really good data\n";
  buff.writeBytes((priority2+goodData2).getBytes());
  Event e = util.extractEvent(buff);
  Assert.assertNotNull("Event is null", e);
  Map<String, String> headers = e.getHeaders();
  Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertNull(headers.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim());
  Event e2 = util.extractEvent(buff);
  Assert.assertNotNull("Event is null", e2);
  Map<String, String> headers2 = e2.getHeaders();
  Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("4", headers2.get(SyslogUtils.SYSLOG_SEVERITY));
  // Check the SECOND event's status; the original re-checked the first event's headers.
  Assert.assertNull(headers2.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals(goodData2.trim(), new String(e2.getBody()).trim());
}
/**
 * A message longer than the parser's configured limit.
 * The first extraction is expected to be marked INCOMPLETE and keep the
 * decoded PRI (facility 1 / severity 2). The leftover bytes are expected to
 * come back as a second, INVALID event.
 */
@Test
public void testExtractBadEventLarge() {
  final String badData1 = "<10> bad bad data bad bad\n";
  // NOTE(review): the first constructor argument (5) presumably caps the
  // event size; confirm against the SyslogUtils constructor.
  final SyslogUtils parser = new SyslogUtils(5, false, false);
  final ChannelBuffer input = ChannelBuffers.buffer(100);
  input.writeBytes(badData1.getBytes());

  final Event head = parser.extractEvent(input);
  if (head == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> headHdrs = head.getHeaders();
  Assert.assertEquals("1", headHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("2", headHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus(),
      headHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals("bad bad d", new String(head.getBody()).trim());

  final Event tail = parser.extractEvent(input);
  if (tail == null) {
    throw new NullPointerException("Event is null");
  }
  final Map<String, String> tailHdrs = tail.getHeaders();
  Assert.assertEquals("0", tailHdrs.get(SyslogUtils.SYSLOG_FACILITY));
  Assert.assertEquals("0", tailHdrs.get(SyslogUtils.SYSLOG_SEVERITY));
  Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(),
      tailHdrs.get(SyslogUtils.EVENT_STATUS));
  Assert.assertEquals("ata bad ba", new String(tail.getBody()).trim());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment