Created
May 9, 2019 14:03
-
-
Save mwlang/f5446a497afa46de2783c96d58e99ae7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "./query_param_helper" | |
require "http" | |
require "json" | |
class OrderBookEntry | |
getter last_update_id : Int32 | |
getter price : Float64 | |
getter quantity : Float64 | |
def initialize(uid : Int32, price : Float64, quantity : Float64) | |
@last_update_id = uid | |
@price = price | |
@quantity = quantity | |
end | |
def expired?(min_uid : Int32) | |
min_uid <= last_update_id | |
end | |
def update(uid : Int32, new_quantity : Float64) | |
@last_update_id = uid | |
@quantity = new_quantity | |
end | |
end | |
class OrderBook | |
getter market_symbol : String | |
getter bids : Hash(Float64, OrderBookEntry) | |
getter asks : Hash(Float64, OrderBookEntry) | |
getter last_update_id : Int32 | |
def initialize(market_symbol : String) | |
@last_update_id = 0 | |
@market_symbol = market_symbol | |
@bids = Hash(Float64, OrderBookEntry).new | |
@asks = Hash(Float64, OrderBookEntry).new | |
load_from_endpoint market_symbol, 1000 | |
end | |
def endpoint | |
"https://www.binance.com/api/v1/depth" | |
end | |
def load_from_endpoint(market_symbol, limit) | |
params = { | |
symbol: market_symbol, | |
limit: limit | |
} | |
full_path = QueryParamHelper.set_query_params(endpoint, params) | |
puts "fetching #{full_path}" | |
response = HTTP::Client.get(full_path) | |
data = JSON.parse(response.body) | |
update(data) | |
end | |
def remove_expired_entries(min_id) | |
@bids.keys.each_pair do |price, ob| | |
@bids[price].remove if @bids[price].expired?(min_uid) | |
end | |
@asks.keys.each_pair do |price, ob| | |
@asks[price].remove if @asks[price].expired?(min_uid) | |
end | |
end | |
def build_entries(side, uid, entries) | |
entries.each do |entry| | |
e = entry.as_a | |
puts e.inspect | |
p = e[0].as_s.to_f64 | |
q = e[1].as_s.to_f64 | |
side[p] = OrderBookEntry.new(uid.as_i, p, q) | |
end | |
end | |
def update_entries(side, uid, new_entries) | |
new_entries.each do |entry| | |
e = entry.as_a | |
p = e[0].as_s.to_f64 | |
q = e[1].as_s.to_f64 | |
if q.zero? | |
side.delete(p) | |
else | |
if side[p]? | |
side[p].update(uid.as_i, q) | |
else | |
side[p] = OrderBookEntry.new(uid.as_i, p, q) | |
end | |
end | |
end | |
end | |
def update(data : JSON::Any) | |
if code = data["code"]? | |
puts "error retrieving #{market_symbol} #{data.inspect}" | |
elsif uid = data["lastUpdateId"]? | |
build_entries(bids, uid, data["bids"].as_a) | |
build_entries(asks, uid, data["asks"].as_a) | |
else | |
update_entries(bids, data["U"], data["b"].as_a) | |
update_entries(asks, data["U"], data["a"].as_a) | |
end | |
end | |
end | |
class Listener | |
getter symbol : String | |
def initialize(symbol : String) | |
@symbol = symbol.downcase | |
@channel = Channel(String).new | |
@order_book = OrderBook.new(symbol) | |
@open = false | |
@url = "wss://stream.binance.com:9443/ws/#{@symbol}@depth" | |
puts "opening #{@url}" | |
@entries = Hash(String, OrderBookEntry).new | |
@ws = HTTP::WebSocket.new URI.parse(@url) | |
@ws.on_message do |message| | |
@channel.send(message) | |
end | |
@ws.on_close do |message| | |
@channel.send("CLOSED") | |
end | |
end | |
def stopped? | |
!@open | |
end | |
def running? | |
!!@open | |
end | |
def run | |
@open = true | |
spawn { @ws.run } | |
while data = @channel.receive? | |
if data == "CLOSED" | |
puts "CLOSED #{@symbol}" | |
@open = false | |
break | |
else | |
@order_book.update JSON.parse(data) | |
end | |
end | |
end | |
end | |
puts "preparing listeners" | |
# listeners = %w{bnbusdt bnbbtc btcusdt tusdusdt usdcusdt}.map { |symbol| Listener.new symbol } | |
listeners = %w{btcusdt}.map { |symbol| Listener.new symbol } | |
puts "starting listeners" | |
listeners.each { |listener| spawn { listener.run } } | |
puts "listening..." | |
loop do | |
sleep | |
break if listeners.all? { |listener| listener.stopped? } | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment