Skip to content

Instantly share code, notes, and snippets.

@Hamzakh777
Last active February 26, 2024 22:34
Show Gist options
  • Save Hamzakh777/2d6af39a9def7222eab95ccd9cef564a to your computer and use it in GitHub Desktop.
Save Hamzakh777/2d6af39a9def7222eab95ccd9cef564a to your computer and use it in GitHub Desktop.
Rust tracing
/// Emits a warning, awaits `async_tracing_inner`, then runs `subtask` on a
/// spawned Tokio task and waits for that task to finish.
///
/// The span that is current inside this function is captured and attached to
/// the spawned future via `Instrument::instrument`.
#[instrument]
async fn async_tracing() {
    warn!("Just warning from inside an async function");
    async_tracing_inner().await;

    // Capture the span first so it can be handed to the spawned future.
    let parent = tracing::Span::current();
    let subtask_handle = tokio::spawn(subtask().instrument(parent));

    // The join result (including a possible task panic) is deliberately discarded.
    let _ = subtask_handle.await;
}
/// Emits a single warning event from inside its own `#[instrument]` span.
#[instrument]
async fn async_tracing_inner() {
warn!("This is the inner function");
}
/// Emits a single info event.
///
/// This function carries no `#[instrument]` attribute of its own.
async fn subtask() {
info!("polling subtask");
}
# Crates used by the tracing examples.
# NOTE(review): other snippets import `opentelemetry` and `opentelemetry_sdk`,
# which are not listed here — confirm whether they belong in this manifest.
[dependencies]
serde = "1.0.197"
# Tokio is pulled in with the "full" feature set.
tokio = { version = "1.35.1", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
/// Example message that carries application data together with the
/// propagation context of the span it was created in.
struct Payload {
// Application data carried by the message.
data: String, // can be anything
// Serialized tracing context that travels with the data.
propagation_context: PropagationContext
}
#[instrument]
fn send_request() {
let parent_context = Span::current().context();
let propagation_context = PropagationContext::inject(&parent_context);
Payload {
data: "dummy data".to_string(),
propagation_context,
}
}
/// Extracts the context carried in `payload` and sets it as the parent of the
/// current span.
///
/// NOTE(review): this function has no `#[instrument]` attribute, so
/// `Span::current()` is whatever span the caller happens to be in — confirm
/// that this is the span that should be re-parented.
async fn receive_request(payload: Payload) {
// Rebuild the context from the serialized carrier.
let parent_context = payload.propagation_context.extract();
let span = Span::current();
// Link the current span to the extracted context.
span.set_parent(parent_context);
// the rest of your code here
}
use opentelemetry::{global};
use opentelemetry_sdk::{
propagation::TraceContextPropagator,
};
/// Tracing setup fragment: registers a `TraceContextPropagator` as the global
/// text-map propagator.
fn setup_tracing() {
// rest of the code
// ================ ADD THIS ================
// Installs the propagator that `global::get_text_map_propagator` hands out.
global::set_text_map_propagator(TraceContextPropagator::new());
}
use opentelemetry::{
global,
propagation::{Extractor, Injector},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Serializable data structure that holds the OpenTelemetry propagation context.
///
/// It is a newtype over a `HashMap<String, String>` of propagation keys and
/// values, and derives `Serialize`/`Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationContext(HashMap<String, String>);
impl PropagationContext {
    /// Creates a carrier with no entries.
    fn empty() -> Self {
        Self(HashMap::default())
    }

    /// Writes `context` into a fresh carrier using the globally registered
    /// text-map propagator and returns that carrier.
    pub fn inject(context: &opentelemetry::Context) -> Self {
        let mut carrier = Self::empty();
        global::get_text_map_propagator(|text_map| {
            text_map.inject_context(context, &mut carrier);
        });
        carrier
    }

    /// Rebuilds an `opentelemetry::Context` from the entries stored in this
    /// carrier, using the globally registered text-map propagator.
    pub fn extract(&self) -> opentelemetry::Context {
        global::get_text_map_propagator(|text_map| text_map.extract(self))
    }
}
/// Lets a propagator write key/value pairs into the carrier.
impl Injector for PropagationContext {
    /// Stores `value` under an owned copy of `key`, replacing any previous entry.
    fn set(&mut self, key: &str, value: String) {
        self.0.insert(String::from(key), value);
    }
}
/// Lets a propagator read key/value pairs back out of the carrier.
impl Extractor for PropagationContext {
    /// Returns the value stored under `key`, if any.
    ///
    /// The map is queried with the borrowed `&str` directly; no temporary
    /// `String` is allocated for the lookup.
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).map(String::as_str)
    }

    /// Returns every key currently stored in the carrier.
    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(String::as_str).collect()
    }
}
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};
/// Message passed between the two example threads.
#[derive(Debug)]
struct Message {
// Payload text of the message.
data: String,
// Span sent along with the data so the receiver can enter it.
span: Span,
}
/// Creates a std `mpsc` channel, starts the sender and receiver threads, and
/// joins both of them.
///
/// # Panics
///
/// Panics if either thread panicked.
#[instrument]
async fn channels_tracing() {
    info!("channels_tracing is starting");
    // `rx` is only moved into the receiver thread, so it needs no `mut`.
    let (tx, rx) = mpsc::channel::<Message>();
    let handle1 = channels_tracing_thread_1(tx); // sends a message to thread 2
    let handle2 = channels_tracing_thread_2(rx); // receives the message from thread 1
    // Wait for both threads to finish
    // NOTE(review): `join` blocks the calling thread inside an `async fn` —
    // confirm this is acceptable for the executor in use.
    handle1.join().expect("thread 1 panicked");
    handle2.join().expect("thread 2 panicked");
}
/// Spawns the sender thread and returns its join handle.
///
/// The span that is current when this function runs is moved into the thread
/// and entered there. A new span is created inside it and sent together with
/// the data over `tx`.
#[instrument(skip_all)]
fn channels_tracing_thread_1(tx: Sender<Message>) -> JoinHandle<()> {
    let spawner_span = tracing::Span::current();
    thread::spawn(move || {
        // Keep the captured span entered for the whole thread body.
        let _entered = spawner_span.enter();

        let span = info_span!("tokio spawn thread 1 span");
        info!("Sending data from thread 1");

        let message = Message {
            data: String::from("meow"),
            span,
        };
        // A send failure (receiver already gone) is deliberately ignored.
        let _ = tx.send(message);
    })
}
#[instrument(skip_all)]
fn channels_tracing_thread_2(rx: Receiver<Message>) -> JoinHandle<()> {
thread::spawn(move || {
while let Ok(message) = rx.recv() {
let parent_span = message.span;
// handling the span should be done inside the parent span
println!("Got a message");
parent_span.in_scope(|| {
info!("Got the message in thread 2 {}", message.data);
});
}
})
}
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{info, instrument, warn, Instrument, Span};
/// Boxed, thread-safe (`Send + Sync`) dynamic error type used as the error of `main`.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
/// Entry point: sets up tracing, then runs the channel example.
#[tokio::main]
async fn main() -> Result<(), Error> {
setup_tracing();
channels_tracing().await;
Ok(())
}
/// Message passed between the two example Tokio tasks.
#[derive(Debug)]
struct Message {
// Payload text of the message.
data: String,
// Span sent along with the data.
span: Span,
}
/// Creates a bounded Tokio `mpsc` channel (capacity 10) and starts the sender
/// and receiver helpers on it.
#[instrument]
async fn channels_tracing() {
    info!("channels_tracing is starting");
    // `rx` is only moved into the receiver helper, so it needs no `mut` here.
    let (tx, rx) = mpsc::channel::<Message>(10);
    let tx2 = tx.clone();
    channels_tracing_thread_1(tx).await;
    channels_tracing_thread_2(tx2, rx).await;
}
#[instrument]
async fn channels_tracing_thread_1(tx: Sender<Message>) {
tokio::spawn(async move {
info!("Sending data from thread 1");
let _ = tx.send(Message {
data: String::from("meow"),
span: tracing::Span::current(),
});
});
}
#[instrument]
async fn channels_tracing_thread_2(tx2: Sender<Message>, mut rx: Receiver<Message>) {
tokio::spawn(async move {
info!("Receiving data in thread 2");
while let Some(message) = rx.recv().await {
info!("Got the message in thread 2 {}", message.data);
}
});
}
use opentelemetry::{global};
use opentelemetry_sdk::{
propagation::TraceContextPropagator,
};
/// Configures the `tracing_subscriber` formatter and installs it as the
/// global default subscriber.
fn setup_tracing() {
    let builder = tracing_subscriber::fmt()
        // Most verbose level: nothing is filtered out.
        .with_max_level(tracing::Level::TRACE)
        // Compact output format.
        .compact()
        // Show the source file path of each event.
        .with_file(true)
        // Show the source line number of each event.
        .with_line_number(true)
        // Show the ID of the thread the event was recorded on.
        .with_thread_ids(true)
        // Hide the event's target (module path).
        .with_target(false);

    // Register as the application-wide default collector.
    builder.init();
}
/// Entry-point skeleton: sets up tracing before any application code runs.
#[tokio::main]
async fn main() -> Result<(), Error> {
setup_tracing();
// your code here
Ok(())
}
use tracing::{warn, instrument};
/// Entry point of the synchronous example.
///
/// NOTE(review): no subscriber is installed in this snippet — confirm that
/// tracing is set up elsewhere, otherwise the events may not be shown.
fn main() {
sync_tracing();
}
/// Emits a warning event, then calls `sync_tracing_sub`, all inside this
/// function's `#[instrument]` span.
#[instrument]
fn sync_tracing() {
warn!("event 1");
sync_tracing_sub();
}
/// Emits a single warning event from inside its own `#[instrument]` span.
#[instrument]
fn sync_tracing_sub() {
warn!("event 2");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment