Skip to content

Instantly share code, notes, and snippets.

@mwlang
Created May 9, 2019 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwlang/f5446a497afa46de2783c96d58e99ae7 to your computer and use it in GitHub Desktop.
Save mwlang/f5446a497afa46de2783c96d58e99ae7 to your computer and use it in GitHub Desktop.
require "./query_param_helper"
require "http"
require "json"
# A single price level of an order book: the price, the quantity resting at
# that price, and the id of the update that last touched the level.
class OrderBookEntry
  getter last_update_id : Int32
  getter price : Float64
  getter quantity : Float64

  # uid      - id of the update that created this level
  # price    - price of the level
  # quantity - quantity available at that price
  def initialize(uid : Int32, price : Float64, quantity : Float64)
    @last_update_id = uid
    @price = price
    @quantity = quantity
  end

  # Returns true when this level was last touched by an update older than
  # *min_uid*. A level whose last update id equals *min_uid* is still current.
  def expired?(min_uid : Int32)
    # The previous comparison (`min_uid <= last_update_id`) was inverted: it
    # reported levels at or above the minimum as expired.
    last_update_id < min_uid
  end

  # Records a new quantity for this level together with the id of the update
  # that carried it. The price never changes.
  def update(uid : Int32, new_quantity : Float64)
    @last_update_id = uid
    @quantity = new_quantity
  end
end
# In-memory order book for one market. It is seeded from the REST depth
# snapshot in the constructor and afterwards mutated by `#update`, which
# accepts either a snapshot payload or a diff-depth stream event.
#
# NOTE(review): update ids are read with `as_i` (Int32) because
# OrderBookEntry stores Int32 ids. Real exchange ids may exceed the Int32
# range; confirm and widen both classes together if so.
class OrderBook
  getter market_symbol : String
  getter bids : Hash(Float64, OrderBookEntry)
  getter asks : Hash(Float64, OrderBookEntry)
  getter last_update_id : Int32

  # Creates an empty book for *market_symbol* and immediately fills it from
  # the REST endpoint (performs a blocking HTTP request).
  def initialize(market_symbol : String)
    @last_update_id = 0
    @market_symbol = market_symbol
    @bids = Hash(Float64, OrderBookEntry).new
    @asks = Hash(Float64, OrderBookEntry).new
    load_from_endpoint market_symbol, 1000
  end

  # URL of the REST depth (snapshot) endpoint.
  def endpoint
    "https://www.binance.com/api/v1/depth"
  end

  # Fetches a depth snapshot with up to *limit* levels per side and feeds the
  # parsed JSON body to `#update`.
  def load_from_endpoint(market_symbol, limit)
    params = {
      # Callers in this file pass lower-case symbols (needed for the stream
      # URL); the REST API is documented with upper-case symbols, so
      # normalise here. Presumably lower-case is rejected; verify.
      symbol: market_symbol.upcase,
      limit:  limit,
    }
    full_path = QueryParamHelper.set_query_params(endpoint, params)
    puts "fetching #{full_path}"
    response = HTTP::Client.get(full_path)
    update(JSON.parse(response.body))
  end

  # Drops every bid and ask whose last update id is lower than *min_id*.
  def remove_expired_entries(min_id)
    # The ids are compared directly so the rule is visible at the call site.
    @bids.reject! { |_price, entry| entry.last_update_id < min_id }
    @asks.reject! { |_price, entry| entry.last_update_id < min_id }
  end

  # Inserts every `[price, quantity, ...]` level of a snapshot into *side*,
  # stamping each new entry with *uid*.
  def build_entries(side, uid, entries)
    entries.each do |entry|
      level = entry.as_a
      puts level.inspect
      price = level[0].as_s.to_f64
      quantity = level[1].as_s.to_f64
      side[price] = OrderBookEntry.new(uid.as_i, price, quantity)
    end
  end

  # Applies diff levels to *side*: a zero quantity removes the level, any
  # other quantity updates the existing level or creates a new one.
  def update_entries(side, uid, new_entries)
    new_entries.each do |entry|
      level = entry.as_a
      price = level[0].as_s.to_f64
      quantity = level[1].as_s.to_f64
      if quantity.zero?
        side.delete(price)
      elsif existing = side[price]?
        existing.update(uid.as_i, quantity)
      else
        side[price] = OrderBookEntry.new(uid.as_i, price, quantity)
      end
    end
  end

  # Applies one parsed payload to the book.
  #
  # * payload with a "code" key      -> treated as an API error and logged
  # * payload with "lastUpdateId"    -> snapshot ("bids"/"asks")
  # * anything else                  -> diff event ("u", "b", "a")
  def update(data : JSON::Any)
    if data["code"]?
      puts "error retrieving #{market_symbol} #{data.inspect}"
    elsif uid = data["lastUpdateId"]?
      build_entries(bids, uid, data["bids"].as_a)
      build_entries(asks, uid, data["asks"].as_a)
      @last_update_id = uid.as_i
    else
      # "u" is the final update id contained in the event ("U" is the first).
      final_uid = data["u"]
      # Events that end at or before the id already applied carry nothing new.
      return if final_uid.as_i <= @last_update_id
      update_entries(bids, final_uid, data["b"].as_a)
      update_entries(asks, final_uid, data["a"].as_a)
      @last_update_id = final_uid.as_i
    end
  end
end
# Subscribes to the depth stream of one market over a websocket and applies
# every received message to an OrderBook.
class Listener
  getter symbol : String

  # Builds the order book for *symbol* (this performs the snapshot request)
  # and prepares, but does not start, the websocket connection.
  def initialize(symbol : String)
    @symbol = symbol.downcase
    @channel = Channel(String).new
    @order_book = OrderBook.new(symbol)
    @open = false
    @url = "wss://stream.binance.com:9443/ws/#{@symbol}@depth"
    puts "opening #{@url}"
    @ws = HTTP::WebSocket.new URI.parse(@url)
    # Socket callbacks only forward to the channel; all book mutation
    # happens in the fiber that executes `#run`.
    @ws.on_message do |message|
      @channel.send(message)
    end
    @ws.on_close do |_message|
      @channel.send("CLOSED")
    end
  end

  # True until `#run` has been started, and again after the stream ended.
  def stopped?
    !@open
  end

  # True while `#run` is consuming the stream.
  def running?
    @open
  end

  # Runs the websocket in its own fiber and processes messages until the
  # socket reports closure or the socket fiber terminates. Blocks the
  # calling fiber for the lifetime of the stream.
  def run
    @open = true
    spawn do
      begin
        @ws.run
      rescue ex
        puts "websocket error #{@symbol}: #{ex.message}"
      ensure
        # Closing the channel wakes the receive loop below even when the
        # socket died without delivering a close frame.
        @channel.close
      end
    end
    while data = @channel.receive?
      if data == "CLOSED"
        puts "CLOSED #{@symbol}"
        break
      end
      @order_book.update JSON.parse(data)
    end
    @open = false
  end
end
# Script entry point: build one listener per market, run each in its own
# fiber, and keep the main fiber alive until every listener has stopped.
puts "preparing listeners"
# listeners = %w{bnbusdt bnbbtc btcusdt tusdusdt usdcusdt}.map { |symbol| Listener.new symbol }
listeners = %w{btcusdt}.map { |symbol| Listener.new symbol }
puts "starting listeners"
listeners.each { |listener| spawn { listener.run } }
puts "listening..."
loop do
  # A timed sleep yields to the listener fibers and then returns, so the exit
  # condition below is actually evaluated. An argument-less `sleep` would
  # suspend the main fiber indefinitely.
  sleep 1.second
  break if listeners.all? { |listener| listener.stopped? }
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment