Instantly share code, notes, and snippets.

@esplo /bf_ws_rust.rs Secret
Created Dec 5, 2018

Embed
What would you like to do?
extern crate chrono;
extern crate env_logger;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate ws;
use std::fmt;
use chrono::prelude::*;
use serde_json::Error as SJError;
use std::error::Error;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Write;
use ws::Result as WSResult;
use ws::{connect, Handshake, Handler, Message, Sender};
/// A single trade entry carried in the `message` array of a [`ReceivedMessage`].
///
/// Field names double as the JSON keys used by serde for (de)serialization.
#[derive(Serialize, Deserialize, Debug)]
pub struct TradeMessage {
// NOTE(review): presumably the exchange-assigned execution id — confirm against the API docs.
id: u64,
/// Trade side as sent on the wire (kept as a raw string, not validated here).
pub side: String,
/// Trade price. NOTE(review): unit/currency is not established in this file — confirm.
pub price: u32,
/// Traded size.
pub size: f32,
/// Execution timestamp as text; the handler parses it as a `DateTime<Utc>`.
pub exec_date: String,
// Acceptance ids of the two child orders involved in the trade (buy and sell side).
buy_child_order_acceptance_id: String,
sell_child_order_acceptance_id: String,
}
impl fmt::Display for TradeMessage {
    /// Writes the trade as its JSON encoding.
    ///
    /// A serialization failure is reported as `fmt::Error`, since `Display`
    /// cannot carry a richer error type.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match serde_json::to_string(self) {
            Ok(json) => write!(f, "{}", json),
            Err(_) => Err(fmt::Error),
        }
    }
}
/// The `params` object of a [`ReceivedMessage`]: the channel name plus the
/// batch of trades delivered in that notification.
#[derive(Serialize, Deserialize, Debug)]
pub struct MessageParams {
// Name of the channel the notification belongs to.
channel: String,
/// Trades contained in this notification, in the order they were received.
pub message: Vec<TradeMessage>,
}
/// Top-level JSON envelope that incoming WebSocket text frames are
/// deserialized into by the handler.
#[derive(Serialize, Deserialize, Debug)]
pub struct ReceivedMessage {
// Envelope fields; they must be present for deserialization to succeed but are not read by this file.
jsonrpc: String,
method: String,
/// Payload of the notification (channel name and trade list).
pub params: MessageParams,
}
/// WebSocket handler that subscribes to the executions channel and writes
/// latency samples to a borrowed file.
struct MyHandler<'a> {
// Number of messages seen since the last sample; reset to 0 each time a sample is taken.
counter: usize,
// Outgoing side of the connection, used to send the subscribe request when the socket opens.
_out: Sender,
// Destination for the CSV rows; borrowed so the file outlives the handler.
file: &'a File,
}
impl<'a> MyHandler<'a> {
    /// Builds a handler bound to the connection's `out` sender that will write
    /// its samples to `file`. The message counter starts at zero.
    fn new(out: Sender, file: &'a File) -> MyHandler<'a> {
        let counter = 0;
        MyHandler {
            file,
            counter,
            _out: out,
        }
    }
}
impl<'a> Handler for MyHandler<'a> {
    /// Sends the subscribe request for the executions channel once the
    /// connection is open. Any send error is returned to the caller.
    fn on_open(&mut self, _: Handshake) -> WSResult<()> {
        let s = r#"{"jsonrpc": "2.0", "method": "subscribe", "params": {"channel": "lightning_executions_FX_BTC_JPY"}}"#;
        self._out.send(s)
    }

    /// Samples every 10th incoming message and appends one CSV row
    /// `receive_time_s,delta_ms,processed_ms` to the output file.
    ///
    /// * `delta_ms` is the gap between the local receive time and the
    ///   `exec_date` of the first trade in the message.
    /// * `processed_ms` is the time spent between receiving the message and
    ///   finishing the parse.
    ///
    /// Messages that cannot be used (non-text frame, invalid JSON, empty
    /// trade list, unparsable timestamp) are logged and skipped instead of
    /// panicking.
    ///
    /// # Errors
    /// Returns an error if writing to or syncing the output file fails.
    fn on_message(&mut self, msg: Message) -> WSResult<()> {
        self.counter += 1;
        if self.counter < 10 {
            return Ok(());
        }
        self.counter = 0;

        // Capture the receive time before any parsing so it is not skewed by our own work.
        let cur_time = Utc::now();

        let text = match msg.as_text() {
            Ok(text) => text,
            Err(e) => {
                println!("error {}", e);
                return Ok(());
            }
        };

        let parsed: Result<ReceivedMessage, SJError> = serde_json::from_str(text);
        let received = match parsed {
            Ok(v) => v,
            Err(e) => {
                println!("error {}", e);
                return Ok(());
            }
        };

        // Only the first trade of the batch is used as the timing reference.
        let first = match received.params.message.first() {
            Some(trade) => trade,
            None => {
                println!("error empty message list");
                return Ok(());
            }
        };

        let ts = match first.exec_date.parse::<DateTime<Utc>>() {
            Ok(ts) => ts,
            Err(e) => {
                println!("error {}", e);
                return Ok(());
            }
        };

        // Prefer sub-millisecond precision; fall back to whole milliseconds
        // if the finer-grained count would overflow an i64.
        let delta_time = cur_time.signed_duration_since(ts);
        let delta_ms = delta_time
            .num_microseconds()
            .map(|us| us as f64 / 1000.0)
            .unwrap_or_else(|| delta_time.num_milliseconds() as f64);

        let processed_time = Utc::now().signed_duration_since(cur_time);
        let processed_ms = processed_time
            .num_nanoseconds()
            .map(|ns| ns as f64 / 1000.0 / 1000.0)
            .unwrap_or_else(|| processed_time.num_milliseconds() as f64);

        // Propagate write failures just like sync failures rather than dropping them.
        writeln!(
            self.file,
            "{},{},{}",
            cur_time.timestamp_millis() as f64 / 1000.0,
            delta_ms,
            processed_ms
        )?;
        self.file.sync_all()?;
        Ok(())
    }
}
/// Opens the latency log in append mode (creating the file if needed) and
/// runs the WebSocket client against the bitFlyer JSON-RPC endpoint.
///
/// # Panics
/// Panics if the log file cannot be opened or if `connect` returns an error.
fn main() {
    env_logger::init();

    // `create(true)` only creates the file itself, not missing parent directories.
    let path = "/tmp/latency/calc.txt";
    // `append(true)` already implies write access, so no separate `write(true)` is needed.
    let file = match OpenOptions::new().create(true).append(true).open(path) {
        Ok(file) => file,
        // Use the error's Display output: it carries the OS error detail that
        // the deprecated `description()` omits.
        Err(why) => panic!("couldn't open {}: {}", path, why),
    };

    if let Err(error) = connect(
        "wss://ws.lightstream.bitflyer.com/json-rpc",
        |out| MyHandler::new(out, &file),
    ) {
        panic!("Failed to create WebSocket due to: {:?}", error);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment