Last active
October 22, 2020 07:17
-
-
Save veysiertekin/33a245152b10fa4217b5b97172a6a9cd to your computer and use it in GitHub Desktop.
Flume
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ---------------------------------------------------------------------------
# Flume agent: Kafka-backed channel feeding two load-balanced HDFS sinks.
# No source is defined: the KafkaChannel consumes directly from <topic>.
# ---------------------------------------------------------------------------
agent1.channels = raw-channel
agent1.sinks = raw-sink1 raw-sink2
agent1.sinkgroups = g1

# Kafka channel: events are buffered in the Kafka topic until a sink drains them.
# Consistency fix: use the Flume 1.7+ `kafka.bootstrap.servers` key instead of
# the deprecated `brokerList` (the kafka.topic / kafka.consumer.* keys below
# already require Flume 1.7+).
agent1.channels.raw-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.raw-channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.raw-channel.kafka.topic = <topic>
agent1.channels.raw-channel.kafka.consumer.group.id = <group id>
agent1.channels.raw-channel.kafka.consumer.fetch.min.bytes = 1000000
agent1.channels.raw-channel.kafka.consumer.fetch.max.wait.ms = 1000
agent1.channels.raw-channel.kafka.consumer.max.poll.records = 18000
agent1.channels.raw-channel.kafka.consumer.session.timeout.ms = 40000
# Topic holds raw payloads, not Avro-serialized Flume events.
agent1.channels.raw-channel.parseAsFlumeEvent = false

# Round-robin load balancing across the two HDFS sinks.
agent1.sinkgroups.g1.sinks = raw-sink1 raw-sink2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.selector = round_robin

# --- sink 1 ---------------------------------------------------------------
agent1.sinks.raw-sink1.type = hdfs
agent1.sinks.raw-sink1.hdfs.path = <path to folder>/partition_date=%Y%m%d%H/minute=%M
# Roll every 60 s or ~200 MB, never by event count.
agent1.sinks.raw-sink1.hdfs.rollInterval = 60
agent1.sinks.raw-sink1.hdfs.rollSize = 202400000
agent1.sinks.raw-sink1.hdfs.rollCount = 0
# Round the timestamp used in the path down to 10-minute buckets.
agent1.sinks.raw-sink1.hdfs.round = true
agent1.sinks.raw-sink1.hdfs.roundValue = 10
agent1.sinks.raw-sink1.hdfs.roundUnit = minute
agent1.sinks.raw-sink1.hdfs.fileType = CompressedStream
agent1.sinks.raw-sink1.hdfs.codeC = lz4
agent1.sinks.raw-sink1.hdfs.writeFormat = Text
agent1.sinks.raw-sink1.hdfs.batchSize = 6000
# Close files idle for 30 min so abandoned .tmp files get finalized.
agent1.sinks.raw-sink1.hdfs.idleTimeout = 1800
#
# `POD_UUID` allows flume to scale when using with kubernetes, docker swarm etc.
# Please export this variable in `flume-env.sh`
#
# $ export POD_UUID=$(uuidgen)
#
# Then add `-DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties` to `JAVA_OPTS` in `flume-env.sh`
#
# export JAVA_OPTS="${FLUME_OPTS} -XshowSettings:vm -Dorg.apache.flume.log.printconfig=true -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"
#
agent1.sinks.raw-sink1.hdfs.filePrefix = flumeData_sink1_${POD_UUID}_%y%m%d_%H%M
# Use the agent's local clock for the %-escapes in path/filePrefix.
agent1.sinks.raw-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.raw-sink1.hdfs.threadsPoolSize = 20
agent1.sinks.raw-sink1.hdfs.retryInterval = 90
agent1.sinks.raw-sink1.channel = raw-channel

# --- sink 2 (identical settings; distinct filePrefix avoids collisions) ----
agent1.sinks.raw-sink2.type = hdfs
agent1.sinks.raw-sink2.hdfs.path = <path to folder>/partition_date=%Y%m%d%H/minute=%M
agent1.sinks.raw-sink2.hdfs.rollInterval = 60
agent1.sinks.raw-sink2.hdfs.rollSize = 202400000
agent1.sinks.raw-sink2.hdfs.rollCount = 0
agent1.sinks.raw-sink2.hdfs.round = true
agent1.sinks.raw-sink2.hdfs.roundValue = 10
agent1.sinks.raw-sink2.hdfs.roundUnit = minute
agent1.sinks.raw-sink2.hdfs.fileType = CompressedStream
agent1.sinks.raw-sink2.hdfs.codeC = lz4
agent1.sinks.raw-sink2.hdfs.writeFormat = Text
agent1.sinks.raw-sink2.hdfs.batchSize = 6000
agent1.sinks.raw-sink2.hdfs.idleTimeout = 1800
# See the POD_UUID note above sink1's filePrefix.
agent1.sinks.raw-sink2.hdfs.filePrefix = flumeData_sink2_${POD_UUID}_%y%m%d_%H%M
agent1.sinks.raw-sink2.hdfs.useLocalTimeStamp = true
agent1.sinks.raw-sink2.hdfs.threadsPoolSize = 20
agent1.sinks.raw-sink2.hdfs.retryInterval = 90
agent1.sinks.raw-sink2.channel = raw-channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ---------------------------------------------------------------------------
# Flume agent: one Kafka source multiplexing events into two Kafka channels
# by a header regex-extracted from the message body; each channel drains to
# its own HDFS sink.
# ---------------------------------------------------------------------------
agent1.sources = kafka-source
agent1.channels = first-channel second-channel
agent1.sinks = first-sink second-sink

# --- source ----------------------------------------------------------------
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent1.sources.kafka-source.kafka.topics = <topic>
agent1.sources.kafka-source.kafka.consumer.group.id = <group id>
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 1000
agent1.sources.kafka-source.batchSize = 10000
agent1.sources.kafka-source.batchDurationMillis = 2000
agent1.sources.kafka-source.channels = first-channel second-channel

# Extract the value of "json-key" from the (JSON) body into header `my_key`.
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = regex_extractor
agent1.sources.kafka-source.interceptors.i1.regex = .json-key.:.(\\w+).
agent1.sources.kafka-source.interceptors.i1.serializers = s1
agent1.sources.kafka-source.interceptors.i1.serializers.s1.name = my_key

# Route by the extracted header value.
agent1.sources.kafka-source.selector.type = multiplexing
agent1.sources.kafka-source.selector.header = my_key
agent1.sources.kafka-source.selector.mapping.<first value> = first-channel
agent1.sources.kafka-source.selector.mapping.<second value> = second-channel

# --- channels --------------------------------------------------------------
# Consistency fix: the source above uses Flume 1.7+ `kafka.*` keys, so use the
# same new-style keys here instead of the deprecated brokerList/groupId/topic.
agent1.channels.first-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.first-channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.first-channel.kafka.consumer.group.id = <group id>
agent1.channels.first-channel.kafka.topic = <topic>_first_channel

agent1.channels.second-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.second-channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.second-channel.kafka.consumer.group.id = <group id>
agent1.channels.second-channel.kafka.topic = <topic>_second_channel

# --- sinks -----------------------------------------------------------------
# NOTE(review): hdfs.path uses %Y%m%d%H escapes, which require a `timestamp`
# event header or `hdfs.useLocalTimeStamp = true` — confirm one is in effect.
agent1.sinks.first-sink.type = hdfs
agent1.sinks.first-sink.hdfs.path = <path to folder>/partition_date=%Y%m%d%H
# Roll every 10 min or ~100 MB, never by event count.
agent1.sinks.first-sink.hdfs.rollInterval = 600
agent1.sinks.first-sink.hdfs.rollSize = 102400000
agent1.sinks.first-sink.hdfs.rollCount = 0
agent1.sinks.first-sink.hdfs.round = true
agent1.sinks.first-sink.hdfs.roundValue = 1
agent1.sinks.first-sink.hdfs.roundUnit = hour
agent1.sinks.first-sink.hdfs.fileType = CompressedStream
agent1.sinks.first-sink.hdfs.codeC = lz4
agent1.sinks.first-sink.hdfs.writeFormat = Text
agent1.sinks.first-sink.hdfs.batchSize = 1000
agent1.sinks.first-sink.hdfs.idleTimeout = 1800
#
# `POD_UUID` allows flume to scale when using with kubernetes, docker swarm etc.
# Please export this variable in `flume-env.sh`
#
# $ export POD_UUID=$(uuidgen)
#
# Then add `-DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties` to `JAVA_OPTS` in `flume-env.sh`
#
# export JAVA_OPTS="${FLUME_OPTS} -XshowSettings:vm -Dorg.apache.flume.log.printconfig=true -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"
#
agent1.sinks.first-sink.hdfs.filePrefix = flumeData_${POD_UUID}_%y%m%d_%H%M
# Bug fix: `channel` must name a Flume channel (declared in agent1.channels),
# not the Kafka topic backing it; `<topic>_first_channel` matched no channel,
# leaving the sink unattached.
agent1.sinks.first-sink.channel = first-channel

agent1.sinks.second-sink.type = hdfs
agent1.sinks.second-sink.hdfs.path = <path to folder>/partition_date=%Y%m%d%H
agent1.sinks.second-sink.hdfs.rollInterval = 600
agent1.sinks.second-sink.hdfs.rollSize = 102400000
agent1.sinks.second-sink.hdfs.rollCount = 0
agent1.sinks.second-sink.hdfs.round = true
agent1.sinks.second-sink.hdfs.roundValue = 1
agent1.sinks.second-sink.hdfs.roundUnit = hour
agent1.sinks.second-sink.hdfs.fileType = CompressedStream
agent1.sinks.second-sink.hdfs.codeC = lz4
agent1.sinks.second-sink.hdfs.writeFormat = Text
agent1.sinks.second-sink.hdfs.batchSize = 1000
agent1.sinks.second-sink.hdfs.idleTimeout = 1800
agent1.sinks.second-sink.hdfs.filePrefix = flumeData_${POD_UUID}_%y%m%d_%H%M
# Bug fix: same as first-sink — reference the channel name, not the topic.
agent1.sinks.second-sink.channel = second-channel
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment