Skip to main content

tdm_server_rust/telemetry/
layers.rs

1//! tracing 订阅器 Layer 组装与日志 trace_id 格式
2
3use crate::{
4    config::TelemetryConfig,
5    profile::DevProfileLayer,
6    telemetry::{
7        logs, otlp, skywalking,
8    },
9};
10use std::path::Path;
11use std::sync::Arc;
12use tracing::Level;
13use tracing::Subscriber;
14use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
15use tracing_subscriber::{
16    fmt::{format::Writer, FormatEvent, FormatFields, time::FormatTime},
17    layer::SubscriberExt,
18    util::SubscriberInitExt,
19    EnvFilter, Layer, Registry,
20};
21
22/// 可观测性守卫:持有 SW reporter、OTel Logger 与文件 appender
23pub struct TelemetryGuard {
24    /// OTel Log Provider
25    providers: Option<otlp::OtelProviders>,
26    /// SkyWalking gRPC 上报
27    skywalking: Option<skywalking::SkyWalkingHandle>,
28    /// 异步日志落盘 guard(pro)
29    _file_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
30}
31
32impl TelemetryGuard {
33    /// 进程退出前 flush
34    pub fn shutdown(self) {
35        if let Some(p) = self.providers {
36            if let Some(tp) = p.tracer_provider {
37                let _ = tp.shutdown();
38            }
39            let _ = p.logger_provider.shutdown();
40        }
41    }
42
43    /// SW Logger(debug_log / error_log 结构化上报)
44    pub fn sw_logger(&self) -> Option<Arc<::skywalking::logging::logger::Logger>> {
45        self.skywalking.as_ref().map(|h| h.logger.clone())
46    }
47}
48
49/// 初始化 tracing + SW 原生 + OTLP Log
50pub async fn init(profile: &str, cfg: &TelemetryConfig) -> TelemetryGuard {
51    let is_dev = matches!(profile, "dev" | "dev-h2");
52    let filter = build_env_filter(cfg);
53
54    let skywalking = if cfg.enabled && cfg.skywalking.export_traces {
55        match skywalking::init_skywalking(cfg).await {
56            Ok(h) => Some(h),
57            Err(e) => {
58                eprintln!("SkyWalking 初始化失败,继续无 Segment 导出: {e}");
59                None
60            }
61        }
62    } else {
63        None
64    };
65
66    let providers = if cfg.enabled {
67        match otlp::build_providers(profile, cfg) {
68            Ok(p) => Some(p),
69            Err(e) => {
70                eprintln!("OTel Log 初始化失败,继续无 OTLP 日志: {e}");
71                None
72            }
73        }
74    } else {
75        None
76    };
77
78    let mut file_guard = None;
79    let mut fmt_layer = tracing_subscriber::fmt::layer()
80        .with_target(false)
81        .with_ansi(is_dev)
82        .event_format(OtelLogFormat)
83        .boxed();
84
85    if !is_dev && cfg.log_file.non_blocking {
86        let path = Path::new(&cfg.log_file.path);
87        if let Some(dir) = path.parent() {
88            let _ = std::fs::create_dir_all(dir);
89        }
90        let file_name = path
91            .file_name()
92            .and_then(|s| s.to_str())
93            .unwrap_or("app.log");
94        let dir = path
95            .parent()
96            .map(|p| p.to_string_lossy().into_owned())
97            .unwrap_or_else(|| "./logs".into());
98        let file_appender = tracing_appender::rolling::daily(&dir, file_name);
99        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
100        file_guard = Some(guard);
101        fmt_layer = tracing_subscriber::fmt::layer()
102            .with_target(false)
103            .with_ansi(false)
104            .event_format(OtelLogFormat)
105            .with_writer(non_blocking)
106            .boxed();
107    }
108
109    let sw_log_layer: skywalking::SwLogLayer = skywalking
110        .as_ref()
111        .filter(|_| cfg.skywalking.export_native_logs)
112        .map(|h| {
113            let min = parse_level(&cfg.log_level);
114            skywalking::SwLogLayer::Active(skywalking::SkyWalkingLogLayer::new(
115                h.logger.clone(),
116                min,
117            ))
118        })
119        .unwrap_or(skywalking::SwLogLayer::noop());
120
121    let registry = Registry::default()
122        .with(filter)
123        .with(fmt_layer)
124        .with(sw_log_layer);
125
126    if let Some(ref p) = providers {
127        match (logs::export_enabled(cfg), is_dev) {
128            (true, true) => registry
129                .with(OpenTelemetryTracingBridge::new(&p.logger_provider))
130                .with(DevProfileLayer::new())
131                .init(),
132            (true, false) => registry
133                .with(OpenTelemetryTracingBridge::new(&p.logger_provider))
134                .init(),
135            (false, true) => registry.with(DevProfileLayer::new()).init(),
136            (false, false) => registry.init(),
137        }
138    } else if is_dev {
139        registry.with(DevProfileLayer::new()).init();
140    } else {
141        registry.init();
142    }
143
144    TelemetryGuard {
145        providers,
146        skywalking,
147        _file_guard: file_guard,
148    }
149}
150
151fn parse_level(s: &str) -> Level {
152    match s.to_lowercase().as_str() {
153        "trace" => Level::TRACE,
154        "debug" => Level::DEBUG,
155        "warn" => Level::WARN,
156        "error" => Level::ERROR,
157        _ => Level::INFO,
158    }
159}
160
161/// 构建 EnvFilter
162fn build_env_filter(cfg: &TelemetryConfig) -> EnvFilter {
163    let default = format!(
164        "tdm_server_rust={},tower_http=warn,sqlx=warn",
165        cfg.log_level
166    );
167    EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default))
168}
169
170/// 日志格式:输出 SW trace_id
171struct OtelLogFormat;
172
173impl<S, N> FormatEvent<S, N> for OtelLogFormat
174where
175    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
176    N: for<'a> FormatFields<'a> + 'static,
177{
178    fn format_event(
179        &self,
180        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
181        mut writer: Writer<'_>,
182        event: &tracing::Event<'_>,
183    ) -> std::fmt::Result {
184        use tracing_subscriber::fmt::time::SystemTime;
185
186        let fmt_time = SystemTime;
187        fmt_time.format_time(&mut writer)?;
188
189        write!(writer, " ")?;
190        let level = *event.metadata().level();
191        write!(writer, "{level:>5} ")?;
192
193        if let Some(trace_id) = skywalking::current_trace_id() {
194            write!(writer, "trace_id={trace_id} ")?;
195        }
196
197        ctx.field_format().format_fields(writer.by_ref(), event)?;
198        writeln!(writer)
199    }
200}