Skip to content

Instantly share code, notes, and snippets.

@copiousfreetime
Last active February 1, 2021 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save copiousfreetime/e6ea5c901270706271c763fd2fbd355e to your computer and use it in GitHub Desktop.
Save copiousfreetime/e6ea5c901270706271c763fd2fbd355e to your computer and use it in GitHub Desktop.
Fault Tolerant TCP Client

This is code associated with the blog post ...

#!/usr/bin/env ruby
require 'socket'
require 'zlib'
require 'logger'
require 'fcntl'
def usage
$stderr.puts "#{$0} host port [output-location]"
exit 1
end
class LogFormatter < ::Logger::Formatter
FORMAT = "%s %5d %05s : %s\n".freeze
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ".freeze
def initialize
super
self.datetime_format = DATETIME_FORMAT
end
def call(severity, time, progname, msg)
FORMAT % [format_datetime(time.utc), Process.pid, severity, msg2str(msg)]
end
end
host = ARGV.shift || usage
port = ARGV.shift.to_i || usage
output = ARGV.shift || "/dev/null"
logger = ::Logger.new($stderr, formatter: LogFormatter.new)
# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # make it non-blocking
logger.info "Socket created"
read_buffer = String.new # A resuable string for use by readpartial
bufsize = 64*1024 # How much maximum data to read from the socket at a go
stop_after = 1024*1024 # 1 megabyte of data
total_bytes = 0
read_count = 0
logger.info "Reading..."
loop do
bytes = socket.readpartial(bufsize, read_buffer)
total_bytes += bytes.bytesize
read_count += 1
break if total_bytes > stop_after
end
logger.info "Stopped after #{total_bytes} bytes read in #{read_count} reads"
#!/usr/bin/env ruby
require 'socket'
require 'zlib'
require 'logger'
require 'fcntl'
def usage
$stderr.puts "#{$0} host port [output-location]"
exit 1
end
class LogFormatter < ::Logger::Formatter
FORMAT = "%s %5d %05s : %s\n".freeze
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ".freeze
def initialize
super
self.datetime_format = DATETIME_FORMAT
end
def call(severity, time, progname, msg)
FORMAT % [format_datetime(time.utc), Process.pid, severity, msg2str(msg)]
end
end
host = ARGV.shift || usage
port = ARGV.shift.to_i || usage
output_to = ARGV.shift || "-"
logger = ::Logger.new($stderr, formatter: LogFormatter.new)
# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # make it non-blocking
logger.info "Socket created"
bufsize = 64*1024 # How much maximum data to read from the socket at a go
stop_after = 1024*1024 # 1 megabyte of data
total_bytes = 0
read_count = 0
compressed_buffer = String.new # A resuable string for use by readpartial for compressed bytes
inflater = ::Zlib::Inflate.new(::Zlib::MAX_WBITS + 32)
uncompressed_bytes = 0
logger.info "Reading..."
logger.info "Writing to #{output_to}"
output = output_to == "-" ? $stdout : File.open(output_to, "w+")
loop do
socket.readpartial(bufsize, compressed_buffer)
total_bytes += compressed_buffer.bytesize
read_count += 1
uncompressed_buffer = inflater.inflate(compressed_buffer)
uncompressed_bytes += uncompressed_buffer.bytesize
output.write(uncompressed_buffer)
break if total_bytes > stop_after
end
output.close
logger.info "Read #{read_count} times from data source"
logger.info "Received #{total_bytes} of compressed data"
logger.info "Resulting in #{uncompressed_bytes} of decompressed data"
#!/usr/bin/env ruby
require 'socket'
require 'zlib'
require 'logger'
require 'fcntl'
require 'json'
require 'thread'
def usage
$stderr.puts "#{$0} host port [output-location]"
exit 1
end
class LogFormatter < ::Logger::Formatter
FORMAT = "%s %5d %05s : %s\n".freeze
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ".freeze
def initialize
super
self.datetime_format = DATETIME_FORMAT
end
def call(severity, time, progname, msg)
FORMAT % [format_datetime(time.utc), Process.pid, severity, msg2str(msg)]
end
end
#
# class to read data from an input IO, decompress the data, and write it to an
# output IO. it'll collect stats during the process
#
class Decompressor
attr_reader :input
attr_reader :output
attr_reader :top_after
attr_reader :buffer_size
attr_reader :compressed_bytes
attr_reader :uncompressed_bytes
attr_reader :read_count
def initialize(input:, output:, stop_after: Float::INFINITY)
@input = input
@output = output
@stop_after = stop_after
@buffer_size = 64*1024 # How much maximum data to read from the socket at a go
@compressed_bytes = 0
@uncompressed_bytes = 0
@read_count = 0
end
def call
compressed_buffer = String.new
inflater = ::Zlib::Inflate.new(::Zlib::MAX_WBITS + 32)
loop do
input.readpartial(@buffer_size, compressed_buffer)
@compressed_bytes += compressed_buffer.bytesize
@read_count += 1
uncompressed_buffer = inflater.inflate(compressed_buffer)
@uncompressed_bytes += uncompressed_buffer.bytesize
output.write(uncompressed_buffer)
break if @compressed_bytes > @stop_after
end
output.close
end
end
#
# class to read newlines from an input and write the output parsed object something
# else that responds to `<<`
#
class Parser
attr_reader :item_count
attr_reader :input_bytes
def initialize(input:, output:)
@item_count = 0
@input_bytes = 0
@stop = false
@input = input
@output = output
end
def stop
@stop = true
end
def call
loop do
break if @stop
line = @input.readline
@input_bytes += line.bytesize
event = JSON.parse(line)
@output << event
@item_count += 1
end
end
end
host = ARGV.shift || usage
port = ARGV.shift.to_i || usage
logger = ::Logger.new($stderr, formatter: LogFormatter.new)
# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # make it non-blocking
# Create a pipe to buffer the uncompressed data from the socket so that the text may be
# parsed into newlines.
#
read_io, write_io = IO.pipe
write_io.set_encoding("BINARY") # to handle multibyte-character splitting
stop_after = 1024*1014
# final output to this location
events = Queue.new
# setup the decompressor and parser objects
decompressor = Decompressor.new(input: socket, output: write_io, stop_after: stop_after)
parser = Parser.new(input: read_io, output: events)
# spawn threads for each of the objects
decompressor_thread = Thread.new { decompressor.call }
parser_thread = Thread.new { parser.call }
# spawn a thread to consume all the events from the parser and throw them away
consumed_count = 0
consumer_thread = Thread.new {
loop do
e = events.deq
consumed_count += 1 unless e.nil?
break if events.closed? && events.empty?
end
}
# wait for the decompressor to stop reading from input
decompressor_thread.join
# tell the parser to stop
parser.stop
parser_thread.join
# close the events queue so the consumer thread will drain the queue and stop
events.close
consumer_thread.join
logger.info "Decompressor: read #{decompressor.read_count} times"
logger.info "Decompressor: received #{decompressor.compressed_bytes} bytes"
logger.info "Decompressor: forwarded on #{decompressor.uncompressed_bytes} bytes"
logger.info "Parser : received #{parser.input_bytes}"
logger.info "Parser : forwarded on #{parser.item_count} events"
logger.info "Consumer : threw away #{consumed_count} events"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment