@cschneid
Last active August 29, 2015 13:57
Gist: cschneid/9792305

Single Mode:

i: 0
i: 500
i: 1000
i: 1500
i: 2000
i: 2500
i: 3000
i: 3500
i: 4000
i: 4500

Duration: 3.952875

Batched Mode:

i: 0
i: 500
i: 1000
i: 1500
i: 2000
i: 2500
i: 3000
i: 3500
i: 4000
i: 4500

Duration: 0.644725

ruby send_event.rb 1.58s user 0.55s system 22% cpu 9.246 total
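The two durations above can be turned into throughput and per-call latency with plain arithmetic. This sketch assumes each run sent exactly 5,000 events (5,000 `append` calls versus 500 `appendBatch` calls of 10, as in the script) and uses only the reported numbers; the timed region also includes building events and printing progress lines, so these are end-to-end figures, not pure RPC cost.

```ruby
# Figures copied from the Single Mode / Batched Mode output above (null sink).
EVENTS = 5000

# Events per second for a run that delivered `events` in `seconds`.
def throughput(events, seconds)
  events / seconds
end

# Average wall-clock time per RPC, in milliseconds.
def ms_per_rpc(seconds, rpcs)
  seconds / rpcs * 1000
end

runs = {
  "single"  => { seconds: 3.952875, rpcs: 5000 }, # one append per event
  "batched" => { seconds: 0.644725, rpcs: 500 }   # one appendBatch per 10 events
}

runs.each do |mode, r|
  printf("%-8s %5.0f events/s, %.2f ms per RPC\n",
         mode, throughput(EVENTS, r[:seconds]), ms_per_rpc(r[:seconds], r[:rpcs]))
end
printf("batching speedup: %.1fx\n", runs["single"][:seconds] / runs["batched"][:seconds])
```

Each batched call takes somewhat longer (about 1.29 ms versus 0.79 ms), but a tenth as many calls are made, which suggests per-call round-trip overhead dominates and explains most of the roughly 6x difference.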

# send_event.rb: send 5,000 events to a Flume thrift source, first one per RPC, then in batches of 10.
require 'rubygems'
require 'thrift'
$: << "."
$: << "./gen-rb"
require "flume_constants"
require "flume_types"
require "thrift_source_protocol"

# Identical JSON payload for every event (the heredoc keeps a trailing newline).
EVENT_BODY = <<-BODY
{"subject": "observations:76a85c15-0f0c-4ac7-b268-b5c7e18e3f30", "property": "tags", "asserted_at": "2014-03-25T16:58:27Z", "value": ["temperature"], "nonce": "VGGtgvijEqDUztAKqv66uw"}
BODY

# Thin wrapper around the generated Flume Thrift client that reconnects on failure.
class FlumeStuffer
  # Arbitrary retry cap so an unreachable agent cannot trigger endless reconnects.
  MAX_RETRIES = 5

  def initialize
    open_connection
  end

  # Framed transport + compact protocol to the agent's thrift source (port 9876).
  # Closes any previous socket first so reconnects do not leak connections.
  def open_connection
    @transport.close if @transport && @transport.open?
    @transport = Thrift::FramedTransport.new(Thrift::Socket.new('battleschool-dev', 9876))
    @protocol = Thrift::CompactProtocol.new(@transport)
    @client = ThriftSourceProtocol::Client.new(@protocol)
    @transport.open
  end

  # Sends an array of ThriftFlumeEvent objects in a single appendBatch RPC.
  def stuff(messages)
    with_reconnect { @client.appendBatch(messages) }
  end

  # Sends one ThriftFlumeEvent per append RPC.
  def stuffSingle(message)
    with_reconnect { @client.append(message) }
  end

  private

  # Runs the block; on error, reopens the connection and retries with `retry`
  # instead of recursing, re-raising once MAX_RETRIES is exceeded.
  def with_reconnect
    attempts = 0
    begin
      yield
    rescue StandardError => e
      attempts += 1
      raise if attempts > MAX_RETRIES
      puts "Failed! ohh no! Retrying: #{e}"
      open_connection
      retry
    end
  end
end

# Builds one ThriftFlumeEvent with empty headers and the shared body.
def build_event
  message = ThriftFlumeEvent.new
  message.headers = {}
  message.body = EVENT_BODY
  message
end

stuffer = FlumeStuffer.new

t1 = Time.now
puts "Single Mode:"
puts
# 5,000 events, one append RPC each.
5000.times do |i|
  puts "i: #{i}" if i % 500 == 0
  stuffer.stuffSingle(build_event)
end
duration = Time.now - t1
puts "Duration: #{duration}"
puts
puts "========================="
puts

t1 = Time.now
puts "Batched Mode:"
puts
# 500 appendBatch RPCs of 10 events each: the same 5,000 events.
500.times do |i|
  puts "i: #{i * 10}" if i % 50 == 0
  stuffer.stuff(Array.new(10) { build_event })
end
duration = Time.now - t1
puts "Duration: #{duration}"
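The batched loop hard-codes 500 iterations of 10 events. A more general way to express the same idea in Ruby is `Enumerable#each_slice`, which groups any event list into fixed-size batches and makes one call per batch. `send_in_batches` is a hypothetical helper, and the lambda stands in for something like `stuffer.method(:stuff)` so the sketch runs without a Flume agent.

```ruby
# Hypothetical helper: deliver `events` in slices of `batch_size`, calling
# `sender` once per slice. Returns the number of calls (i.e. RPCs) made.
# The final slice is shorter when events.size is not a multiple of batch_size.
def send_in_batches(events, batch_size, sender)
  raise ArgumentError, "batch_size must be positive" unless batch_size > 0
  calls = 0
  events.each_slice(batch_size) do |batch|
    sender.call(batch) # one RPC per slice
    calls += 1
  end
  calls
end

# Usage: 5,000 placeholder events in batches of 10, like the Batched Mode loop.
sizes = []
rpc_count = send_in_batches((1..5000).to_a, 10, ->(batch) { sizes << batch.size })
puts rpc_count   # prints 500
p sizes.uniq     # prints [10]
```

Passing the sender in keeps the batching logic independent of the Thrift client, so the batch size can be tuned without touching the connection code.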

# Flume agent configuration (agent: master-dataset-recorder)
# Channels
master-dataset-recorder.channels = memory-channel
master-dataset-recorder.channels.memory-channel.type = memory
master-dataset-recorder.channels.memory-channel.capacity = 20000
master-dataset-recorder.channels.memory-channel.transactionCapacity = 20000
# netcat source
# master-dataset-recorder.sources = netcat-src
# master-dataset-recorder.sources.netcat-src.type = netcat
# master-dataset-recorder.sources.netcat-src.bind = 0.0.0.0
# master-dataset-recorder.sources.netcat-src.port = 44444
# master-dataset-recorder.sources.netcat-src.channels = memory-channel
# thrift source
master-dataset-recorder.sources = thrift-src
master-dataset-recorder.sources.thrift-src.type = thrift
master-dataset-recorder.sources.thrift-src.bind = 0.0.0.0
master-dataset-recorder.sources.thrift-src.port = 9876
master-dataset-recorder.sources.thrift-src.channels = memory-channel
# logger sink
# master-dataset-recorder.sinks = logger-sink
# master-dataset-recorder.sinks.logger-sink.type = logger
# master-dataset-recorder.sinks.logger-sink.channel = memory-channel
# null sink
master-dataset-recorder.sinks = null-sink
master-dataset-recorder.sinks.null-sink.type = null
master-dataset-recorder.sinks.null-sink.channel = memory-channel
# hdfs sink
# master-dataset-recorder.sinks = hdfs-sink
# master-dataset-recorder.sinks.hdfs-sink.type = hdfs
# master-dataset-recorder.sinks.hdfs-sink.hdfs.fileType = SequenceFile
# master-dataset-recorder.sinks.hdfs-sink.hdfs.writeFormat = Text
# master-dataset-recorder.sinks.hdfs-sink.hdfs.rollInterval = 30
# master-dataset-recorder.sinks.hdfs-sink.hdfs.rollCount = 0
# master-dataset-recorder.sinks.hdfs-sink.hdfs.rollSize = 65000000
# master-dataset-recorder.sinks.hdfs-sink.hdfs.filePrefix = Facts
# master-dataset-recorder.sinks.hdfs-sink.hdfs.batchSize = 10000
# master-dataset-recorder.sinks.hdfs-sink.hdfs.txnEventMax = 10000
# master-dataset-recorder.sinks.hdfs-sink.hdfs.callTimeout = 90000
# master-dataset-recorder.sinks.hdfs-sink.hdfs.threadsPoolSize
# master-dataset-recorder.sinks.hdfs-sink.hdfs.path = hdfs://battleschool-dev:9000/advanced-apps/master-dataset/
# master-dataset-recorder.sinks.hdfs-sink.channel = memory-channel
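Flume reads each component's settings from keys of the form `<agent>.sinks.<name>.<property>`, so a `type` key that drops the `.sinks.` segment leaves the sink without a type. A small lint pass can catch that before the agent starts. This is a minimal sketch: `parse_properties` and `sinks_missing_type` are hypothetical helpers that handle only simple `key = value` lines, not the full Java properties syntax (line continuations, escapes, `:` separators).

```ruby
# Parse simple "key = value" lines, skipping blanks and # or ! comments.
def parse_properties(text)
  text.each_line.each_with_object({}) do |line, props|
    line = line.strip
    next if line.empty? || line.start_with?("#", "!")
    key, value = line.split("=", 2).map(&:strip)
    props[key] = value.to_s
  end
end

# Names listed under <agent>.sinks that have no <agent>.sinks.<name>.type key.
def sinks_missing_type(props, agent)
  names = props.fetch("#{agent}.sinks", "").split
  names.reject { |name| props.key?("#{agent}.sinks.#{name}.type") }
end

# Usage: a type key written without the ".sinks." segment is flagged.
config = <<-CONF
master-dataset-recorder.sinks = null-sink
master-dataset-recorder.null-sink.type = null
master-dataset-recorder.sinks.null-sink.channel = memory-channel
CONF

p sinks_missing_type(parse_properties(config), "master-dataset-recorder") # prints ["null-sink"]
```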
@cschneid (Author)

Re-enabling the HDFS sink roughly halves ingest speed in single mode (3.95 s to 7.73 s for 5,000 events) and makes batched mode about 1.4x slower (0.64 s to 0.92 s). The new timings:

Single Mode:

i: 0
i: 500
i: 1000
i: 1500
i: 2000
i: 2500
i: 3000
i: 3500
i: 4000
i: 4500
Duration: 7.725536

Batched Mode:

i: 0
i: 500
i: 1000
i: 1500
i: 2000
i: 2500
i: 3000
i: 3500
i: 4000
i: 4500
Duration: 0.915852
ruby send_event.rb 0.92s user 0.34s system 13% cpu 9.169 total
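A back-of-the-envelope comparison of the two runs, using only the durations reported in this gist, shows that "halves" fits single mode better than batched mode. The sketch assumes both runs sent 5,000 events and that the only change between them was the sink.

```ruby
# Durations (seconds) for 5,000 events, copied from the two runs in this gist.
NULL_SINK = { "single" => 3.952875, "batched" => 0.644725 }
HDFS_SINK = { "single" => 7.725536, "batched" => 0.915852 }

# Ratio > 1 means the HDFS-backed run took that many times longer.
def slowdown(baseline, loaded)
  loaded / baseline
end

events = 5000
NULL_SINK.each_key do |mode|
  ratio = slowdown(NULL_SINK[mode], HDFS_SINK[mode])
  rate  = events / HDFS_SINK[mode]
  printf("%-8s %.2fx slower, %.0f events/s with HDFS sink\n", mode, ratio, rate)
end
```

Single mode comes out about 1.95x slower (roughly 647 events/s), batched mode about 1.42x slower (roughly 5,459 events/s), so batching also softens the cost of the HDFS sink.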
