Flume
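#
# This gist holds two independent agent1 configurations; run each from its
# own properties file (Flume keeps only the last value of a duplicated key,
# so concatenating them would clobber the channel/sink declarations).
#
# --- Configuration 1: Kafka channel -> load-balanced HDFS sinks ---
# No source is declared: with a Kafka channel, the sinks consume directly
# from the Kafka topic, so a separate source stage is unnecessary.
#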
agent1.channels = raw-channel
agent1.sinks = raw-sink1 raw-sink2
agent1.sinkgroups = g1
agent1.channels.raw-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.raw-channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.raw-channel.kafka.topic = <topic>
agent1.channels.raw-channel.kafka.consumer.group.id = <group id>
agent1.channels.raw-channel.kafka.consumer.fetch.min.bytes = 1000000
agent1.channels.raw-channel.kafka.consumer.fetch.max.wait.ms = 1000
agent1.channels.raw-channel.kafka.consumer.max.poll.records = 18000
agent1.channels.raw-channel.kafka.consumer.session.timeout.ms = 40000
agent1.channels.raw-channel.parseAsFlumeEvent = false
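#
# parseAsFlumeEvent = false makes the channel treat each Kafka record as a
# plain byte payload rather than an Avro-serialized Flume event, so the
# topic can be written by ordinary (non-Flume) Kafka producers.
#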
agent1.sinkgroups.g1.sinks = raw-sink1 raw-sink2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.selector = round_robin
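#
# The sink group round-robins batches across the two HDFS sinks, which both
# drain raw-channel, so HDFS writes proceed in parallel.
#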
agent1.sinks.raw-sink1.type = hdfs
agent1.sinks.raw-sink1.hdfs.path = <path to folder>/partition_date=%Y%m%d%H/minute=%M
agent1.sinks.raw-sink1.hdfs.rollInterval = 60
agent1.sinks.raw-sink1.hdfs.rollSize = 202400000
agent1.sinks.raw-sink1.hdfs.rollCount = 0
agent1.sinks.raw-sink1.hdfs.round = true
agent1.sinks.raw-sink1.hdfs.roundValue = 10
agent1.sinks.raw-sink1.hdfs.roundUnit = minute
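#
# Rounding example: with roundValue = 10 / roundUnit = minute, an event
# timestamped 10:37 resolves %M to 30, i.e. .../minute=30. Files roll every
# 60 s or at ~202 MB, whichever comes first (rollCount = 0 disables
# count-based rolling).
#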
agent1.sinks.raw-sink1.hdfs.fileType = CompressedStream
agent1.sinks.raw-sink1.hdfs.codeC = lz4
agent1.sinks.raw-sink1.hdfs.writeFormat = Text
agent1.sinks.raw-sink1.hdfs.batchSize = 6000
agent1.sinks.raw-sink1.hdfs.idleTimeout = 1800
#
# `POD_UUID` lets Flume scale out when run under Kubernetes, Docker Swarm,
# etc.: each agent instance gets a unique HDFS file prefix, so concurrent
# writers do not collide. Export this variable in `flume-env.sh`:
#
# $ export POD_UUID=$(uuidgen)
#
# Then add `-DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties` to `JAVA_OPTS` in `flume-env.sh`:
#
# export JAVA_OPTS="${FLUME_OPTS} -XshowSettings:vm -Dorg.apache.flume.log.printconfig=true -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"
#
agent1.sinks.raw-sink1.hdfs.filePrefix = flumeData_sink1_${POD_UUID}_%y%m%d_%H%M
agent1.sinks.raw-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.raw-sink1.hdfs.threadsPoolSize = 20
agent1.sinks.raw-sink1.hdfs.retryInterval = 90
agent1.sinks.raw-sink1.channel = raw-channel
agent1.sinks.raw-sink2.type = hdfs
agent1.sinks.raw-sink2.hdfs.path = <path to folder>/partition_date=%Y%m%d%H/minute=%M
agent1.sinks.raw-sink2.hdfs.rollInterval = 60
agent1.sinks.raw-sink2.hdfs.rollSize = 202400000
agent1.sinks.raw-sink2.hdfs.rollCount = 0
agent1.sinks.raw-sink2.hdfs.round = true
agent1.sinks.raw-sink2.hdfs.roundValue = 10
agent1.sinks.raw-sink2.hdfs.roundUnit = minute
agent1.sinks.raw-sink2.hdfs.fileType = CompressedStream
agent1.sinks.raw-sink2.hdfs.codeC = lz4
agent1.sinks.raw-sink2.hdfs.writeFormat = Text
agent1.sinks.raw-sink2.hdfs.batchSize = 6000
agent1.sinks.raw-sink2.hdfs.idleTimeout = 1800
#
# `POD_UUID` is resolved from the environment; see the note above
# raw-sink1's filePrefix for the required flume-env.sh setup.
#
agent1.sinks.raw-sink2.hdfs.filePrefix = flumeData_sink2_${POD_UUID}_%y%m%d_%H%M
agent1.sinks.raw-sink2.hdfs.useLocalTimeStamp = true
agent1.sinks.raw-sink2.hdfs.threadsPoolSize = 20
agent1.sinks.raw-sink2.hdfs.retryInterval = 90
agent1.sinks.raw-sink2.channel = raw-channel
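#
# --- Configuration 2: Kafka source multiplexing into two Kafka channels ---
# A regex interceptor extracts a routing key from each event body, and the
# multiplexing selector fans events out to per-key channels, each drained by
# its own HDFS sink. The component declarations for this topology are at the
# end of the file.
#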
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent1.sources.kafka-source.kafka.topics = <topic>
agent1.sources.kafka-source.kafka.consumer.group.id = <group id>
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 1000
agent1.sources.kafka-source.batchSize = 10000
agent1.sources.kafka-source.batchDurationMillis = 2000
agent1.sources.kafka-source.channels = first-channel second-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = regex_extractor
agent1.sources.kafka-source.interceptors.i1.regex = .json-key.:.(\\w+).
agent1.sources.kafka-source.interceptors.i1.serializers = s1
agent1.sources.kafka-source.interceptors.i1.serializers.s1.name = my_key
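#
# Example: for an event body like {"json-key":"foo"}, the regex captures
# "foo" and serializer s1 stores it in the event header `my_key`, which the
# selector below routes on.
#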
agent1.sources.kafka-source.selector.type = multiplexing
agent1.sources.kafka-source.selector.header = my_key
agent1.sources.kafka-source.selector.mapping.<first value> = first-channel
agent1.sources.kafka-source.selector.mapping.<second value> = second-channel
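#
# Note: a multiplexing selector drops events whose `my_key` matches neither
# mapping; set agent1.sources.kafka-source.selector.default = <channel> to
# route unmatched events somewhere instead.
#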
agent1.channels.first-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.first-channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.first-channel.kafka.consumer.group.id = <group id>
agent1.channels.first-channel.kafka.topic = <topic>_first_channel
agent1.channels.second-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.second-channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.second-channel.kafka.consumer.group.id = <group id>
agent1.channels.second-channel.kafka.topic = <topic>_second_channel
agent1.sinks.first-sink.type = hdfs
agent1.sinks.first-sink.hdfs.path = <path to folder>/partition_date=%Y%m%d%H
agent1.sinks.first-sink.hdfs.rollInterval = 600
agent1.sinks.first-sink.hdfs.rollSize = 102400000
agent1.sinks.first-sink.hdfs.rollCount = 0
agent1.sinks.first-sink.hdfs.round = true
agent1.sinks.first-sink.hdfs.roundValue = 1
agent1.sinks.first-sink.hdfs.roundUnit = hour
agent1.sinks.first-sink.hdfs.fileType = CompressedStream
agent1.sinks.first-sink.hdfs.codeC = lz4
agent1.sinks.first-sink.hdfs.writeFormat = Text
agent1.sinks.first-sink.hdfs.batchSize = 1000
agent1.sinks.first-sink.hdfs.idleTimeout = 1800
#
# `POD_UUID` is resolved from the environment; see the note above
# raw-sink1's filePrefix in the first configuration.
#
agent1.sinks.first-sink.hdfs.filePrefix = flumeData_${POD_UUID}_%y%m%d_%H%M
# The sink binds to the Flume channel name, not the underlying Kafka topic.
agent1.sinks.first-sink.channel = first-channel
agent1.sinks.second-sink.type = hdfs
agent1.sinks.second-sink.hdfs.path = <path to folder>/partition_date=%Y%m%d%H
agent1.sinks.second-sink.hdfs.rollInterval = 600
agent1.sinks.second-sink.hdfs.rollSize = 102400000
agent1.sinks.second-sink.hdfs.rollCount = 0
agent1.sinks.second-sink.hdfs.round = true
agent1.sinks.second-sink.hdfs.roundValue = 1
agent1.sinks.second-sink.hdfs.roundUnit = hour
agent1.sinks.second-sink.hdfs.fileType = CompressedStream
agent1.sinks.second-sink.hdfs.codeC = lz4
agent1.sinks.second-sink.hdfs.writeFormat = Text
agent1.sinks.second-sink.hdfs.batchSize = 1000
agent1.sinks.second-sink.hdfs.idleTimeout = 1800
agent1.sinks.second-sink.hdfs.filePrefix = flumeData_${POD_UUID}_%y%m%d_%H%M
agent1.sinks.second-sink.channel = second-channel
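#
# Component declarations for this topology (Flume reads these regardless of
# where they appear in the file).
#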
agent1.sources = kafka-source
agent1.channels = first-channel second-channel
agent1.sinks = first-sink second-sink
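#
# Typical launch command (adjust paths to your installation):
#
# $ bin/flume-ng agent --conf conf --conf-file <config file> --name agent1 \
#     -Dflume.root.logger=INFO,console
#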