Skip to content

Instantly share code, notes, and snippets.

@millerjs
Last active July 19, 2021 19:41
Show Gist options
  • Save millerjs/8ab3a4ce63c4502da4c34db017ae417d to your computer and use it in GitHub Desktop.
Save millerjs/8ab3a4ce63c4502da4c34db017ae417d to your computer and use it in GitHub Desktop.
Sidekiq Unique Jobs Disappearing job
require 'socket'
# pulled from https://hpyblg.wordpress.com/2010/07/18/a-tcp-proxy-in-ruby/
# Delay applied before each remote -> local chunk is forwarded.
# The value is in seconds because it is handed to Kernel#sleep:
# 18 / 1000.0 == 0.018 s, i.e. 18 milliseconds.
LATENCY = 18 / 1000.0

# The remote address is the only mandatory argument.
if ARGV.empty?
  $stderr.puts "Usage: #{$0} remoteHost:remotePort [ localPort [ localHost ] ]"
  exit 1
end

# First argument is "host:port" of the server being proxied. Both halves
# live in globals because connThread reads them when it dials out.
$remoteHost, $remotePort = ARGV.shift.split(":")
puts "target address: #{$remoteHost}:#{$remotePort}"

# Optional second and third arguments pick the listening port and host.
# The port falls back to the remote port; the host stays nil when omitted.
local_port = ARGV.shift || $remotePort
local_host = ARGV.shift

# Upper bound on the number of bytes requested per recv call in connThread.
$blockSize = 1024

# `server` keeps its name: the accept loop further down uses it.
server = TCPServer.open(local_host, local_port)
_family, port, *hosts = server.addr
addrs = hosts.uniq
puts "*** listening on #{addrs}, port #{port}"

# Abort the whole process when a handler thread raises; otherwise the
# thread would be killed silently.
Thread.abort_on_exception = true

# Idle thread that exists only so Ctrl-C gets processed on Windows
# (Ctrl-Break always works).
Thread.new do
  loop { sleep 1 }
end
# Proxies one accepted client connection to the remote server.
#
# Dials $remoteHost:$remotePort, then copies data in both directions in
# chunks of up to $blockSize bytes until either side closes. Every chunk is
# echoed to stdout, and each remote -> local chunk is held back for LATENCY
# seconds before being forwarded.
#
# Both sockets are closed on every exit path, including when an exception
# (connection refused, reset, ...) escapes.
#
# @param local [TCPSocket] accepted client socket; closed before returning
# @return [nil]
def connThread(local)
  remote = nil
  begin
    port, name = local.peeraddr[1..2]
    puts "*** receiving from #{name}:#{port}"

    # open connection to remote server
    remote = TCPSocket.new($remoteHost, $remotePort)

    # start reading from both ends
    loop do
      ready = select([local, remote], nil, nil)

      if ready[0].include? local
        # local -> remote
        data = local.recv($blockSize)
        puts data
        # EOF is reported as "" by older Rubies and as nil since Ruby 3.3,
        # so both have to be treated as "peer closed".
        if data.nil? || data.empty?
          puts "local end closed connection"
          break
        end
        remote.write(data)
      end

      if ready[0].include? remote
        # remote -> local
        data = remote.recv($blockSize)
        # simulate a slow link on the response path
        sleep(LATENCY)
        puts data
        if data.nil? || data.empty?
          puts "remote end closed connection"
          break
        end
        local.write(data)
      end
    end
  ensure
    # Release both descriptors even when dialing or relaying raised;
    # remote is still nil if the outbound connect never succeeded.
    local.close
    remote.close if remote
  end
  puts "*** done with #{name}:#{port}"
end
# Accept loop: block until a client connects, then hand the accepted socket
# to its own handler thread so several connections can be proxied at once.
loop do
  client = server.accept
  Thread.new(client) { |sock| connThread(sock) }
end
# Minimal Sidekiq job used to observe whether an enqueued job actually ran.
class TestWorker
  include Sidekiq::Worker

  # Logs a marker line, stores +timestamp+ in /tmp/timestamp (replacing any
  # previous content) and then sleeps for 10 seconds.
  # NOTE(review): the sleep presumably keeps the job "executing" long enough
  # to exercise the lock variants - confirm with the author.
  #
  # @param timestamp [String] value written to /tmp/timestamp
  def perform(timestamp)
    Rails.logger.info("performing")
    File.open('/tmp/timestamp', 'w') { |file| file.write(timestamp) }
    sleep(10)
  end
end
# TestWorker variant on the :medium queue using the
# :until_and_while_executing lock option.
# NOTE(review): the lock name is presumably interpreted by the
# sidekiq-unique-jobs middleware - confirm against the app's Gemfile.
class UniqueUntilAndWhileExecutingTestWorker < TestWorker
  sidekiq_options(queue: :medium, lock: :until_and_while_executing)
end
# TestWorker variant on the :medium queue using the :until_executing lock
# option.
# NOTE(review): the lock name is presumably interpreted by the
# sidekiq-unique-jobs middleware - confirm against the app's Gemfile.
class UniqueUntilExecutingTestWorker < TestWorker
  sidekiq_options(queue: :medium, lock: :until_executing)
end
# TestWorker variant on the :medium queue using the :until_executed lock
# option.
# NOTE(review): the lock name is presumably interpreted by the
# sidekiq-unique-jobs middleware - confirm against the app's Gemfile.
class UniqueUntilExecutedTestWorker < TestWorker
  sidekiq_options(queue: :medium, lock: :until_executed)
end
# Enqueues +worker+ with the current Unix timestamp and, once the operator
# presses Enter, reports whether that timestamp ended up in /tmp/timestamp.
#
# Prints the value returned by +perform_async+, then a "check ...?" prompt,
# waits for a line on standard input, and finally prints (and returns)
# true when the file holds exactly the enqueued timestamp, false otherwise.
# A missing file counts as false rather than raising.
#
# @param worker [#perform_async] job class (or any object) to enqueue with
# @return [Boolean] whether /tmp/timestamp contains the enqueued timestamp
def test(worker)
  timestamp = Time.now.to_i.to_s
  p worker.perform_async(timestamp)
  puts "check #{timestamp}?"

  # Read the pause from $stdin directly: bare Kernel#gets goes through ARGF
  # and would try to open any script arguments as files.
  $stdin.gets

  recorded =
    begin
      File.read('/tmp/timestamp')
    rescue Errno::ENOENT
      # No file at all means no job has written one yet; report that as a
      # failed check instead of crashing the script.
      nil
    end
  p recorded == timestamp
end
# Run the interactive check once for the plain worker and once for each
# lock variant, in this order.
[
  TestWorker,
  UniqueUntilAndWhileExecutingTestWorker,
  UniqueUntilExecutingTestWorker,
  UniqueUntilExecutedTestWorker
].each { |worker_class| test(worker_class) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment