Skip to content

Instantly share code, notes, and snippets.

@nwisemanII
Created December 5, 2023 07:43
Show Gist options
  • Save nwisemanII/05f1e0e879133d51f6edfcf04a7f5a8d to your computer and use it in GitHub Desktop.
Save nwisemanII/05f1e0e879133d51f6edfcf04a7f5a8d to your computer and use it in GitHub Desktop.
Otel Logs to Loki Setup
// https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/examples/basic-otlp/src/main.rs
/// Endpoint `get_export_config` hands to the OTLP log exporter.
// NOTE(review): logs target port 55680 while traces and metrics target 4317 —
// confirm this matches the receivers configured on the collector.
const LOGS_ENDPOINT: &str = "http://localhost:55680";
/// Endpoint `get_export_config` hands to the OTLP trace exporter.
const COLLECTOR_ENDPOINT: &str = "http://localhost:4317";
/// Endpoint `get_export_config` hands to the OTLP metrics exporter.
const METRICS_ENDPOINT: &str = "http://localhost:4317";
// use env_logger::Logger;
// use log::logger;
// use color_eyre::eyre::Error;
use opentelemetry::global::{GlobalLoggerProvider, GlobalTracerProvider, GlobalMeterProvider};
// use hyper::service;
// use opentelemetry::global::GlobalLoggerProvider;
pub use opentelemetry::global::{logger_provider, meter_provider, tracer_provider};
pub use opentelemetry::global::{
shutdown_logger_provider, shutdown_meter_provider, shutdown_tracer_provider,
};
use opentelemetry::logs::LogError;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig};
use opentelemetry_sdk::logs::config;
use opentelemetry_sdk::trace::TracerProvider;
use tracing_appender::rolling;
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_sdk::{
logs::Logger,
metrics::{
reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
MeterProvider, PeriodicReader
},
runtime,
trace::{RandomIdGenerator, Tracer},
Resource,
};
use opentelemetry_semantic_conventions::{
resource::{DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
// use opentelemetry_sdk::logs::BatchLogProcessorBuilder;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; //, OpenTelemetrySpanExt};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tracing_subscriber::fmt::writer::MakeWriterExt;
// For Timer
use tracing_subscriber::fmt::time::ChronoLocal;
// use tracing::{error, warn};
// Create a Resource that captures information about the entity for which telemetry is recorded.
/// Returns the `Resource` attached to every signal this process emits.
///
/// The service name and version are taken from Cargo's compile-time
/// environment (`CARGO_PKG_NAME`, `CARGO_PKG_VERSION`); the deployment
/// environment is fixed to `"develop"`.
fn resource() -> Resource {
    let attributes = [
        KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
        KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
        KeyValue::new(DEPLOYMENT_ENVIRONMENT, "develop"),
    ];

    Resource::from_schema_url(attributes, SCHEMA_URL)
}
/// Telemetry signal for which an export configuration is requested from
/// `get_export_config`.
enum ConfigType {
/// Selects the configuration built around `LOGS_ENDPOINT`.
Logs,
/// Selects the configuration built around `METRICS_ENDPOINT`.
Metrics,
/// Selects the configuration built around `COLLECTOR_ENDPOINT`.
Traces,
}
/// Builds the OTLP export configuration for the requested telemetry signal.
///
/// Every signal is exported over gRPC with a three-second timeout; only the
/// endpoint differs between signals.
fn get_export_config(config_type: ConfigType) -> opentelemetry_otlp::ExportConfig {
    // The match is exhaustive on purpose and has no catch-all arm: adding a
    // `ConfigType` variant must fail to compile here instead of reaching a
    // runtime panic.
    let endpoint = match config_type {
        ConfigType::Logs => LOGS_ENDPOINT,
        ConfigType::Metrics => METRICS_ENDPOINT,
        ConfigType::Traces => COLLECTOR_ENDPOINT,
    };

    ExportConfig {
        endpoint: endpoint.to_string(),
        protocol: Protocol::Grpc,
        timeout: std::time::Duration::from_secs(3),
    }
}
// Construct MeterProvider for MetricsLayer
/// Builds the SDK `MeterProvider`, registers it as the global meter provider,
/// and returns it.
///
/// Two periodic readers are attached: one pushing to the OTLP metrics
/// endpoint every 30 seconds, and one writing to stdout for local debugging.
///
/// # Errors
///
/// Returns an error if the OTLP metrics exporter cannot be built.
// NOTE(review): both readers are built with `runtime::Tokio`, so this
// presumably has to be called from inside a Tokio runtime — confirm.
fn init_meter_provider() -> color_eyre::Result<MeterProvider> {
    // Exporter construction is fallible; hand the failure to the caller
    // instead of panicking, since this function already returns a `Result`.
    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_export_config(get_export_config(ConfigType::Metrics))
        .build_metrics_exporter(
            Box::new(DefaultAggregationSelector::new()),
            Box::new(DefaultTemporalitySelector::new()),
        )?;

    let reader = PeriodicReader::builder(exporter, runtime::Tokio)
        .with_interval(std::time::Duration::from_secs(30))
        .build();

    // For debugging in development
    let stdout_reader = PeriodicReader::builder(
        opentelemetry_stdout::MetricsExporter::default(),
        runtime::Tokio,
    )
    .build();

    // Example views, kept disabled for reference.
    //
    // // Rename foo metrics to foo_named and drop key_2 attribute
    // let view_foo = |instrument: &Instrument| -> Option<Stream> {
    //     if instrument.name == "foo" {
    //         Some(
    //             Stream::new()
    //                 .name("foo_named")
    //                 .allowed_attribute_keys([Key::from("key_1")]),
    //         )
    //     } else {
    //         None
    //     }
    // };
    // // Set Custom histogram boundaries for baz metrics
    // let view_baz = |instrument: &Instrument| -> Option<Stream> {
    //     if instrument.name == "baz" {
    //         Some(
    //             Stream::new()
    //                 .name("baz")
    //                 .aggregation(Aggregation::ExplicitBucketHistogram {
    //                     boundaries: vec![0.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0],
    //                     record_min_max: true,
    //                 }),
    //         )
    //     } else {
    //         None
    //     }
    // };

    let meter_provider = MeterProvider::builder()
        .with_resource(resource())
        .with_reader(reader)
        .with_reader(stdout_reader)
        // .with_view(view_foo)
        // .with_view(view_baz)
        .build();

    global::set_meter_provider(meter_provider.clone());
    Ok(meter_provider)
}
// Construct Tracer for OpenTelemetryLayer
/// Builds the OTLP tracing pipeline and returns its `Tracer`.
///
/// Spans carry the shared `resource()` attributes, use randomly generated
/// ids, and are exported through a tonic exporter configured by
/// `get_export_config(ConfigType::Traces)`. The pipeline is installed with
/// `install_batch` on the Tokio runtime.
fn init_tracer() -> color_eyre::Result<Tracer, TraceError> {
    let trace_config = opentelemetry_sdk::trace::Config::default()
        .with_resource(resource())
        .with_id_generator(RandomIdGenerator::default());

    let span_exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_export_config(get_export_config(ConfigType::Traces));

    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_trace_config(trace_config)
        .with_exporter(span_exporter)
        .install_batch(runtime::Tokio)
}
/// Builds the OTLP logging pipeline and returns its `Logger`.
///
/// Log records carry the shared `resource()` attributes and are exported
/// through a tonic exporter configured by
/// `get_export_config(ConfigType::Logs)`. The pipeline is installed with
/// `install_batch` on the Tokio runtime.
fn init_logger() -> color_eyre::Result<Logger, LogError> {
    let log_config = opentelemetry_sdk::logs::Config::default().with_resource(resource());

    opentelemetry_otlp::new_pipeline()
        .logging()
        .with_log_config(log_config)
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_export_config(get_export_config(ConfigType::Logs)),
        )
        .install_batch(runtime::Tokio)
}
// fn init_loki() -> color_eyre::Result<(), tracing_loki::Error> {
// (layer, task) : (tracing_loki::layer, _ )
// }
// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
/// Builds every telemetry pipeline, installs the global `tracing` subscriber,
/// and returns the `OtelGuard` that owns the resulting handles.
///
/// The installed subscriber is composed of:
/// - an `EnvFilter` read from the default environment variable,
/// - a JSON file layer writing to rolling files under `./logs`,
/// - a pretty-printed layer using the `fmt` default writer,
/// - the OpenTelemetry trace, metrics and log bridge layers,
/// - a Loki layer pushing to `http://127.0.0.1:3100`.
///
/// # Errors
///
/// Returns an error if any pipeline fails to build, if the tracer or logger
/// exposes no provider, if the Loki layer cannot be configured, or if a
/// global subscriber has already been installed.
// NOTE(review): `tokio::spawn` is used below, so this presumably must be
// called from inside a Tokio runtime — confirm against the caller.
fn init_tracing_subscriber() -> color_eyre::Result<OtelGuard> {
    let tracer = init_tracer()?;
    // `provider()` yields an `Option`; surface a missing provider as an error
    // instead of panicking inside a fallible function.
    let trace_provider = tracer
        .provider()
        .ok_or_else(|| color_eyre::eyre::eyre!("tracer has no associated tracer provider"))?;
    global::set_tracer_provider(trace_provider);

    // `init_meter_provider` registers the provider globally itself, so it is
    // not registered a second time here.
    let meter_provider = init_meter_provider()?;

    let logger = init_logger()?;
    let log_provider = logger
        .provider()
        .ok_or_else(|| color_eyre::eyre::eyre!("logger has no associated logger provider"))?;
    let log_trace_bridge = OpenTelemetryTracingBridge::new(&log_provider);

    let debug_file = rolling::minutely("./logs", "debug");
    // Log warnings and errors to a separate file. Since we expect these events
    // to occur less frequently, roll that file on a daily basis instead.
    let warn_file = rolling::daily("./logs", "warnings").with_max_level(tracing::Level::WARN);
    let all_files = debug_file.and(warn_file);

    let loki_url = surf::Url::parse("http://127.0.0.1:3100")?;
    let (layer, task) = tracing_loki::builder()
        .label("host", "mine")?
        .extra_field("pid", std::process::id().to_string())?
        .build_url(loki_url)?;

    // `try_init` reports an already-installed global subscriber as an error;
    // `init` would panic in that situation.
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::from_default_env())
        .with(
            tracing_subscriber::fmt::layer()
                .json()
                .with_file(true)
                .with_line_number(true)
                .with_timer(ChronoLocal::rfc_3339())
                .with_writer(all_files),
        )
        .with(
            tracing_subscriber::fmt::layer()
                .pretty()
                .with_thread_ids(true)
                .with_thread_names(true)
                .with_file(true)
                .with_line_number(true)
                .with_timer(ChronoLocal::rfc_3339()),
        )
        .with(OpenTelemetryLayer::new(tracer))
        .with(MetricsLayer::new(meter_provider.clone()))
        .with(log_trace_bridge)
        .with(layer)
        .try_init()?;

    // Start the Loki background task only once the subscriber is installed,
    // so a failed installation does not leave a detached task behind.
    let loki_handle = tokio::spawn(task);

    Ok(OtelGuard {
        tracer_provider: global::tracer_provider(),
        meter_provider,
        logger_provider: global::logger_provider(),
        loki_handle,
    })
}
/// Handles produced by `init_tracing_subscriber`.
///
/// Dropping the guard runs the shutdown logic in its `Drop` implementation.
pub struct OtelGuard {
/// Global tracer provider handle, obtained from `global::tracer_provider()`.
pub tracer_provider: GlobalTracerProvider,
/// SDK meter provider returned by `init_meter_provider`.
pub meter_provider: MeterProvider,
/// Global logger provider handle, obtained from `global::logger_provider()`.
pub logger_provider: GlobalLoggerProvider,
/// Join handle of the spawned `tracing_loki` background task.
pub loki_handle: tokio::task::JoinHandle<()>,
}
/// Shuts the telemetry providers down when the guard goes out of scope.
impl Drop for OtelGuard {
    fn drop(&mut self) {
        // Shut the meter provider down exactly once and only report a
        // failure. A second `shutdown().unwrap()` would turn any shutdown
        // error into a panic inside `drop`.
        if let Err(err) = self.meter_provider.shutdown() {
            eprintln!("{err:?}");
        }
        opentelemetry::global::shutdown_logger_provider();
        opentelemetry::global::shutdown_tracer_provider();
    }
}
/// Boxed, thread-safe error alias.
// NOTE(review): nothing in the visible part of this file refers to this
// alias — confirm whether it is still needed.
type TelError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Public entry point: sets up telemetry by delegating to
/// `init_tracing_subscriber`.
///
/// Returns the resulting `OtelGuard`, or the error that
/// `init_tracing_subscriber` produced.
pub fn initialize() -> color_eyre::Result<OtelGuard> {
    init_tracing_subscriber()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment