Skip to content

Instantly share code, notes, and snippets.

Created June 6, 2013 08:58
Show Gist options
  • Save anonymous/7b45de85bf669a8401f8 to your computer and use it in GitHub Desktop.
Save anonymous/7b45de85bf669a8401f8 to your computer and use it in GitHub Desktop.
## Sources ###################################################
## A single HTTP source listening on all interfaces. It is attached to every
## configured channel; the multiplexing selector configured below routes each event.
agent.sources = http
agent.sources.http.type = http
agent.sources.http.bind = 0.0.0.0
agent.sources.http.port = <%= node[:flume][:http_source_port] %>
agent.sources.http.channels = <%= node[:flume][:channels].map { |c| "#{c}_channel" }.join(" ") %>
## Interceptors #################################################
## itime adds a `timestamp` header to every event; Flume's HDFS sink resolves the
## %Y-%m-%d escapes in hdfs.path from that header. ihost stores this agent's
## hostname (useIP = false) in the `hostname` header, overwriting any existing value.
agent.sources.http.interceptors = itime ihost
agent.sources.http.interceptors.itime.type = timestamp
agent.sources.http.interceptors.ihost.type = host
agent.sources.http.interceptors.ihost.useIP = false
agent.sources.http.interceptors.ihost.preserveExisting = false
agent.sources.http.interceptors.ihost.hostHeader = hostname
## NOTE(review): `syslogudp` is not declared in agent.sources (only `http` is), so
## Flume does not use these settings. They are kept commented out for reference;
## re-enable them together with a syslogudp source declaration.
#agent.sources.syslogudp.interceptors = itime ihost
#agent.sources.syslogudp.interceptors.itime.type = timestamp
#agent.sources.syslogudp.interceptors.ihost.type = host
#agent.sources.syslogudp.interceptors.ihost.useIP = false
#agent.sources.syslogudp.interceptors.ihost.preserveExisting = false
#agent.sources.syslogudp.interceptors.ihost.hostHeader = hostname
## Multiplex Channels Mapping ######################################
## Events are routed on their `client_id` header: a client_id equal to a channel
## name is sent to that channel's <name>_channel.
agent.sources.http.selector.type = multiplexing
agent.sources.http.selector.header = client_id
<%- node[:flume][:channels].each do |channel| -%>
agent.sources.http.selector.mapping.<%= channel %> = <%= channel %>_channel
<%- end -%>
<%- if node[:flume][:channels].any? { |c| c.to_s == 'default' } -%>
## Without a selector default, events whose client_id matches no mapping (or that
## have no client_id header) are put on no channel at all; send them to the
## default channel instead.
agent.sources.http.selector.default = default_channel
<%- end -%>
## Channels ########################################################
## One channel per configured name. checkpointDir/dataDirs are read by Flume's
## file channel and live under <base_dir>/<channel>/.
agent.channels = <%= node[:flume][:channels].map { |c| "#{c}_channel" }.join(" ") %>
<%- node[:flume][:channels].each do |channel| -%>
<%-   channel_dir = File.join(node[:flume][:base_dir], channel.to_s) -%>
agent.channels.<%= channel %>_channel.type = <%= node[:flume][:channel_type] %>
agent.channels.<%= channel %>_channel.capacity = <%= node[:flume][:channel_capacity] %>
agent.channels.<%= channel %>_channel.checkpointDir = <%= File.join(channel_dir, 'checkpoint') %>
agent.channels.<%= channel %>_channel.dataDirs = <%= File.join(channel_dir, 'data') %>
<%- end -%>
## Sinks ###########################################################
agent.sinks = <%= node[:flume][:channels].map { |c| "#{c}_sink1 #{c}_sink2" }.join(" ") %>
## Serialize JSON events from the pipeline and write CSV to HDFS (we use the S3
## native filesystem, s3n://, as HDFS). Each channel <name> gets two identically
## configured sinks, <name>_sink1 and <name>_sink2, both draining <name>_channel.
## The `default` channel additionally partitions its output by the event's
## client_id header. Channel names are compared as strings because they may be
## Symbols or Strings depending on where the node attribute was set.
###############################################################################
<%- node[:flume][:channels].each do |channel| -%>
<%-   per_client_dir = channel.to_s == 'default' ? '%{client_id}/' : '' -%>
## <%= channel.to_s.capitalize %> Channel and Sinks #################################################
<%-   1.upto(2) do |i| -%>
agent.sinks.<%= channel %>_sink<%= i %>.type = hdfs
agent.sinks.<%= channel %>_sink<%= i %>.channel = <%= channel %>_channel
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.path = s3n://mydrive-<%= channel %>-reports/driver-profiles/<%= per_client_dir %>%Y-%m-%d/
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.fileType = DataStream
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.writeFormat = Text
agent.sinks.<%= channel %>_sink<%= i %>.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.<%= channel %>_sink<%= i %>.serializer.columns = client_id subscription_id
agent.sinks.<%= channel %>_sink<%= i %>.serializer.format = DriverProfile
agent.sinks.<%= channel %>_sink<%= i %>.serializer.delimiter = ,
agent.sinks.<%= channel %>_sink<%= i %>.serializer.appendNewline = false
agent.sinks.<%= channel %>_sink<%= i %>.serializer.distanceMeasure = MILES
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.maxOpenFiles = 5000
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.rollSize = 0
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.rollCount = 0
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.rollInterval = 900
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.callTimeout = 60000
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.fileSuffix = <%= node[:flume][:hdfs_file_suffix] %>
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.inUseSuffix = <%= node[:flume][:hdfs_in_use_suffix] %>
agent.sinks.<%= channel %>_sink<%= i %>.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
<%-   end -%>
<%- end -%>
## SinkGroups ###########################################################
## Each channel's two sinks form a failover group: <name>_sink1 (priority 10) is
## preferred and <name>_sink2 (priority 5) takes over when it fails.
<%- group_names = node[:flume][:channels].map { |c| "#{c}_sinkgroup" } -%>
agent.sinkgroups = <%= group_names.join(" ") %>
<%- node[:flume][:channels].each do |channel| -%>
<%-   group_key = "agent.sinkgroups.#{channel}_sinkgroup" -%>
<%-   primary, secondary = "#{channel}_sink1", "#{channel}_sink2" -%>
## <%= channel.to_s.capitalize %> Failover SinkGroup ##########################################
<%= group_key %>.sinks = <%= primary %> <%= secondary %>
<%= group_key %>.processor.type = failover
<%= group_key %>.processor.priority.<%= primary %> = 10
<%= group_key %>.processor.priority.<%= secondary %> = 5
<%= group_key %>.processor.maxpenalty = 30000
<%- end -%>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment