Skip to content

Instantly share code, notes, and snippets.

@djc
Created February 6, 2022 21:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djc/ba2162c537098ac983c5294a6c1753f5 to your computer and use it in GitHub Desktop.
Save djc/ba2162c537098ac983c5294a6c1753f5 to your computer and use it in GitHub Desktop.
opentelemetry-stackdriver example
use std::env;
use std::net::SocketAddr;
use std::str::FromStr;
use async_trait::async_trait;
use gcp_auth::AuthenticationManager;
use mendes::application::{Context, Responder, Server};
use mendes::http::request::Parts;
use mendes::http::{HeaderMap, Response, StatusCode};
use mendes::hyper::Body;
use mendes::{handler, route, Application};
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::trace::{Config, Sampler, TracerProvider};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState};
use opentelemetry::Context as OtelContext;
use opentelemetry_stackdriver::{GcpAuthorizer, LogContext, Resource, StackDriverExporter};
use thiserror::Error;
use tracing::{info, warn, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::prelude::*;
#[tokio::main]
async fn main() {
env_logger::init();
let authentication_manager = AuthenticationManager::new().await.unwrap();
let project_id = authentication_manager.project_id().await.unwrap();
let log_context = LogContext {
log_id: "cloud-trace-test".into(),
resource: Resource {
r#type: "global".into(),
labels: [("project_id".to_owned(), project_id)]
.into_iter()
.collect(),
},
};
let authorizer = GcpAuthorizer::new().await.unwrap();
let (exporter, driver) = StackDriverExporter::builder()
.log_context(log_context)
.build(authorizer)
.await
.unwrap();
tokio::spawn(driver);
let provider = TracerProvider::builder()
.with_batch_exporter(exporter.clone(), Tokio)
.with_config(Config {
sampler: Box::new(Sampler::TraceIdRatioBased(CLOUD_TRACE_RATE)),
..Default::default()
})
.build();
tracing_subscriber::registry()
.with(tracing_opentelemetry::layer().with_tracer(provider.tracer("tracing")))
.try_init()
.unwrap();
let app = App { exporter };
let port = env::var("PORT")
.as_deref()
.unwrap_or("8000")
.parse()
.unwrap();
let addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port);
println!("serve on {}...", addr);
app.serve(&addr).await.unwrap();
}
#[handler(GET)]
#[tracing::instrument(name = "hello", level = "info", skip(app, req))]
async fn hello(app: &App, req: &Parts) -> Result<Response<Body>, Error> {
link(&req.headers);
if let Some(hv) = req.headers.get("x-cloud-trace-context") {
eprintln!("ctx: {}", hv.to_str().unwrap());
};
info!("foodaddle");
warn!("Hello, world!");
eprintln!("pending count: {}", app.exporter.pending_count());
Ok(Response::builder()
.status(StatusCode::OK)
.body("Hello, world!".into())
.unwrap())
}
pub fn link(headers: &HeaderMap) -> Option<()> {
let hv = headers.get("x-cloud-trace-context")?;
let ctx = hv.to_str().ok()?;
let mut parts = ctx.splitn(2, '/');
let trace_id = TraceId::from_hex(parts.next()?).ok()?;
let mut parts = parts.next()?.splitn(2, ";o=");
let span_id = SpanId::from_hex(parts.next()?).ok()?;
let trace_flags = TraceFlags::new(u8::from_str(parts.next()?).ok()?);
let span_cx = SpanContext::new(trace_id, span_id, trace_flags, true, TraceState::default());
Span::current().set_parent(OtelContext::new().with_remote_span_context(span_cx));
None
}
#[handler(GET)]
#[tracing::instrument(name = "account", level = "info")]
async fn account(_: &App) -> Result<Response<Body>, Error> {
let project_id = project_id().await.unwrap();
Ok(Response::builder()
.status(StatusCode::OK)
.body(project_id.into())
.unwrap())
}
async fn project_id() -> Result<String, Error> {
Ok(reqwest::Client::new()
.get("http://metadata.google.internal/computeMetadata/v1/project/project-id")
.header("Metadata-Flavor", "Google")
.send()
.await?
.text()
.await?)
}
struct App {
exporter: StackDriverExporter,
}
#[async_trait]
impl Application for App {
type RequestBody = Body;
type ResponseBody = Body;
type Error = Error;
async fn handle(mut cx: Context<Self>) -> Response<Body> {
route!(match cx.path() {
Some("account") => account,
_ => hello,
})
}
}
#[derive(Debug, Error)]
enum Error {
#[error("{0}")]
Mendes(#[from] mendes::Error),
#[error("{0}")]
Reqwest(#[from] reqwest::Error),
}
impl From<&Error> for StatusCode {
fn from(e: &Error) -> StatusCode {
match e {
Error::Mendes(e) => StatusCode::from(e),
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
impl Responder<App> for Error {
fn into_response(self, _: &App, _: &Parts) -> Response<Body> {
Response::builder()
.status(StatusCode::from(&self))
.body(self.to_string().into())
.unwrap()
}
}
const CLOUD_TRACE_RATE: f64 = 1.0;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment