Skip to content

Instantly share code, notes, and snippets.

@Akanoa
Created July 16, 2022 16:31
Show Gist options
  • Save Akanoa/b53f4f042d24607f00f1c36b1ec01585 to your computer and use it in GitHub Desktop.
Save Akanoa/b53f4f042d24607f00f1c36b1ec01585 to your computer and use it in GitHub Desktop.
Playground article télémétrie
use opentelemetry::global::shutdown_tracer_provider;
use std::future::Future;
use std::pin::Pin;
use opentelemetry::runtime::Tokio;
use rand::Rng;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{io, join, select, spawn};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::{debug, error, info, info_span, span, trace_span, warn, Instrument, Span};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use futures::future;
/// Entry point of the playground.
///
/// Installs the Jaeger/OpenTelemetry tracing pipeline, spawns the randomizer
/// and game-server tasks, waits for both to finish, reports any failure on
/// stderr and finally flushes the pending spans.
///
/// # Panics
///
/// Panics if the Jaeger batch pipeline cannot be installed.
// NOTE(review): the span generated by `#[tracing::instrument]` on `main` is
// presumably created before the subscriber below is registered, so it is
// likely never exported — confirm whether the attribute is wanted here.
#[tokio::main]
#[tracing::instrument]
async fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("plus ou moins5")
        .install_batch(Tokio)
        .expect("unable to install the Jaeger batch pipeline");
    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_jaeger)
        .with_threads(true);
    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();
    // The senders are never used to send anything, but they must stay alive:
    // the servers leave their accept loop as soon as `recv()` on the matching
    // receiver resolves, which also happens when every sender is dropped.
    let (_tx, rx) = channel::<()>(1);
    let (_tx1, rx1) = channel::<()>(1);
    let future_randomizer = spawn(randomizer(rx));
    let future_game_server = spawn(game_server(rx1));
    let (randomizer_result, game_server_result) = join!(future_randomizer, future_game_server);
    // Report failures instead of silently discarding them. stderr is used
    // because the only registered layer exports spans, not standalone events.
    for (service, outcome) in [
        ("randomizer", randomizer_result),
        ("game_server", game_server_result),
    ] {
        match outcome {
            Ok(Ok(())) => {}
            Ok(Err(error)) => eprintln!("{} stopped with an error: {}", service, error),
            Err(error) => eprintln!("{} task did not run to completion: {}", service, error),
        }
    }
    // Force all Span to be sent
    shutdown_tracer_provider();
}
/// Picks a random number in `0..100`, logging a debug event before and after.
#[tracing::instrument]
async fn compute_random_number() -> u8 {
    debug!("Start asking number");
    // A simulated delay used to sit here; it is currently disabled.
    debug!("End asking number");
    let mut generator = rand::thread_rng();
    generator.gen_range(0..100)
}
/// Serves one randomizer connection: computes a number and writes it to the
/// peer as a single byte.
///
/// # Errors
///
/// Returns the I/O error raised while writing to `stream`, after logging it.
#[tracing::instrument]
async fn handle_randomizer_request(mut stream: TcpStream) -> io::Result<()> {
    let chosen = compute_random_number().await;
    info!(number = chosen, "Number chosen");
    if let Err(err) = stream.write_u8(chosen).await {
        warn!(error=%err, "Unable to write to stream");
        return Err(err);
    }
    Ok(())
}
/// Message carried on the `Receiver<Response>` channel handed to `handle_player`.
///
/// NOTE(review): the variant meanings below are inferred from their names
/// (a "more or less" guessing game) — confirm once the game logic exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Response {
    /// Presumably: the number to find is greater than the guess.
    More,
    /// Presumably: the number to find is lower than the guess.
    Less,
    /// Presumably: the guess was correct; the meaning of the payload is not
    /// established by the visible code.
    Win(u8),
}
/// Builds the per-connection handler that `game_server` passes to `tcp_server`.
//
// NOTE(review): this function looks unfinished and does not appear to compile
// as written:
//   * the return type uses a bare `dyn Future` as the closure's return type,
//     which is unsized;
//   * `inner` is a nested `fn` item, and `fn` items cannot capture `rx` from
//     the enclosing function;
//   * `inner2` moves `rx` into its `async move` block, so the closure can only
//     be called once, while the signature promises `Fn`;
//   * `tx` is never used and `inner` is never referenced — only `inner2` is
//     returned.
fn handle_player(
tx: Sender<u8>,
mut rx: Receiver<Response>,
) -> Pin<Box<dyn Fn(TcpStream) -> dyn Future<Output = io::Result<()>>>> {
// Experimental handler: waits for one `Response`, then writes the byte `1`.
let inner2 = |mut stream: TcpStream| async move {
// The received value is discarded.
rx.recv().await;
// The result of the write is ignored.
stream.write_u8(1).await;
Ok::<(), io::Error>(())
};
// Alternative handler: answers every non-empty read with "PONG" until the
// peer closes the connection.
async fn inner(mut stream: TcpStream) -> io::Result<()> {
// The received value is bound to `a` but never used.
let a = rx.recv().await;
loop {
// Reads at most 32 bytes per iteration; the content itself is not inspected.
let mut data = [0_u8; 32];
let size = stream.read(&mut data).await?;
println!("data receive");
// A zero-length read is treated as the peer having left.
if size == 0 {
info!("Peer has left");
break;
}
stream.write_all(b"PONG").await.map_err(|err| {
warn!(error=%err, "Unable to write to stream");
err
})?;
info!("PONG sent")
}
Ok(())
}
Box::pin(inner2)
}
/// Runs a TCP accept loop on `127.0.0.1:<port>` until `signal` resolves.
///
/// Every accepted connection is handed to `closure` and the returned future
/// is awaited inside the loop, so connections are served one at a time and
/// neither a new peer nor the stop signal is observed while a handler runs.
/// Handler and accept errors are logged and do not stop the server.
///
/// * `port` - local port to bind.
/// * `closure` - handler invoked with each accepted stream.
/// * `signal` - the loop ends as soon as `recv()` on it resolves.
/// * `name` - service name used in log messages.
///
/// # Errors
///
/// Returns the bind error if the listener cannot be created.
#[tracing::instrument(skip(closure))]
async fn tcp_server<F, O>(
    port: u16,
    closure: F,
    mut signal: Receiver<()>,
    name: &str,
) -> io::Result<()>
where
    O: Future<Output = io::Result<()>>,
    F: Fn(TcpStream) -> O,
{
    println!("start tcp server {}", name);
    let address = format!("127.0.0.1:{}", port);
    let listener = match TcpListener::bind(address).await {
        Ok(bound) => bound,
        Err(error) => {
            error!(port, %error, "Unable to start {} service", name);
            return Err(error);
        }
    };
    loop {
        select! {
            _ = signal.recv() => break,
            result_accept = listener.accept() => {
                // Guard: a failed accept is logged and the loop simply goes on.
                let (stream, peer) = match result_accept {
                    Ok(accepted) => accepted,
                    Err(error) => {
                        warn!(error=%error, "Unable to accept connection");
                        continue;
                    }
                };
                info!(peer=%peer, "New peer connected");
                if let Err(error) = closure(stream).await {
                    error!(%error, "An error occurred while handling {} request", name);
                }
            }
        };
    }
    info!("End of {} service", name);
    Ok(())
}
/// Runs the randomizer service: a TCP server on port 3004 that answers every
/// connection through `handle_randomizer_request` until `signal` resolves.
///
/// # Errors
///
/// Propagates the error returned by `tcp_server`.
#[tracing::instrument]
async fn randomizer(signal: Receiver<()>) -> io::Result<()> {
    const RANDOMIZER_PORT: u16 = 3004;
    println!("Start randomizer");
    let outcome = tcp_server(
        RANDOMIZER_PORT,
        handle_randomizer_request,
        signal,
        "randomizer",
    )
    .await;
    outcome
}
/// Asks the randomizer service (127.0.0.1:3004) for a number.
///
/// Connects, performs an empty write, then reads a single byte which is
/// returned as the chosen number.
///
/// # Errors
///
/// Returns the I/O error of whichever step failed (connect, write or read),
/// after logging it.
#[tracing::instrument]
async fn call_number() -> io::Result<u8> {
    let port_randomizer: u16 = 3004;
    let address = format!("127.0.0.1:{}", port_randomizer);
    let mut connection_randomizer = match TcpStream::connect(address).await {
        Ok(connection) => connection,
        Err(err) => {
            error!(%err, "Unable to connect to randomizer service");
            return Err(err);
        }
    };
    // The payload is empty: the randomizer answers without reading a request.
    if let Err(err) = connection_randomizer.write_all(b"").await {
        warn!(%err, "Unable to write to randomizer stream");
        return Err(err);
    }
    let chosen_number = match connection_randomizer.read_u8().await {
        Ok(byte) => byte,
        Err(err) => {
            warn!(%err, "Unable to read number choser");
            return Err(err);
        }
    };
    info!(number = chosen_number, "New number chosen");
    Ok(chosen_number)
}
#[tracing::instrument]
async fn game_server(signal: Receiver<()>) -> io::Result<()> {
info_span!("Init game server").in_scope(|| info!("Game server is starting"));
let port: u16 = 3003;
let _number = trace_span!("asking for new random number")
.in_scope(|| {
info!("test");
//sleep(Duration::from_millis(666));
call_number.in_current_span().inner()()
})
.await?;
let (tx, rx) = channel(1);
let (tx1, rx1) = channel(1);
tcp_server(port, handle_player(tx, rx1), signal, "game_server").await
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment