Skip to content

Instantly share code, notes, and snippets.

@crowdmatt
Last active September 6, 2019 02:26
  • Star 6 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save crowdmatt/5256881 to your computer and use it in GitHub Desktop.

Setting up Flume NG, listening to syslog over UDP, with an S3 Sink

My goal was to set up Flume on my web instances, and write all events into s3, so I could easily use other tools like Amazon Elastic Map Reduce, and Amazon Red Shift.

I didn't want to have to deal with log rotation myself, so I setup Flume to read from a syslog UDP source. In this case, Flume NG acts as a syslog server, so as long as Flume is running, my web application can simply write to it in syslog format on the specified port. Most languages have plugins for this.

At the time of this writing, I've been able to get Flume NG up and running on 3 ec2 instances, and all writing to the same bucket.

Install Flume NG on instances

Flume NG is different than Flume OG (original generation) in that it doesn't use a master in any way. So if you see any articles referring to collectors or masters, you're probably looking at Flume OG documentations or guides.

First, go to http://flume.apache.org/download.html and find the link to the latest .tar.gz version. At the time of writing, it's 1.3.1 and the URL to download is http://apache.mirrors.tds.net/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz

SSH into your ec2 instances:

ssh -i ~/.ssh/XXXXXX.pem ec2-user@ec2-NN-NNN-NN-NNN.compute-1.amazonaws.com

On your ec2 instances, wget the .tar.gz, like so:

cd ~
wget http://apache.mirrors.tds.net/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz

Once it's done, untar it, like so:

cd ~
tar xvzf apache-flume-1.3.1-bin.tar.gz
rm apache-flume-1.3.1-bin.tar.gz

To prepare for configuration, let's copy the template flume.conf:

cp ~/apache-flume-1.3.1-bin/conf/flume-conf.properties.template ~/apache-flume-1.3.1-bin/conf/flume.conf

Install Hadoop Core & Commons & Jets3t jars

I ran into an issue which generated the error below. This seems to be caused by the hadoop jar not being included in Flume. So, download the hadoop binary as follows (look for the most recent version, which is 1.0.4 at the time of writing):

cd ~
wget http://mirror.symnds.com/software/Apache/hadoop/common/hadoop-1.0.4/hadoop-1.0.4-bin.tar.gz
tar xvzf hadoop-1.0.4-bin.tar.gz
rm hadoop-1.0.4-bin.tar.gz
cp ~/hadoop-1.0.4/hadoop-core-1.0.4.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/commons-configuration-1.6.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/commons-httpclient-3.0.1.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/jets3t-0.6.1.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/commons-codec-1.4.jar ~/apache-flume-1.3.1-bin/lib/

Configure Flume NG on instances

Let's change the config file to write to S3:

vim ~/apache-flume-1.3.1-bin/conf/flume.conf

Replace the existing contents with the contents below.

Note: I originally thought about recoverable memory channels, but flume documentation has this warning: The Recoverable Memory Channel has been deprecated in favor of the FileChannel. FileChannel is durable channel and performs better than the Recoverable Memory Channel.

Another issue I ran into was that my secret key had a / in it, which was causing issues. I generated a new keypair on amazon https://portal.aws.amazon.com/gp/aws/securityCredentials which didn't have any characters with issues, and it worked without a hitch.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = fileChannel
agent.sinks = s3Sink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = syslogudp
agent.sources.seqGenSrc.port = 5140
agent.sources.seqGenSrc.host = localhost

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = fileChannel

# Each sink's type must be defined
agent.sinks.s3Sink.type = hdfs

#Specify the channel the sink should use
agent.sinks.s3Sink.channel = fileChannel
agent.sinks.s3Sink.hdfs.path = s3://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@BUCKET-NAME/

# Each channel's type is defined.
agent.channels.fileChannel.type = file

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the file channel which can depend on the size of your filesystem
agent.channels.fileChannel.capacity = 1000000

Testing your Configuration

To run flume with the configuration you just setup, run:

cd ~/apache-flume-1.3.1-bin
bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent

Let's trying writing to the UDP syslog on port 5140, and see if they end up in our s3 bucket! To do so, we'll run netcat since logger can't write to different ports.

The syslog implementation in Hadoop expects a format as follows: <14>Mmm DD HH:MM:SS ip-NN-NNN-NNN-NNN YOUR MESSAGE HERE", otherwise it'll get confused. Using that format, we run this (but please replace the timestamp):

nc -w0 -u localhost 5140 <<< "<14>Mar 29 19:25:55 ip-10-204-131-184 Test Message 1"

In a few seconds (give the buffering a chance ;-), you'll be able to see some files show up in your s3 bucket. Go to https://console.aws.amazon.com/s3/home?region=us-east-1 and see them!

The filenames will be in the format: FlumeData.NNNNNNNNNNNN, and encoded in text format, since we didn't compress them in any way in the conf file. The resulting text in it should read as the same message after your IP address, with a newline:

Test Message 1

You can configure all your instances the same way, and try netcating on all of them. You should see all of the writes show up to your s3 bucket, as I have been able to.

Setting up Flume NG init.d for agent on instances

I don't want to have to login and start Flume NG on all my instances manually, of course.

I didn't see anything in the binary tarball for this, so I am going to start with (and revise) the init.d code from here: https://git-wip-us.apache.org/repos/asf?p=bigtop.git;a=blob;f=bigtop-packages/src/common/flume/flume-agent.init;hb=HEAD

My revision is posted at https://github.com/crowdmob/flume-1.3.1-agent.init/blob/master/flume-agent.init

First, let's put that in a script in init.d:

sudo touch /etc/init.d/flume-agent
sudo chmod a+x /etc/init.d/flume-agent
sudo vim /etc/init.d/flume-agent

Simply copy and paste the contents from https://raw.github.com/crowdmob/flume-1.3.1-agent.init/master/flume-agent.init

The next thing you have to do is to setup the directories for runtime. I simply did the following:

sudo mkdir /var/run/flume
sudo mkdir /var/log/flume

Finally, run chkconfig to make sure it starts on boot:

sudo chkconfig flume-agent on

Appendix A: Errors I ran into (for Google Searchers):

Here's the error that was generated, for completeness:

Fixed by including apache commons lib:

2013-03-29 16:53:01,756 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:207)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
  at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:214)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:373)
	at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:223)
	at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:123)
	at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
	at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:202)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
	at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
  ... 15 more

Fixed by including the jets3t http libraries:

java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpHead
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:54)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:616)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
	at org.apache.hadoop.fs.s3native.$Proxy1.initialize(Unknown Source)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:207)
	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
	at java.util.concurrent.FutureTask.run(FutureTask.java:166)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.apache.http.client.methods.HttpHead
	at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
	... 28 more
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpHead
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:54)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:616)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
	at org.apache.hadoop.fs.s3native.$Proxy1.initialize(Unknown Source)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:207)
	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
	at java.util.concurrent.FutureTask.run(FutureTask.java:166)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.apache.http.client.methods.HttpHead
	at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
	... 28 more
@oeegee
Copy link

oeegee commented Jul 26, 2013

Hi~ matt!
How can I logging hdfs, jets3t ?

@soniclavier
Copy link

Really helpful :)
was struggling with this for 2 days. your post finally helped me write contents to hdfs using flume :)

cheers :)

@overcomer
Copy link

Hi Matt,

the link https://github.com/crowdmob/flume-1.3.1-agent.init/blob/master/flume-agent.init is no longer available. Is there any other link where I can see it?

Thanks.

@crowdmatt
Copy link
Author

Actually I abandoned flume for Kafka to be honest :(

@joshjiang
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment