Skip to content

Instantly share code, notes, and snippets.

@bigbes
Created March 9, 2017 21:51
Show Gist options
  • Save bigbes/d68fa3d1cebdcb6fc4f90469449b90d2 to your computer and use it in GitHub Desktop.
Save bigbes/d68fa3d1cebdcb6fc4f90469449b90d2 to your computer and use it in GitHub Desktop.
use std::io;
use std::boxed::Box;
use std::vec::Vec;
use std::path::Path;
use std::time::Duration;
use std::net::TcpStream;
#[cfg(unix)]
use unix_socket::UnixStream;
use bufstream::BufStream;
/// State kept for a single client connection.
///
/// Instances are created through `Connection::new`.
struct Connection {
    /// Boxed, buffered stream that carries all traffic for this connection.
    stream: Box<BufStream<Streamable>>,
    /// Starts at 0 in `Connection::new`; nothing in this file reads it yet.
    expect: u32,
    /// Request counter; `ping` bumps it before encoding each request header.
    sync: u64,
}
impl Connection {
pub fn new(stream: BufStream<Stramable>) -> Connection {
return Connection {
stream: stream,
expect: 0,
sync: 0,
}
}
}
/// Collects the settings needed to open a `Connection`.
pub struct ConnectionBuilder {
    /// Target address; `connect` parses it as `scheme://target`.
    address: String,
    /// Read timeout that `connect` applies to the freshly opened stream.
    tm_read: Duration,
    /// Write timeout that `connect` applies to the freshly opened stream.
    tm_write: Duration,
}
impl ConnectionBuilder {
pub fn new(address: Option<&str>) -> ConnectionBuilder {
let actual_address: String = match address {
None => String::from("localhost"),
Some(addr) => String::from(addr),
};
return ConnectionBuilder {
address: actual_address,
tm_read: Duration::new(60, 0),
tm_write: Duration::new(60, 0),
}
}
pub fn set_read_timeout<'a>(&'a mut self, tm_read: f32)
-> &'a mut ConnectionBuilder {
self.tm_read = Duration::new(tm_read.trunc() as u64, tm_read.fract() as u32 * 10^9);
return self;
}
pub fn set_write_timeout<'a>(&'a mut self, tm_write: f32)
-> &'a mut ConnectionBuilder {
self.tm_write = Duration::new(tm_write.trunc() as u64, tm_write.fract() as u32 * 10^9);
return self;
}
pub fn connect(&self) -> io::Result<Connection> {
let mut split = self.address.split("://");
return match(split.next(), split.next()) {
(Some("tcp"), Some(addr)) => {
let stream = try!(TcpStream::connect(addr));
try!(stream.set_read_timeout (Some(self.tm_read )));
try!(stream.set_write_timeout(Some(self.tm_write)));
let stream_buf = BufStream::new(stream);
return Ok(Box::new(Connection::new(stream_buf)));
},
#[cfg(unix)]
(Some("unix"), Some(addr)) => {
let stream = try!(UnixStream::connect(&Path::new(addr)));
try!(stream.set_read_timeout (Some(self.tm_read )));
try!(stream.set_write_timeout(Some(self.tm_write)));
let stream_buf = BufStream::new(stream);
return Ok(Box::new(Connection::new(stream_buf)));
},
(Some(prot), _) => Err(
io::Error::new(
io::ErrorKind::Other,
format!("Unsupported protocol: '{}'", prot)
)
),
_ => return Err(
io::Error::new(
io::ErrorKind::Other,
"Malformed address"
)
),
}
}
}
use sha1;
// Map keys used inside packets. `encode_header` writes KEY_CODE, KEY_SYNC and
// KEY_SCHEMA_ID; the remaining keys are declared for the other request and
// response kinds.
const KEY_CODE: u8 = 0x00;
const KEY_SYNC: u8 = 0x01;
const KEY_SCHEMA_ID: u8 = 0x05;
const KEY_SPACE_ID: u8 = 0x10;
const KEY_INDEX_ID: u8 = 0x11;
const KEY_LIMIT: u8 = 0x12;
const KEY_OFFSET: u8 = 0x13;
const KEY_ITERATOR: u8 = 0x14;
const KEY_KEY: u8 = 0x20;
const KEY_TUPLE: u8 = 0x21;
const KEY_FUNCTION_NAME: u8 = 0x22;
const KEY_USERNAME: u8 = 0x23;
const KEY_EXPRESSION: u8 = 0x27;
const KEY_OPS: u8 = 0x28;
const KEY_DATA: u8 = 0x30;
const KEY_ERROR: u8 = 0x31;
const KEY_SERVER_ID: u8 = 0x02;
const KEY_LSN: u8 = 0x03;
const KEY_TIMESTAMP: u8 = 0x04;
const KEY_SERVER_UUID: u8 = 0x24;
const KEY_CLUSTER_UUID: u8 = 0x25;
const KEY_VCLOCK: u8 = 0x26;

// Request type codes; mirrored one-to-one by the `Command` enum.
const OP_SELECT: u8 = 0x01;
const OP_INSERT: u8 = 0x02;
const OP_REPLACE: u8 = 0x03;
const OP_UPDATE: u8 = 0x04;
const OP_DELETE: u8 = 0x05;
const OP_CALL_16: u8 = 0x06;
const OP_AUTH: u8 = 0x07;
const OP_EVAL: u8 = 0x08;
const OP_UPSERT: u8 = 0x09;
const OP_CALL: u8 = 0x0a;
const OP_PING: u8 = 0x40;
const OP_JOIN: u8 = 0x41;
const OP_SUBSCRIBE: u8 = 0x42;

// Response codes.
const RET_OK: u32 = 0x00;
// Error responses carry the code `0x8000 | errcode` (the protocol's error type
// flag is `1 << 15`). The previous value, 0x800, tested bit 11 instead and
// would never match a real error response with a small error code.
const RET_ERROR_MASK: u32 = 0x8000;

// Sizes, in bytes, of the pieces of the authentication handshake.
const SCRAMBLE_SIZE: u8 = 20;
const GREETING_SIZE: u8 = 128;
const VERSION_SIZE: u8 = 62;
const SALT_SIZE: u8 = 44;
/// Request types, stored as a single byte.
///
/// Every variant takes its discriminant from the matching `OP_*` constant, so
/// `Command::to_u8` and `Command::from_u8` round-trip through those values.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Command {
    Select = OP_SELECT,
    Insert = OP_INSERT,
    Replace = OP_REPLACE,
    Update = OP_UPDATE,
    Delete = OP_DELETE,
    Call_16 = OP_CALL_16,
    Auth = OP_AUTH,
    Eval = OP_EVAL,
    Upsert = OP_UPSERT,
    Call = OP_CALL,
    Ping = OP_PING,
    Join = OP_JOIN,
    Subscribe = OP_SUBSCRIBE,
}
/// Map keys that appear in packets, stored as a single byte.
///
/// Every variant takes its discriminant from the matching `KEY_*` constant, so
/// `Key::to_u8` and `Key::from_u8` round-trip through those values.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Key {
    Code = KEY_CODE,
    Sync = KEY_SYNC,
    Schema_id = KEY_SCHEMA_ID,
    Space_id = KEY_SPACE_ID,
    Index_id = KEY_INDEX_ID,
    Limit = KEY_LIMIT,
    Offset = KEY_OFFSET,
    Iterator = KEY_ITERATOR,
    Key = KEY_KEY,
    Tuple = KEY_TUPLE,
    Function_name = KEY_FUNCTION_NAME,
    Username = KEY_USERNAME,
    Expression = KEY_EXPRESSION,
    Ops = KEY_OPS,
    Data = KEY_DATA,
    Error = KEY_ERROR,
    Server_id = KEY_SERVER_ID,
    Lsn = KEY_LSN,
    Timestamp = KEY_TIMESTAMP,
    Server_uuid = KEY_SERVER_UUID,
    Cluster_uuid = KEY_CLUSTER_UUID,
    Vclock = KEY_VCLOCK,
}
impl Command {
    /// Returns the byte value of this command (its `OP_*` constant).
    #[inline]
    fn to_u8(&self) -> u8 {
        *self as u8
    }

    /// Maps a raw byte back to a `Command`.
    ///
    /// Returns `None` when `code` is not one of the `OP_*` constants.
    #[inline]
    fn from_u8(code: u8) -> Option<Command> {
        let command = match code {
            OP_SELECT => Command::Select,
            OP_INSERT => Command::Insert,
            OP_REPLACE => Command::Replace,
            OP_UPDATE => Command::Update,
            OP_DELETE => Command::Delete,
            OP_CALL_16 => Command::Call_16,
            OP_AUTH => Command::Auth,
            OP_EVAL => Command::Eval,
            OP_UPSERT => Command::Upsert,
            OP_CALL => Command::Call,
            OP_PING => Command::Ping,
            OP_JOIN => Command::Join,
            OP_SUBSCRIBE => Command::Subscribe,
            // Unknown request code.
            _ => return None,
        };
        Some(command)
    }
}
impl Key {
#[inline]
fn to_u8(&self) -> u8 {
return *self as u8;
}
#[inline]
fn from_u8(code: u8) -> Option<Key> {
return match code {
KEY_CODE => Some(Key::Code),
KEY_SYNC => Some(Key::Sync),
KEY_SCHEMA_ID => Some(Key::Schema_id),
KEY_SPACE_ID => Some(Key::Space_id),
KEY_INDEX_ID => Some(Key::Index_id),
KEY_LIMIT => Some(Key::Limit),
KEY_OFFSET => Some(Key::Offset),
KEY_ITERATOR => Some(Key::Iterator),
KEY_KEY => Some(Key::Key),
KEY_TUPLE => Some(Key::Tuple),
KEY_FUNCTION_NAME => Some(Key::Function_name),
KEY_USERNAME => Some(Key::Username),
KEY_EXPRESSION => Some(Key::Expression),
KEY_OPS => Some(Key::Ops),
KEY_DATA => Some(Key::Data),
KEY_ERROR => Some(Key::Error),
KEY_SERVER_ID => Some(Key::Server_id),
KEY_LSN => Some(Key::Lsn),
KEY_TIMESTAMP => Some(Key::Timestamp),
KEY_SERVER_UUID => Some(Key::Server_uuid),
KEY_CLUSTER_UUID => Some(Key::Cluster_uuid),
KEY_VCLOCK => Some(Key::Vclock),
_ => None,
}
}
}
/// XORs two byte slices position by position.
///
/// The result is as long as the shorter of the two inputs; any surplus bytes
/// of the longer one are ignored.
#[inline]
pub fn chap_sha_xor(left: &[u8], right: &[u8]) -> Vec<u8> {
    let mut mixed = Vec::new();
    // `zip` stops at the end of the shorter slice.
    for (a, b) in left.iter().zip(right.iter()) {
        mixed.push(a ^ b);
    }
    mixed
}
/// Computes the scramble `sha1(salt ++ sha1(sha1(password))) XOR sha1(password)`.
///
/// `salt` is hashed exactly as passed in.
// NOTE(review): callers presumably pass only the first SCRAMBLE_SIZE bytes of
// the decoded greeting salt - confirm against the call site.
pub fn chap_sha(password: String, salt: &[u8]) -> Vec<u8> {
    let mut hasher = sha1::Sha1::new();

    // Step 1: hash the raw password bytes.
    hasher.update(password.as_bytes());
    let password_hash = hasher.digest().bytes();

    // Step 2: hash the hash.
    hasher.reset();
    hasher.update(&password_hash[..]);
    let double_hash = hasher.digest().bytes();

    // Step 3: hash the salt followed by the double hash.
    hasher.reset();
    hasher.update(salt);
    hasher.update(&double_hash[..]);
    let salted_hash = hasher.digest().bytes();

    // Step 4: mask the salted hash with the single password hash.
    chap_sha_xor(&salted_hash[..], &password_hash[..])
}
use std::io;
use std::boxed::Box;
use std::vec::Vec;
use std::path::Path;
mod constants;
mod request;
mod connection;
use rmp as msgpack;
/// Errors produced by the client.
///
/// Derives `Debug` so that `TarantoolResult` values can be used with
/// `unwrap`/`expect` and printed with `{:?}`.
#[derive(Debug)]
pub enum Error {
    /// An error described by a numeric code and a message.
    // NOTE(review): presumably the code and text reported by the server -
    // nothing in this file constructs the variant yet; confirm when it is used.
    TarantoolError(u16, String),
    /// An underlying I/O failure.
    IoError(io::Error)
}

impl ::std::fmt::Display for Error {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            Error::TarantoolError(code, ref message) => {
                write!(f, "Tarantool error {}: {}", code, message)
            }
            Error::IoError(ref err) => write!(f, "I/O error: {}", err),
        }
    }
}

impl ::std::error::Error for Error {}

/// Lets `?` turn an `io::Error` into `Error::IoError` automatically.
impl From<io::Error> for Error {
    fn from(err: io::Error) -> Error {
        Error::IoError(err)
    }
}

/// Result type returned by the client operations.
pub type TarantoolResult<T> = Result<T, Error>;
/// Request operations that a connection can perform.
///
/// Only `ping` is part of the trait today (the `LL` prefix presumably stands
/// for "low-level").
pub trait LLOperation {
    // Planned operations, kept here as a sketch; they are NOT part of the
    // trait yet:
    //
    // fn select(space_id: u32, index_id: u32, key: Vec<u8>, limit: u32,
    //           offset: u32, iterator: u8)
    //     -> TarantoolResult<Vec<u8>>;
    // fn insert(space_id: u32, tuple: Vec<u8>)
    //     -> TarantoolResult<Vec<u8>>;
    // fn replace(space_id: u32, tuple: Vec<u8>)
    //     -> TarantoolResult<Vec<u8>>;
    // fn update(space_id: u32, index_id: u32, key: Vec<u8>, ops: Vec<u8>)
    //     -> TarantoolResult<Vec<u8>>;
    // fn delete(space_id: u32, index_id: u32, key: Vec<u8>)
    //     -> TarantoolResult<Vec<u8>>;
    // fn authenticate(login: String, password: String)
    //     -> TarantoolResult<()>;
    // fn eval(code: String, args: Vec<u8>)
    //     -> TarantoolResult<Vec<u8>>;
    // fn upsert(space_id: u32, index_id: u32, tuple: Vec<u8>, ops: Vec<u8>)
    //     -> TarantoolResult<()>;
    // fn call(func: String, args: Vec<u8>)
    //     -> TarantoolResult<Vec<u8>>;

    /// Issues a ping request on this connection.
    fn ping(&mut self) -> TarantoolResult<()>;
}
impl LLOperation for Connection {
fn ping(&mut self) -> TarantoolResult<()> {
let mut prefix = [0x00; 5];
let mut outgoing: Vec<u8> = Vec::new();
self.sync += 1;
request::encode_header(&mut outgoing, constants::Command::Ping, self.sync, 0);
request::encode_body_ping(&mut outgoing);
let (sock,) = (self.stream,);
// msgpack::encode::write_u32(&mut self.stream.unpack(), outgoing.len() as u32);
// self.stream.write_bytes([0x00]);
return Ok(());
}
}
use std::io::Write;
use client::constants::{Key, Command, chap_sha};
use rmp as msgpack;
/// Appends a request header to `wr` as a three-entry MessagePack map.
///
/// The entries are written in this order: `Key::Code` -> `operation`,
/// `Key::Sync` -> `sync`, `Key::Schema_id` -> `schema_id`.
///
/// Results of the individual writes are discarded, so a failing writer goes
/// unnoticed by the caller.
pub fn encode_header<W: Write>(wr: &mut W, operation: Command, sync: u64, schema_id: u32) {
    let fields = [
        (Key::Code, operation as u64),
        (Key::Sync, sync),
        (Key::Schema_id, schema_id as u64),
    ];
    let _ = msgpack::encode::write_map_len(wr, fields.len() as u32);
    for &(key, value) in fields.iter() {
        let _ = msgpack::encode::write_uint(wr, key as u64);
        let _ = msgpack::encode::write_uint(wr, value);
    }
}
/// Appends the body of a ping request to `wr`: an empty MessagePack map.
///
/// The result of the write is discarded.
pub fn encode_body_ping<W: Write>(wr: &mut W) {
    let _ = msgpack::encode::write_map_len(wr, 0);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment