Created
February 6, 2022 21:33
-
-
Save djc/ba2162c537098ac983c5294a6c1753f5 to your computer and use it in GitHub Desktop.
opentelemetry-stackdriver example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::env; | |
use std::net::SocketAddr; | |
use std::str::FromStr; | |
use async_trait::async_trait; | |
use gcp_auth::AuthenticationManager; | |
use mendes::application::{Context, Responder, Server}; | |
use mendes::http::request::Parts; | |
use mendes::http::{HeaderMap, Response, StatusCode}; | |
use mendes::hyper::Body; | |
use mendes::{handler, route, Application}; | |
use opentelemetry::runtime::Tokio; | |
use opentelemetry::sdk::trace::{Config, Sampler, TracerProvider}; | |
use opentelemetry::trace::TracerProvider as _; | |
use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState}; | |
use opentelemetry::Context as OtelContext; | |
use opentelemetry_stackdriver::{GcpAuthorizer, LogContext, Resource, StackDriverExporter}; | |
use thiserror::Error; | |
use tracing::{info, warn, Span}; | |
use tracing_opentelemetry::OpenTelemetrySpanExt; | |
use tracing_subscriber::prelude::*; | |
#[tokio::main] | |
async fn main() { | |
env_logger::init(); | |
let authentication_manager = AuthenticationManager::new().await.unwrap(); | |
let project_id = authentication_manager.project_id().await.unwrap(); | |
let log_context = LogContext { | |
log_id: "cloud-trace-test".into(), | |
resource: Resource { | |
r#type: "global".into(), | |
labels: [("project_id".to_owned(), project_id)] | |
.into_iter() | |
.collect(), | |
}, | |
}; | |
let authorizer = GcpAuthorizer::new().await.unwrap(); | |
let (exporter, driver) = StackDriverExporter::builder() | |
.log_context(log_context) | |
.build(authorizer) | |
.await | |
.unwrap(); | |
tokio::spawn(driver); | |
let provider = TracerProvider::builder() | |
.with_batch_exporter(exporter.clone(), Tokio) | |
.with_config(Config { | |
sampler: Box::new(Sampler::TraceIdRatioBased(CLOUD_TRACE_RATE)), | |
..Default::default() | |
}) | |
.build(); | |
tracing_subscriber::registry() | |
.with(tracing_opentelemetry::layer().with_tracer(provider.tracer("tracing"))) | |
.try_init() | |
.unwrap(); | |
let app = App { exporter }; | |
let port = env::var("PORT") | |
.as_deref() | |
.unwrap_or("8000") | |
.parse() | |
.unwrap(); | |
let addr = SocketAddr::new("0.0.0.0".parse().unwrap(), port); | |
println!("serve on {}...", addr); | |
app.serve(&addr).await.unwrap(); | |
} | |
#[handler(GET)] | |
#[tracing::instrument(name = "hello", level = "info", skip(app, req))] | |
async fn hello(app: &App, req: &Parts) -> Result<Response<Body>, Error> { | |
link(&req.headers); | |
if let Some(hv) = req.headers.get("x-cloud-trace-context") { | |
eprintln!("ctx: {}", hv.to_str().unwrap()); | |
}; | |
info!("foodaddle"); | |
warn!("Hello, world!"); | |
eprintln!("pending count: {}", app.exporter.pending_count()); | |
Ok(Response::builder() | |
.status(StatusCode::OK) | |
.body("Hello, world!".into()) | |
.unwrap()) | |
} | |
pub fn link(headers: &HeaderMap) -> Option<()> { | |
let hv = headers.get("x-cloud-trace-context")?; | |
let ctx = hv.to_str().ok()?; | |
let mut parts = ctx.splitn(2, '/'); | |
let trace_id = TraceId::from_hex(parts.next()?).ok()?; | |
let mut parts = parts.next()?.splitn(2, ";o="); | |
let span_id = SpanId::from_hex(parts.next()?).ok()?; | |
let trace_flags = TraceFlags::new(u8::from_str(parts.next()?).ok()?); | |
let span_cx = SpanContext::new(trace_id, span_id, trace_flags, true, TraceState::default()); | |
Span::current().set_parent(OtelContext::new().with_remote_span_context(span_cx)); | |
None | |
} | |
#[handler(GET)] | |
#[tracing::instrument(name = "account", level = "info")] | |
async fn account(_: &App) -> Result<Response<Body>, Error> { | |
let project_id = project_id().await.unwrap(); | |
Ok(Response::builder() | |
.status(StatusCode::OK) | |
.body(project_id.into()) | |
.unwrap()) | |
} | |
async fn project_id() -> Result<String, Error> { | |
Ok(reqwest::Client::new() | |
.get("http://metadata.google.internal/computeMetadata/v1/project/project-id") | |
.header("Metadata-Flavor", "Google") | |
.send() | |
.await? | |
.text() | |
.await?) | |
} | |
struct App { | |
exporter: StackDriverExporter, | |
} | |
#[async_trait] | |
impl Application for App { | |
type RequestBody = Body; | |
type ResponseBody = Body; | |
type Error = Error; | |
async fn handle(mut cx: Context<Self>) -> Response<Body> { | |
route!(match cx.path() { | |
Some("account") => account, | |
_ => hello, | |
}) | |
} | |
} | |
#[derive(Debug, Error)] | |
enum Error { | |
#[error("{0}")] | |
Mendes(#[from] mendes::Error), | |
#[error("{0}")] | |
Reqwest(#[from] reqwest::Error), | |
} | |
impl From<&Error> for StatusCode { | |
fn from(e: &Error) -> StatusCode { | |
match e { | |
Error::Mendes(e) => StatusCode::from(e), | |
_ => StatusCode::INTERNAL_SERVER_ERROR, | |
} | |
} | |
} | |
impl Responder<App> for Error { | |
fn into_response(self, _: &App, _: &Parts) -> Response<Body> { | |
Response::builder() | |
.status(StatusCode::from(&self)) | |
.body(self.to_string().into()) | |
.unwrap() | |
} | |
} | |
/// Ratio passed to `Sampler::TraceIdRatioBased` when the tracer provider is
/// configured in `main`.
const CLOUD_TRACE_RATE: f64 = 1.0;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment