Skip to content

Instantly share code, notes, and snippets.

@dkohlsdorf
Last active March 14, 2019 19:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkohlsdorf/a17039267ebeba3da183a04049b6ebfd to your computer and use it in GitHub Desktop.
Save dkohlsdorf/a17039267ebeba3da183a04049b6ebfd to your computer and use it in GitHub Desktop.
Avro over TCP/IP in Rust
extern crate avro_rs;
use avro_rs::*;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
/// Sample record that is serialized with Avro and sent over a TCP socket.
///
/// The field names and types line up with the Avro record described by
/// `Test::schema`.
#[derive(Debug, Deserialize, Serialize)]
pub struct Test {
    /// Maps to the schema's `int` field `a`.
    a: i32,
    /// Maps to the schema's `double` field `b`.
    b: f64,
    /// Maps to the schema's `string` field `c`.
    c: String,
    /// Maps to the schema's `array` of `int` field `d`.
    d: Vec<i32>,
}
impl Test {
    /// Builds a sample record derived from `i`.
    ///
    /// `a` is `i`, `b` is `i / 12.0`, `c` is the literal `"test"` and `d` is a
    /// vector of `i` zeros.
    fn new(i: i32) -> Test {
        Test {
            a: i,
            b: f64::from(i) / 12.0,
            c: String::from("test"),
            // Casting a negative `i` straight to `usize` wraps around to an
            // enormous length; clamp at zero so such records get an empty `d`.
            d: vec![0; i.max(0) as usize],
        }
    }

    /// Returns the Avro schema describing a `Test` record.
    ///
    /// # Panics
    ///
    /// Panics if the embedded schema text fails to parse.
    fn schema() -> Schema {
        let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "int", "default": 0},
                {"name": "b", "type": "double", "default": 1.0},
                {"name": "c", "type": "string", "default": "dansen"},
                {"name": "d", "type": {"type": "array", "items": "int"}}
            ]
        }
        "#;
        avro_rs::Schema::parse_str(raw_schema).unwrap()
    }

    /// Serializes `data` into an in-memory Avro container using the Deflate
    /// codec and returns the encoded bytes.
    ///
    /// # Panics
    ///
    /// Panics if a record cannot be appended or the writer cannot be flushed.
    fn write_data(data: &[Test], schema: Schema) -> Vec<u8> {
        let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
        for record in data {
            writer.append_ser(record).unwrap();
        }
        writer.flush().unwrap();
        writer.into_inner()
    }

    /// Decodes every record found in the Avro container bytes `data`.
    ///
    /// # Panics
    ///
    /// Panics if the container cannot be opened with `schema` or if any
    /// record fails to decode into a `Test`.
    fn read_data(data: &[u8], schema: Schema) -> Vec<Test> {
        let reader = Reader::with_schema(&schema, data).unwrap();
        reader
            .map(|record| from_value::<Test>(&record.unwrap()).unwrap())
            .collect()
    }

    /// Reads `stream` until the peer closes it, then decodes and prints every
    /// record contained in the received bytes.
    ///
    /// # Panics
    ///
    /// Panics if reading from the socket or decoding the payload fails.
    fn handle_connection(mut stream: TcpStream) {
        let mut buffer: Vec<u8> = Vec::new();
        stream.read_to_end(&mut buffer).unwrap();
        for test in Test::read_data(&buffer[..], Test::schema()) {
            println!("Received: {:?}", test);
        }
    }

    /// Runs a local server thread and a client thread that sends three
    /// records to it on `127.0.0.1:7878`.
    ///
    /// The server loop accepts connections indefinitely, so this function
    /// does not return after the client has finished.
    ///
    /// # Panics
    ///
    /// Panics if the port cannot be bound or if either thread panics.
    pub fn example() {
        // Bind before any thread is spawned: if the listener were created
        // inside the server thread, the client could try to connect before the
        // socket exists and fail with "connection refused".
        let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
        let h1 = thread::spawn(move || {
            for stream in listener.incoming() {
                let s = stream.unwrap();
                Test::handle_connection(s);
            }
        });
        let h2 = thread::spawn(|| {
            let data = Test::write_data(
                &[Test::new(1), Test::new(2), Test::new(3)],
                Test::schema(),
            );
            let mut stream = TcpStream::connect("127.0.0.1:7878").unwrap();
            stream.write_all(&data).unwrap();
            // Dropping `stream` at the end of this closure closes the
            // connection, which is what ends the server's `read_to_end`.
        });
        h2.join().unwrap();
        h1.join().unwrap();
    }
}
extern crate avro_rs;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use avro_rs::*;
use byteorder::*;
use std::io::prelude::*;
/// Sample record exchanged between the framed sender and receiver.
///
/// The field names and types line up with the Avro record described by
/// `Test::schema`.
#[derive(Debug, Deserialize, Serialize)]
pub struct Test {
    /// Maps to the schema's `int` field `a`.
    a: i32,
    /// Maps to the schema's `double` field `b`.
    b: f64,
    /// Maps to the schema's `string` field `c`.
    c: String,
    /// Maps to the schema's `array` of `int` field `d`.
    d: Vec<i32>,
}
impl Test {
    /// Builds a sample record derived from `i`.
    ///
    /// `a` is `i`, `b` is `i / 12.0`, `c` is the literal `"test"` and `d` is a
    /// vector of `i` zeros.
    fn new(i: i32) -> Test {
        Test {
            a: i,
            b: f64::from(i) / 12.0,
            c: String::from("test"),
            // Casting a negative `i` straight to `usize` wraps around to an
            // enormous length; clamp at zero so such records get an empty `d`.
            d: vec![0; i.max(0) as usize],
        }
    }

    /// Returns the Avro schema describing a `Test` record.
    ///
    /// # Panics
    ///
    /// Panics if the embedded schema text fails to parse.
    fn schema() -> Schema {
        let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "int", "default": 0},
                {"name": "b", "type": "double", "default": 1.0},
                {"name": "c", "type": "string", "default": "dansen"},
                {"name": "d", "type": {"type": "array", "items": "int"}}
            ]
        }
        "#;
        avro_rs::Schema::parse_str(raw_schema).unwrap()
    }
}
/// Encodes `data` as an in-memory Avro container (Deflate codec) and hands
/// back the resulting bytes.
///
/// # Panics
///
/// Panics if a record cannot be appended or the writer cannot be flushed.
fn write_data(data: &[Test], schema: Schema) -> Vec<u8> {
    let mut avro_writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
    data.iter().for_each(|record| {
        avro_writer.append_ser(record).unwrap();
    });
    avro_writer.flush().unwrap();
    avro_writer.into_inner()
}
/// Decodes every record stored in the Avro container bytes `data`.
///
/// # Panics
///
/// Panics if the container cannot be opened with `schema` or if any record
/// fails to decode into a `Test`.
fn read_data(data: &[u8], schema: Schema) -> Vec<Test> {
    let mut decoded = Vec::new();
    for record in Reader::with_schema(&schema, data).unwrap() {
        let value = record.unwrap();
        decoded.push(from_value::<Test>(&value).unwrap());
    }
    decoded
}
// Shamelessly ported from the Avro Python implementation.
/// Serializes `objects` and sends them to `address` as a sequence of
/// length-prefixed frames.
///
/// Each frame is a 4-byte big-endian length followed by that many payload
/// bytes, with at most `buf_size` payload bytes per frame. A frame of length
/// zero marks the end of the message.
///
/// # Panics
///
/// Panics if `buf_size` is zero, if the connection cannot be established, or
/// if any write to the socket fails.
pub fn send_framed(objects: &[Test], address: String, schema: Schema, buf_size: usize) {
    assert!(buf_size > 0, "buf_size must be greater than zero");
    let data = write_data(objects, schema);
    let mut stream = TcpStream::connect(address).unwrap();

    // The length prefix is a u32, so cap the frame size to keep the cast
    // below lossless even for very large `buf_size` values.
    let max_frame = buf_size.min(u32::max_value() as usize);
    for frame in data.chunks(max_frame) {
        let mut header = [0; 4];
        BigEndian::write_u32(&mut header, frame.len() as u32);
        // `write` may accept only part of the buffer; `write_all` keeps going
        // until every byte is sent, so the framing cannot be corrupted.
        stream.write_all(&header).unwrap();
        stream.write_all(frame).unwrap();
    }

    // Terminate the message with a zero-length frame (four zero bytes).
    stream.write_all(&[0; 4]).unwrap();
    stream.flush().unwrap();
}
// Shamelessly ported from the Avro Python implementation.
/// Reads one framed message from `stream` and decodes the records it holds.
///
/// The message is a sequence of frames, each a 4-byte big-endian length
/// followed by that many payload bytes; a zero length ends the message. The
/// payloads are concatenated and decoded with `schema`.
///
/// # Panics
///
/// Panics if the stream ends or fails before the terminating frame is read,
/// or if the assembled payload cannot be decoded.
pub fn read_framed(stream: &mut TcpStream, schema: Schema) -> Vec<Test> {
    let mut message: Vec<u8> = Vec::new();
    let mut header = [0; 4];
    loop {
        // A plain `read` may return fewer bytes than requested on a TCP
        // stream; `read_exact` blocks until the whole prefix has arrived.
        stream.read_exact(&mut header).unwrap();
        let frame_len = BigEndian::read_u32(&header) as usize;
        if frame_len == 0 {
            break;
        }
        // Grow the message and read the payload directly into the new tail.
        // NOTE(review): the length comes from the peer and is not bounded
        // here; consider a maximum frame size for untrusted senders.
        let start = message.len();
        message.resize(start + frame_len, 0);
        stream.read_exact(&mut message[start..]).unwrap();
    }
    read_data(&message, schema)
}
/// Runs a local receiver thread and a sender thread that transmits four
/// framed batches of three records each to `127.0.0.1:7878`.
///
/// The receiver loop accepts connections indefinitely, so this function does
/// not return after the sender has finished.
///
/// # Panics
///
/// Panics if the port cannot be bound or if either thread panics.
pub fn example() {
    // Frame size taken from the Avro Python implementation.
    const FRAME_SIZE: usize = 8192;

    // Bind before any thread is spawned: if the listener were created inside
    // the receiver thread, the sender could try to connect before the socket
    // exists and fail with "connection refused".
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let receiver = thread::spawn(move || {
        for stream in listener.incoming() {
            let mut s = stream.unwrap();
            let results = read_framed(&mut s, Test::schema());
            for result in results.iter() {
                println!("{:?}", result);
            }
        }
    });
    let sender = thread::spawn(|| {
        // Batches start at 1, 4, 7 and 10 and hold three consecutive records.
        for first in (1..=10).step_by(3) {
            send_framed(
                &[Test::new(first), Test::new(first + 1), Test::new(first + 2)],
                String::from("127.0.0.1:7878"),
                Test::schema(),
                FRAME_SIZE,
            );
        }
    });
    // Join the sender so that a panic while sending is reported instead of
    // being silently dropped with the thread handle.
    sender.join().unwrap();
    receiver.join().unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment