Last active
March 14, 2019 19:06
-
-
Save dkohlsdorf/a17039267ebeba3da183a04049b6ebfd to your computer and use it in GitHub Desktop.
Avro over TCP / Ip in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate avro_rs; | |
use avro_rs::*; | |
use std::io::prelude::*; | |
use std::net::TcpListener; | |
use std::net::TcpStream; | |
use std::thread; | |
#[derive(Debug, Deserialize, Serialize)] | |
pub struct Test { | |
a: i32, | |
b: f64, | |
c: String, | |
d: Vec<i32>, | |
} | |
impl Test { | |
fn new(i: i32) -> Test { | |
Test { | |
a: i, | |
b: f64::from(i) / 12.0, | |
c: String::from("test"), | |
d: vec![0; i as usize], | |
} | |
} | |
fn schema() -> Schema { | |
let raw_schema = r#" | |
{ | |
"type": "record", | |
"name": "test", | |
"fields": [ | |
{"name": "a", "type": "int", "default": 0}, | |
{"name": "b", "type": "double", "default": 1.0}, | |
{"name": "c", "type": "string", "default": "dansen"}, | |
{"name": "d", "type": {"type": "array", "items": "int"}} | |
] | |
} | |
"#; | |
avro_rs::Schema::parse_str(raw_schema).unwrap() | |
} | |
fn write_data(data: &[Test], schema: Schema) -> Vec<u8> { | |
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate); | |
for x in data.iter() { | |
writer.append_ser(x).unwrap(); | |
} | |
writer.flush().unwrap(); | |
writer.into_inner() | |
} | |
fn read_data(data: &[u8], schema: Schema) -> Vec<Test> { | |
let reader = Reader::with_schema(&schema, data).unwrap(); | |
reader | |
.map(|record| from_value::<Test>(&record.unwrap()).unwrap()) | |
.collect() | |
} | |
fn handle_connection(mut stream: TcpStream) { | |
let mut buffer: Vec<u8> = Vec::new(); | |
stream.read_to_end(&mut buffer).unwrap(); | |
for test in Test::read_data(&buffer[..], Test::schema()) { | |
println!("Received: {:?}", test); | |
} | |
} | |
pub fn example() { | |
let h1 = thread::spawn(|| { | |
let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); | |
for stream in listener.incoming() { | |
let s = stream.unwrap(); | |
Test::handle_connection(s); | |
} | |
}); | |
let h2 = thread::spawn(|| { | |
let data = Test::write_data( | |
&[Test::new(1), Test::new(2), Test::new(3)], | |
Test::schema(), | |
); | |
let mut stream = TcpStream::connect("127.0.0.1:7878").unwrap(); | |
stream.write_all(&data).unwrap(); | |
}); | |
h2.join().unwrap(); | |
h1.join().unwrap(); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate avro_rs; | |
use std::net::TcpListener; | |
use std::net::TcpStream; | |
use std::thread; | |
use avro_rs::*; | |
use byteorder::*; | |
use std::io::prelude::*; | |
#[derive(Debug, Deserialize, Serialize)] | |
pub struct Test { | |
a: i32, | |
b: f64, | |
c: String, | |
d: Vec<i32> | |
} | |
impl Test { | |
fn new(i: i32) -> Test { | |
Test { | |
a: i, | |
b: f64::from(i) / 12.0, | |
c: String::from("test"), | |
d: vec![0; i as usize] | |
} | |
} | |
fn schema() -> Schema { | |
let raw_schema = r#" | |
{ | |
"type": "record", | |
"name": "test", | |
"fields": [ | |
{"name": "a", "type": "int", "default": 0}, | |
{"name": "b", "type": "double", "default": 1.0}, | |
{"name": "c", "type": "string", "default": "dansen"}, | |
{"name": "d", "type": {"type": "array", "items": "int"}} | |
] | |
} | |
"#; | |
avro_rs::Schema::parse_str(raw_schema).unwrap() | |
} | |
} | |
fn write_data(data: &[Test], schema: Schema) -> Vec<u8> { | |
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate); | |
for x in data.iter() { | |
writer.append_ser(x).unwrap(); | |
} | |
writer.flush().unwrap(); | |
writer.into_inner() | |
} | |
fn read_data(data: &[u8], schema: Schema) -> Vec<Test> { | |
let reader = Reader::with_schema(&schema, data).unwrap(); | |
reader | |
.map(|record| from_value::<Test>(&record.unwrap()).unwrap()) | |
.collect() | |
} | |
// Shamelessly ported from avro python implementation | |
pub fn send_framed(objects: &[Test], address: String, schema: Schema, buf_size: usize) { | |
let data = write_data(objects, schema); | |
let mut stream = TcpStream::connect(address).unwrap(); | |
let n = data.len(); | |
for i in (0..n).step_by(buf_size) { | |
// determine size of bytes to write | |
let start = i; | |
let stop = usize::min(i + buf_size, n); | |
// send length of buffer | |
let mut buffer_length = [0; 4]; | |
BigEndian::write_u32(&mut buffer_length, (stop - start) as u32); | |
stream.write(&buffer_length).unwrap(); | |
// write actual data | |
stream.write(&data[start..stop]).unwrap(); | |
} | |
// terminate by 0 sized | |
let mut buffer_length = [0; 4]; | |
BigEndian::write_u32(&mut buffer_length, 0); | |
stream.write(&buffer_length).unwrap(); | |
stream.flush().unwrap(); | |
} | |
// Shamelessly ported from avro python implementation | |
pub fn read_framed(stream: &mut TcpStream, schema: Schema) -> Vec<Test> { | |
let mut message: Vec<u8> = Vec::new(); | |
let mut first_buf = [0; 4]; | |
stream.read(&mut first_buf).unwrap(); | |
let mut next = BigEndian::read_u32(&first_buf); | |
// read while we see non empty buffers | |
while next > 0 { | |
// append all bytes to final object | |
let mut object = vec![0; next as usize]; | |
stream.read(&mut object).unwrap(); | |
message.append(&mut object); | |
// read next buffer size | |
let mut next_buf = [0; 4]; | |
stream.read(&mut next_buf).unwrap(); | |
next = BigEndian::read_u32(&next_buf); | |
} | |
read_data(&message, schema) | |
} | |
pub fn example() { | |
let h1 = thread::spawn(|| { | |
let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); | |
for stream in listener.incoming() { | |
let mut s = stream.unwrap(); | |
let results = read_framed(&mut s, Test::schema()); | |
for result in results.iter() { | |
println!("{:?}", result); | |
} | |
} | |
}); | |
thread::spawn(|| { | |
send_framed(&[Test::new(1), Test::new(2), Test::new(3)], String::from("127.0.0.1:7878"), Test::schema(), 8192); // From python code | |
send_framed(&[Test::new(4), Test::new(5), Test::new(6)], String::from("127.0.0.1:7878"), Test::schema(), 8192); | |
send_framed(&[Test::new(7), Test::new(8), Test::new(9)], String::from("127.0.0.1:7878"), Test::schema(), 8192); | |
send_framed(&[Test::new(10), Test::new(11), Test::new(12)], String::from("127.0.0.1:7878"), Test::schema(), 8192); | |
}); | |
h1.join().unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment