@priancho
Last active October 30, 2015 06:26
flume.conf with a custom Twitter source (for the user stream), a file channel, and an HDFS sink
agent.sources = s1
agent.sinks = k1
agent.channels = c1
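# Note: "agent" at the start of every property name is the agent name. A sketch
# of a start command, assuming this file is saved as flume.conf and the flume-ng
# launcher is on the PATH (the --conf directory is an assumption; adjust it to
# your installation):
# flume-ng agent --conf /etc/flume-ng/conf --conf-file flume.conf --name agent -Dflume.root.logger=INFO,console
# The --name value must match the property prefix; otherwise Flume finds no
# configuration for that agent and starts no sources, channels, or sinks.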
# The proxy options below require the FLUME-2618 patch.
agent.sources.s1.http.proxyHost=<http proxy ip address>
agent.sources.s1.http.proxyPort=<http proxy port>
# TwitterUserStreamToJSONSource is a custom source class; its Java source is here:
# https://github.com/priancho/flume/blob/trunk/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterUserStreamToJSONSource.java
#
# You can build a jar containing this custom source class with the following commands:
# user@server ~/tmp/]$
# git clone git@github.com:priancho/flume.git
# cd flume/flume-ng-sources/flume-twitter-source
# user@server ~/tmp/flume/flume-ng-sources/flume-twitter-source]$
# mvn compile; mvn package
# ls target/*jar
# target/flume-twitter-source-1.7.0-SNAPSHOT.jar
# Then back up your original flume-twitter-source jar file and install the new one.
# user@server /usr/lib/flume-ng/lib]$
# mv flume-twitter-source-1.5.0-cdh5.4.3.jar flume-twitter-source-1.5.0-cdh5.4.3.jar.bak
# cp ~/tmp/flume/flume-ng-sources/flume-twitter-source/target/flume-twitter-source-1.7.0-SNAPSHOT.jar .
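# To check that the installed jar actually contains the custom source class
# (a sketch, assuming the build above succeeded and unzip is available), list
# its entries; the output should include
# org/apache/flume/source/twitter/TwitterUserStreamToJSONSource.class
# user@server /usr/lib/flume-ng/lib]$
# unzip -l flume-twitter-source-1.7.0-SNAPSHOT.jar | grep TwitterUserStreamToJSONSource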
#
agent.sources.s1.type = org.apache.flume.source.twitter.TwitterUserStreamToJSONSource
agent.sources.s1.channels = c1
agent.sources.s1.consumerKey = <your Twitter consumer key>
agent.sources.s1.consumerSecret = <your Twitter consumer secret>
agent.sources.s1.accessToken = <your Twitter access token>
agent.sources.s1.accessTokenSecret = <your Twitter token secret>
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.writeFormat = Text
agent.sinks.k1.hdfs.path = </path/to/data/dir/%Y/%m/%d>
agent.sinks.k1.hdfs.filePrefix = <prefix>
agent.sinks.k1.hdfs.fileSuffix = .json.bz2
agent.sinks.k1.hdfs.fileType = CompressedStream
agent.sinks.k1.hdfs.codeC = bzip2
agent.sinks.k1.hdfs.kerberosPrincipal = <my Kerberos principal name>
agent.sinks.k1.hdfs.kerberosKeytab = </path/to/my/kerberos/principal.keytab>
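# To read back a closed output file (the date directories and <number> below
# are hypothetical), first obtain a ticket with the same principal and keytab,
# then let "hdfs dfs -text" decompress the bzip2 data:
# kinit -kt </path/to/my/kerberos/principal.keytab> <my Kerberos principal name>
# hdfs dfs -text </path/to/data/dir>/2015/10/30/<prefix>.<number>.json.bz2 | head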
# Disable count-, time-, size-, and idle-based rolling. With maxOpenFiles = 1,
# the current file is closed only when the sink opens a file in a new
# %Y/%m/%d directory (or when the agent stops).
agent.sinks.k1.hdfs.maxOpenFiles = 1
agent.sinks.k1.hdfs.batchSize = 10
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.idleTimeout = 0
agent.sinks.k1.hdfs.useLocalTimeStamp = true
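# Worked example (hypothetical values, assuming hdfs.path = /data/twitter/%Y/%m/%d,
# hdfs.filePrefix = tweets, and the roll settings above applied to sink k1):
# - an event received on 2015-10-30 (agent's local time) is written to
#   /data/twitter/2015/10/30/tweets.<number>.json.bz2.tmp
# - the first event received on 2015-10-31 opens a file under /data/twitter/2015/10/31/;
#   because hdfs.maxOpenFiles = 1, the 2015/10/30 file is closed and its .tmp suffix is dropped.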
agent.channels.c1.type = file
# Use separate checkpoint and data directories for each agent; the file channel locks them, so agents cannot share them.
agent.channels.c1.checkpointDir = /path/to/<agent_name>/checkpoint
agent.channels.c1.dataDirs = /path/to/<agent_name>/data
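# For example, a second agent on the same host (the name "agent2" is hypothetical)
# would point its own file channel at its own directories:
# agent2.channels.c1.type = file
# agent2.channels.c1.checkpointDir = /path/to/agent2/checkpoint
# agent2.channels.c1.dataDirs = /path/to/agent2/data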