Skip to content

Instantly share code, notes, and snippets.

@seki
Last active July 6, 2016 09:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seki/d0ce79925fb29653ce204b35576a1af9 to your computer and use it in GitHub Desktop.
Save seki/d0ce79925fb29653ce204b35576a1af9 to your computer and use it in GitHub Desktop.
require 'socket'
require 'drip'
# TCP proxy that records everything it relays in a Drip store.
#
# Every accepted client socket ("left") is paired with a fresh connection to
# the remote host ("right"). Bytes are never copied socket-to-socket directly:
# a reader thread writes each chunk into the store under the tag
# "left=<conn>" or "right=<conn>", and a writer thread replays the entries of
# that tag to the opposite socket. A nil value under a tag marks the end of
# that direction's stream.
class Driprox
  # Maximum number of bytes requested from a socket per read.
  READ_CHUNK = 65_536

  # drip        - store object; this class calls write, read, read_tag and
  #               key_to_time on it.
  # port        - local port to listen on.
  # remote_host - host every client is forwarded to.
  # remote_port - port on remote_host.
  def initialize(drip, port, remote_host, remote_port)
    @drip = drip
    @server = TCPServer.new(port)
    @hostname = remote_host
    @port = remote_port
    # Key of the 'start' marker; the watcher begins reading after it.
    @origin = @drip.write(nil, 'start')
  end

  # Starts a thread that copies everything readable from soc into the store
  # under the tag "<kind>=<conn>", followed by a nil end-of-stream marker.
  #
  # The marker is written on EOF and also on any other read failure
  # (connection reset, socket closed by the sibling writer thread), so the
  # writer consuming this tag always terminates. The socket is deliberately
  # not closed here: the writer thread that targets it owns the close.
  #
  # Returns the Thread.
  def reader(soc, conn, kind)
    kind_key = "#{kind}=#{conn}"
    Thread.new do
      loop do
        data = begin
          soc.readpartial(READ_CHUNK)
        rescue IOError, SystemCallError
          # EOFError is an IOError; any read failure ends this stream.
          nil
        end
        @drip.write(data, kind_key)
        # `break`, not `return`: returning from a thread block whose method
        # has already finished raises LocalJumpError.
        break unless data
      end
    end
  end

  # Starts a thread that replays the entries tagged "<kind>=<conn>" that come
  # after cursor onto soc, and closes soc when the nil end-of-stream marker
  # is reached or the socket can no longer be written to.
  #
  # Returns the Thread.
  def writer(cursor, soc, conn, kind)
    kind_key = "#{kind}=#{conn}"
    Thread.new do
      begin
        loop do
          # Presumably blocks until one more entry with this tag exists
          # (arguments 1, 1) - confirm against the Drip API.
          cursor, data = @drip.read_tag(cursor, kind_key, 1, 1)[0]
          break unless data
          soc.write(data)
        end
      rescue IOError, SystemCallError
        # The peer went away mid-write; nothing more can be delivered.
        nil
      ensure
        soc.close unless soc.closed?
      end
    end
  end

  # Records the new connection in the store, connects to the remote host and
  # starts the four relay threads for this client.
  #
  # If the client has already vanished or the remote host cannot be reached,
  # the failure is reported on stderr, both sockets are closed and nil is
  # returned, so that one bad connection does not stop the accept loop.
  def invoke_worker(left, conn)
    origin = @drip.write(left.peeraddr, "conn=#{conn}")
    right = TCPSocket.new(@hostname, @port)
    reader(left, conn, 'left')
    reader(right, conn, 'right')
    # Data read from one side is written to the other side.
    writer(origin, right, conn, 'left')
    writer(origin, left, conn, 'right')
  rescue SystemCallError, SocketError => e
    warn "driprox: conn=#{conn} failed: #{e.class}: #{e.message}"
    left.close unless left.closed?
    right.close if right && !right.closed?
    nil
  end

  # Accept loop: numbers the clients from 1 and hands each one to
  # invoke_worker. Runs in the calling thread.
  def run
    count = 0
    while (client = @server.accept)
      count += 1
      invoke_worker(client, count)
    end
  end

  # Starts a thread that follows the store from the 'start' marker and prints
  # a summary of each run of consecutive entries sharing the same tag: the
  # time of the run's first entry, the tag, the number of chunks, the total
  # size and the first characters of the joined data.
  #
  # Returns the Thread.
  def watcher
    Thread.new(@origin) do |cursor|
      last_tag = nil
      time = nil
      pending = []
      loop do
        cursor, data, tag = @drip.read(cursor, 1, 1)[0]
        if last_tag != tag || data.nil?
          # The tag changed or an end-of-stream marker arrived: report the
          # run collected so far, then start a new one.
          p [time, last_tag, pending.size]
          str = pending.join('')
          puts [str.size, str.inspect[0..50]].join(' ')
          time = @drip.key_to_time(cursor)
          last_tag = tag
          pending = [data].compact
        else
          pending << data
        end
      end
    end
  end
end
# Launch: listen on local port 8880 and forward every client to
# www.druby.org:8083, recording the traffic in the Drip store 'driprox_db'.
store = Drip.new('driprox_db')
driprox = Driprox.new(store, 8880, 'www.druby.org', 8083)
# Start the background thread that prints traffic summaries, then serve
# clients in the main thread.
driprox.watcher
driprox.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment