Skip to content

Instantly share code, notes, and snippets.

@chrishulbert
Created August 27, 2023 13:15
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save chrishulbert/07f171532fd4d653a82f403abe547c02 to your computer and use it in GitHub Desktop.
Save chrishulbert/07f171532fd4d653a82f403abe547c02 to your computer and use it in GitHub Desktop.
Interactive Brokers TWS API in Rust
// MIT licensed.
use std::net::TcpStream;
use std::io::{Read, Write};
use std::sync::mpsc::channel;
use std::thread;
fn main() {
let mut stream = TcpStream::connect("127.0.0.1:7496").expect("connect");
stream.set_nodelay(true).expect("nodelay"); // Because we're wannabe HFT traders.
println!("Connected");
send_greeting(&mut stream);
println!("Greeting sent");
let greeting_response = read_message(&mut stream);
println!("Greeting response: {:?}", greeting_response);
send_start_api(&mut stream, 123456);
let mut writer = stream.try_clone().expect("Clone");
let mut reader = stream;
let (writer_tx, writer_rx) = channel::<Vec<String>>();
let reader_handle = thread::Builder::new().name("Reader".into()).spawn(move || {
loop {
let message = read_message(&mut reader);
message_received(&message);
}
}).expect("Spawn");
let writer_handle = thread::Builder::new().name("Writer".into()).spawn(move || {
loop {
let fields = writer_rx.recv().expect("Writer queue receive");
let message = make_message(&fields);
println!("Sending {:?} aka {:?} aka {:?}",
fields, String::from_utf8_lossy(&message), message);
writer.write_all(&message).expect("Write all");
}
}).expect("Spawn");
// Post-handshake delay or TWS bugs out.
thread::sleep(std::time::Duration::from_millis(500));
let request_market_data: Vec<String> = vec![
(OutgoingRequestType::ReqMktData as u32).to_string(),
"11".to_string(), // Version
"999".to_string(), // Request id aka Ticker id
"".to_string(), // Contract id
"BTC".to_string(), // Symbol
"CRYPTO".to_string(), // Security type
"".to_string(), // LastTradeDateOrContractMonth
"".to_string(), // strike
"".to_string(), // right
"".to_string(), // multiplier
"PAXOS".to_string(), // exchange
"".to_string(), // primaryExchange
"USD".to_string(), // currency
"".to_string(), // localSymbol
"".to_string(), // tradingClass
"0".to_string(), // Delta neutral contract? 0=false, 1=true
"".to_string(), // genericTickList
"0".to_string(), // is a snapshot?
"0".to_string(), // regulatorySnapshot - costs 1c ea.
"".to_string(), // mktDataOptions - unsupported.
];
writer_tx.send(request_market_data).expect("Writer queue send");
writer_handle.join().unwrap();
reader_handle.join().unwrap();
}
fn add_length_prefix(bytes: &[u8]) -> Vec<u8> {
let len = bytes.len() as u32;
let mut data: Vec<u8> = Vec::new();
data.push(((len >> 24) & 0xff) as u8);
data.push(((len >> 16) & 0xff) as u8);
data.push(((len >> 8) & 0xff) as u8);
data.push((len & 0xff) as u8);
data.extend(bytes);
data
}
fn concat(a: &[u8], b: &[u8]) -> Vec<u8> {
let mut both = a.to_owned();
both.extend(b);
both
}
const MIN_SERVER_VER_BOND_ISSUERID: u32 = 176; // From server_versions.py.
const DESIRED_VERSION: u32 = MIN_SERVER_VER_BOND_ISSUERID;
fn send_greeting(stream: &mut TcpStream) {
let prefix = "API\0";
let version = format!("v{}..{}", DESIRED_VERSION, DESIRED_VERSION);
let version_msg = add_length_prefix(version.as_bytes());
let both = concat(prefix.as_bytes(), &version_msg);
stream.write_all(&both).expect("Greeting");
}
fn read_length(reader: &mut TcpStream) -> u32 {
let mut len_buf: [u8; 4] = [0; 4];
reader.read_exact(&mut len_buf).expect("Read length");
let length: u32 = ((len_buf[0] as u32) << 24)
| ((len_buf[1] as u32) << 16)
| ((len_buf[2] as u32) << 8)
| len_buf[3] as u32;
length
}
fn split_message(message: &[u8]) -> Vec<String> {
// Split into an array of buffers:
let mut components = Vec::<Vec<u8>>::new();
let mut current_component = Vec::<u8>::new();
for byte in message {
if *byte == 0 {
if !current_component.is_empty() {
components.push(current_component.clone());
current_component.clear();
}
} else {
current_component.push(*byte);
}
}
if !current_component.is_empty() {
components.push(current_component);
}
// Convert the buffers into strings:
components
.into_iter()
.map(|v| String::from_utf8_lossy(&v).to_string())
.collect()
}
fn read_message(reader: &mut TcpStream) -> Vec<String> {
let length = read_length(reader);
let mut buffer: Vec<u8> = vec![0; length as usize];
reader.read_exact(&mut buffer).expect("Read message");
split_message(&buffer)
}
enum OutgoingRequestType {
ReqMktData = 1,
StartApi = 71,
}
// Join and delimit the fields, prefixing the length.
fn make_message(fields: &[String]) -> Vec<u8> {
let mut delimited_fields = Vec::<u8>::new();
for field in fields {
delimited_fields.extend(field.as_bytes());
delimited_fields.push(0); // Even goes after the last field.
}
add_length_prefix(&delimited_fields)
}
fn send_start_api(stream: &mut TcpStream, client_id: u32) {
let fields: Vec<String> = vec![
(OutgoingRequestType::StartApi as u32).to_string(),
"2".to_string(), // Version of the Start API message.
client_id.to_string(),
"".to_string(), // Optional capabilities.
];
let message = make_message(&fields);
stream.write_all(&message).expect("Start api");
}
enum IncomingRequestType {
TickPrice = 1,
ErrMsg = 4,
}
fn message_received(fields: &[String]) {
if fields.is_empty() { return }
let Ok(t) = fields[0].parse::<u32>() else { return };
if t == IncomingRequestType::ErrMsg as u32 {
let _request_id = fields.get(2);
let _code = fields.get(3);
let Some(text) = fields.get(4) else { return };
println!("Message: {}", text);
} else if t == IncomingRequestType::TickPrice as u32 {
let Some(tick_type) = fields.get(3) else { return };
if tick_type != "4" { return } // 4 = 'Last' from ticktype.py.
let Some(price) = fields.get(4) else { return };
println!("Price: {}", price);
} else {
println!("Received: {:?}", fields);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment