Skip to content

Instantly share code, notes, and snippets.

@fteychene
Created February 9, 2021 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fteychene/ea1bb0d7ec205484216b3f1c55cec7c0 to your computer and use it in GitHub Desktop.
Save fteychene/ea1bb0d7ec205484216b3f1c55cec7c0 to your computer and use it in GitHub Desktop.
Lolilol - a test of https://github.com/akiradeveloper/lol usage
[package]
name = "lolilol"
version = "0.1.0"
authors = ["fteychene <francois.teychene@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lol-core = "0.7.1"
async-trait = "0.1.42"
anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
tonic = "0.4"
simple-logging = "2.0.2"
log = "0.4"
bytes = "1.0"
use anyhow::Error;
use async_trait::async_trait;
use log::LevelFilter;
use lol_core::core_message;
use lol_core::proto_compiled;
use lol_core::proto_compiled::raft_client::RaftClient;
use lol_core::proto_compiled::{AddServerReq, CommitReq};
use lol_core::snapshot::{SnapshotStream, SnapshotTag};
use lol_core::{Config, MakeSnapshot, RaftApp, RaftCore, TunableConfig};
use std::io::Read;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::{io, thread, time};
struct LoliApp {
contents: bytes::Bytes,
}
#[async_trait]
impl RaftApp for LoliApp {
/// How state machine interacts with inputs from clients.
async fn process_message(&self, request: &[u8]) -> Result<Vec<u8>, Error> {
println!("process_message : {:?}", request);
unimplemented!()
}
/// Almost same as process_message but is called in log application path.
/// This function may return `MakeSnapshot` to make a new snapshot.
/// Note that the snapshot entry corresponding to the copy snapshot is not guaranteed to be made
/// due to possible I/O errors, etc.
async fn apply_message(
&self,
request: &[u8],
apply_index: u64,
) -> Result<(Vec<u8>, MakeSnapshot), Error> {
println!(
"apply_message: index={} request={:?}",
apply_index,
std::str::from_utf8(request)
);
Ok((vec![], MakeSnapshot::None))
}
/// Special type of apply_message but when the entry is snapshot entry.
/// Snapshot is None happens iff apply_index is 1 which is the most initial snapshot.
async fn install_snapshot(
&self,
snapshot: Option<&SnapshotTag>,
apply_index: u64,
) -> Result<(), Error> {
Ok(())
}
/// This function is called from compaction threads.
/// It should return new snapshot from accumulative compution with the old_snapshot and the subsequent log entries.
async fn fold_snapshot(
&self,
old_snapshot: Option<&SnapshotTag>,
requests: Vec<&[u8]>,
) -> Result<SnapshotTag, Error> {
Ok(SnapshotTag::from(vec![]))
}
/// Make a snapshot resource and returns the tag.
async fn from_snapshot_stream(
&self,
st: SnapshotStream,
snapshot_index: u64,
) -> Result<SnapshotTag, Error> {
unimplemented!()
}
/// Make a snapshot stream from a snapshot resource bound to the tag.
async fn to_snapshot_stream(&self, x: &SnapshotTag) -> SnapshotStream {
unimplemented!()
}
/// Delete a snapshot resource bound to the tag.
async fn delete_resource(&self, x: &SnapshotTag) -> Result<(), Error> {
unimplemented!()
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
simple_logging::log_to(std::io::stdout(), LevelFilter::Info);
println!("Lolilol project. Use lol");
tokio::spawn(async {
let app = LoliApp {
contents: Default::default(),
};
let storage = lol_core::storage::memory::Storage::new();
let config = Config::new("http://127.0.0.1:5000".to_string());
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5000);
let core = RaftCore::new(
app,
storage,
config,
TunableConfig {
compaction_delay_sec: 3,
compaction_interval_sec: 5,
},
)
.await;
let service = lol_core::make_service(core);
tonic::transport::Server::builder()
.add_service(service)
.serve(socket)
.await
});
tokio::spawn(async {
let app = LoliApp {
contents: Default::default(),
};
let storage = lol_core::storage::memory::Storage::new();
let config = Config::new("http://127.0.0.1:5001".to_string());
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5001);
let core = RaftCore::new(
app,
storage,
config,
TunableConfig {
compaction_delay_sec: 3,
compaction_interval_sec: 5,
},
)
.await;
let service = lol_core::make_service(core);
tonic::transport::Server::builder()
.add_service(service)
.serve(socket)
.await;
});
tokio::spawn(async {
let app = LoliApp {
contents: Default::default(),
};
let storage = lol_core::storage::memory::Storage::new();
let config = Config::new("http://127.0.0.1:5002".to_string());
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5002);
let core = RaftCore::new(
app,
storage,
config,
TunableConfig {
compaction_delay_sec: 3,
compaction_interval_sec: 5,
},
)
.await;
let service = lol_core::make_service(core);
tonic::transport::Server::builder()
.add_service(service)
.serve(socket)
.await;
});
let ten_millis = time::Duration::from_millis(500);
thread::sleep(ten_millis);
let mut client = RaftClient::connect(tonic::transport::channel::Endpoint::from_static(
"http://127.0.0.1:5000",
))
.await?;
let response = client
.add_server(tonic::Request::new(AddServerReq {
id: "http://127.0.0.1:5000".into(),
}))
.await?;
println!("adding 5000: {:?}", response);
let response = client
.add_server(tonic::Request::new(AddServerReq {
id: "http://127.0.0.1:5001".into(),
}))
.await?;
println!("adding 5001: {:?}", response);
let response = client
.add_server(tonic::Request::new(AddServerReq {
id: "http://127.0.0.1:5002".into(),
}))
.await?;
println!("adding 5002: {:?}", response);
let mut conn = lol_core::connection::connect(tonic::transport::channel::Endpoint::from_static(
"http://127.0.0.1:5000",
))
.await
.unwrap();
let msg = core_message::Req::ClusterInfo;
let req = proto_compiled::ProcessReq {
message: core_message::Req::serialize(&msg),
core: true,
};
let res = conn
.request_process_locally(req)
.await
.unwrap()
.into_inner();
let msg = core_message::Rep::deserialize(&res.message).unwrap();
println!("clusterInfo: {:?}", msg);
let response = conn
.request_commit(proto_compiled::CommitReq {
core: false,
message: String::from("coucou from brest").as_bytes().to_vec(),
})
.await?;
println!("requestCommit: {:?}", response);
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment