Skip to content

Instantly share code, notes, and snippets.

@mwlang
Last active May 16, 2019 16:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwlang/57876320390c899be690a5a79eed8125 to your computer and use it in GitHub Desktop.
Save mwlang/57876320390c899be690a5a79eed8125 to your computer and use it in GitHub Desktop.
require "http"
# Subscribes to the `<symbol>@depth` WebSocket stream on stream.binance.com
# and prints every message it receives.
#
# The socket is opened when the listener is constructed. `run` blocks the
# calling fiber until the socket reports a close; calling `run` again after
# that opens a fresh connection, so a stopped listener can be restarted.
class Listener
  # In-band marker the close handler pushes onto the channel to end `run`.
  CLOSE_SIGNAL = "CLOSED"

  getter symbol : String

  # Opens the WebSocket for *symbol* (the symbol is lower-cased in the URL)
  # and wires its message/close callbacks to the internal channel.
  def initialize(@symbol : String)
    @channel = Channel(String).new
    @open = false
    # Becomes true once a `run` has finished: the socket is spent by then,
    # so the next `run` has to reconnect before reading.
    @stale = false
    @url = "wss://stream.binance.com:9443/ws/#{@symbol.downcase}@depth"
    puts "opening #{@symbol}"
    @ws = HTTP::WebSocket.new URI.parse(@url)
    # Called only after every instance variable has been assigned.
    attach_handlers
  end

  # True whenever no `run` loop is currently active for this listener.
  def stopped?
    !@open
  end

  # Pumps the socket in a background fiber and prints each message until the
  # close handler signals the end of the stream. Returning from this method
  # is what ends the fiber it was spawned in.
  #
  # NOTE(review): `receive?` waits indefinitely for the next message; a
  # `select` with a timeout branch would be needed to bound that wait.
  def run
    # Flag the listener as running before any blocking work so that a caller
    # polling `stopped?` does not start a second, concurrent `run`.
    @open = true
    reconnect if @stale
    socket = @ws
    spawn { socket.run }
    while data = @channel.receive?
      if data == CLOSE_SIGNAL
        puts "CLOSED #{@symbol.upcase}"
        break
      end
      puts data
    end
  ensure
    # Also reached when reconnecting raises, so the listener stays restartable.
    @open = false
    @stale = true
  end

  # Replaces the spent socket with a new connection to the same URL.
  private def reconnect
    puts "opening #{@symbol}"
    @ws = HTTP::WebSocket.new URI.parse(@url)
    attach_handlers
  end

  # Forwards socket events into the channel consumed by `run`.
  private def attach_handlers
    @ws.on_message do |message|
      @channel.send(message)
    end
    @ws.on_close do |_|
      @channel.send(CLOSE_SIGNAL)
    end
  end
end
# Driver: start one listener per market, then supervise them until all stop.
markets = ["BTCUSDC", "TUSDBTC", "USDCUSDT"]

# How often the supervisor loop re-checks the listeners.
poll_interval = 5.seconds

puts "preparing listeners"
listeners = markets.map { |market_symbol| Listener.new market_symbol }
puts "starting listeners"
listeners.each { |listener| spawn { listener.run } }
puts "listening..."
loop do
  # A bare `sleep` blocks this fiber forever, so the checks below would never
  # run; sleeping for a fixed interval yields to the listener fibers and then
  # comes back to supervise them.
  sleep poll_interval
  stopped = listeners.select(&.stopped?)
  # if every listener is stopped, we'll just exit the program
  break if stopped.size == listeners.size
  # if just some stopped, attempt to restart
  stopped.each { |l| spawn { l.run } }
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment