-
-
Save anonymous/7b45de85bf669a8401f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Sources ################################################### | |
agent.sources = http | |
agent.sources.http.type = http | |
agent.sources.http.bind = 0.0.0.0 | |
agent.sources.http.port = <%= @node[:flume][:http_source_port] %> | |
agent.sources.http.channels = <%= node[:flume][:channels].map { |c| "#{c}_channel" }.join(" ") %> | |
## Interceptors ################################################# | |
agent.sources.http.interceptors = itime ihost | |
agent.sources.http.interceptors.itime.type = timestamp | |
agent.sources.http.interceptors.ihost.type = host | |
agent.sources.http.interceptors.ihost.useIP = false | |
agent.sources.http.interceptors.ihost.preserveExisting = false | |
agent.sources.http.interceptors.ihost.hostHeader = hostname | |
agent.sources.syslogudp.interceptors = itime ihost | |
agent.sources.syslogudp.interceptors.itime.type = timestamp | |
agent.sources.syslogudp.interceptors.ihost.type = host | |
agent.sources.syslogudp.interceptors.ihost.useIP = false | |
agent.sources.syslogudp.interceptors.ihost.preserveExisting = false | |
agent.sources.syslogudp.interceptors.ihost.hostHeader = hostname | |
## Multiplex Channels Mapping ###################################### | |
agent.sources.http.selector.type = multiplexing | |
agent.sources.http.selector.header = client_id | |
<%- node[:flume][:channels].each do |channel| %> | |
agent.sources.http.selector.mapping.<%= channel %> = <%= channel %>_channel | |
<% end %> | |
## Channels ######################################################## | |
agent.channels = <%= node[:flume][:channels].map { |c| "#{c}_channel" }.join(" ") %> | |
<%- node[:flume][:channels].each do |channel| %> | |
agent.channels.<%= channel %>_channel.type = <%= @node[:flume][:channel_type] %> | |
agent.channels.<%= channel %>_channel.capacity = <%= @node[:flume][:channel_capacity] %> | |
agent.channels.<%= channel %>_channel.checkpointDir = <%= File.join(node[:flume][:base_dir], channel, 'checkpoint') %> | |
agent.channels.<%= channel %>_channel.dataDirs = <%= File.join(node[:flume][:base_dir], channel, 'data') %> | |
<% end %> | |
## Sinks ########################################################### | |
agent.sinks = <%= node[:flume][:channels].map { |c| "#{c}_sink1 #{c}_sink2" }.join(" ") %> | |
## Serialize json events from the pipeline and write csv to HDFS (We are using s3 native FS as HDFS) | |
############################################################################### | |
<%- node[:flume][:channels].reject { |x| x == :default }.each do |channel| %> | |
## <%= channel.to_s.capitalize %> Channel and Sinks ################################################# | |
<%= 1.upto(2) do |i| %> | |
agent.sinks.<%= channel %>_sink<%= i %>.type = hdfs | |
agent.sinks.<%= channel %>_sink<%= i %>.channel = <%= channel %>_channel | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.path = s3n://mydrive-<%= channel %>-reports/driver-profiles/%Y-%m-%d/ | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.fileType = DataStream | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.writeFormat = Text | |
agent.sinks.<%= channel %>_sink<%= i %>.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder | |
agent.sinks.<%= channel %>_sink<%= i %>.serializer.columns = client_id subscription_id | |
agent.sinks.<%= channel %>_sink<%= i %>.serializer.format = DriverProfile | |
agent.sinks.<%= channel %>_sink<%= i %>.serializer.delimiter = , | |
agent.sinks.<%= channel %>_sink<%= i %>.serializer.appendNewline = false | |
agent.sinks.<%= channel %>_sink<%= i %>.serializer.distanceMeasure = MILES | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.maxOpenFiles = 5000 | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.rollSize = 0 | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.rollCount = 0 | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.rollInterval = 900 | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.callTimeout = 60000 | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.fileSuffix = <%= @node[:flume][:hdfs_file_suffix] %> | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.inUseSuffix = <%= @node[:flume][:hdfs_in_use_suffix] %> | |
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M | |
<% end %> | |
<% end -%> | |
################################################################################## | |
### Default Channel and Sinks #################################################### | |
agent.sinks.default_s3_sink1.type = hdfs | |
agent.sinks.default_s3_sink1.channel = default_s3_channel | |
agent.sinks.default_s3_sink1.hdfs.path = s3n://mydrive-default-reports/driver-profiles/%{client_id}/%Y-%m-%d/ | |
agent.sinks.default_s3_sink1.hdfs.fileType = DataStream | |
agent.sinks.default_s3_sink1.hdfs.writeFormat = Text | |
agent.sinks.default_s3_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder | |
agent.sinks.default_s3_sink1.serializer.columns = client_id subscription_id | |
agent.sinks.default_s3_sink1.serializer.format = DriverProfile | |
agent.sinks.default_s3_sink1.serializer.delimiter = , | |
agent.sinks.default_s3_sink1.serializer.appendNewline = false | |
agent.sinks.default_s3_sink1.serializer.distanceMeasure = MILES | |
agent.sinks.default_s3_sink1.hdfs.maxOpenFiles = 5000 | |
agent.sinks.default_s3_sink1.hdfs.rollSize = 0 | |
agent.sinks.default_s3_sink1.hdfs.rollCount = 0 | |
agent.sinks.default_s3_sink1.hdfs.rollInterval = 900 | |
agent.sinks.default_s3_sink1.hdfs.callTimeout = 60000 | |
agent.sinks.default_s3_sink1.hdfs.fileSuffix = <%= @node[:flume][:hdfs_file_suffix] %> | |
agent.sinks.default_s3_sink1.hdfs.inUseSuffix = <%= @node[:flume][:hdfs_in_use_suffix] %> | |
agent.sinks.default_s3_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M | |
agent.sinks.default_s3_sink2.type = hdfs | |
agent.sinks.default_s3_sink2.channel = default_s3_channel | |
agent.sinks.default_s3_sink2.hdfs.path = s3n://mydrive-default-reports/driver-profiles/%{client_id}/%Y-%m-%d/ | |
agent.sinks.default_s3_sink2.hdfs.fileType = DataStream | |
agent.sinks.default_s3_sink2.hdfs.writeFormat = Text | |
agent.sinks.default_s3_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder | |
agent.sinks.default_s3_sink2.serializer.columns = client_id subscription_id | |
agent.sinks.default_s3_sink2.serializer.format = DriverProfile | |
agent.sinks.default_s3_sink2.serializer.delimiter = , | |
agent.sinks.default_s3_sink2.serializer.appendNewline = false | |
agent.sinks.default_s3_sink2.serializer.distanceMeasure = MILES | |
agent.sinks.default_s3_sink2.hdfs.maxOpenFiles = 5000 | |
agent.sinks.default_s3_sink2.hdfs.rollSize = 0 | |
agent.sinks.default_s3_sink2.hdfs.rollCount = 0 | |
agent.sinks.default_s3_sink2.hdfs.rollInterval = 900 | |
agent.sinks.default_s3_sink2.hdfs.callTimeout = 60000 | |
agent.sinks.default_s3_sink2.hdfs.fileSuffix = <%= @node[:flume][:hdfs_file_suffix] %> | |
agent.sinks.default_s3_sink2.hdfs.inUseSuffix = <%= @node[:flume][:hdfs_in_use_suffix] %> | |
agent.sinks.default_s3_sink2.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M | |
## SinkGroups ########################################################### | |
agent.sinkgroups = <%= node[:flume][:channels].map { |c| "#{c}_sinkgroup" }.join(" ") %> | |
<%- node[:flume][:channels].each do |channel| %> | |
## <%= channel.to_s.capitalize %> Failover SinkGroup ########################################## | |
agent.sinkgroups.<%= channel %>_sinkgroup.sinks = <%= "#{channel}_sink1 #{channel}_sink2" %> | |
agent.sinkgroups.<%= channel %>_sinkgroup.processor.type = failover | |
agent.sinkgroups.<%= channel %>_sinkgroup.processor.priority.<%= channel %>_sink1 = 10 | |
agent.sinkgroups.<%= channel %>_sinkgroup.processor.priority.<%= channel %>_sink2 = 5 | |
agent.sinkgroups.<%= channel %>_sinkgroup.processor.maxpenalty = 30000 | |
<%- end %> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment