tdm_server_rust/telemetry/
layers.rs1use crate::{
4 config::TelemetryConfig,
5 profile::DevProfileLayer,
6 telemetry::{
7 logs, otlp, skywalking,
8 },
9};
10use std::path::Path;
11use std::sync::Arc;
12use tracing::Level;
13use tracing::Subscriber;
14use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
15use tracing_subscriber::{
16 fmt::{format::Writer, FormatEvent, FormatFields, time::FormatTime},
17 layer::SubscriberExt,
18 util::SubscriberInitExt,
19 EnvFilter, Layer, Registry,
20};
21
22pub struct TelemetryGuard {
24 providers: Option<otlp::OtelProviders>,
26 skywalking: Option<skywalking::SkyWalkingHandle>,
28 _file_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
30}
31
32impl TelemetryGuard {
33 pub fn shutdown(self) {
35 if let Some(p) = self.providers {
36 if let Some(tp) = p.tracer_provider {
37 let _ = tp.shutdown();
38 }
39 let _ = p.logger_provider.shutdown();
40 }
41 }
42
43 pub fn sw_logger(&self) -> Option<Arc<::skywalking::logging::logger::Logger>> {
45 self.skywalking.as_ref().map(|h| h.logger.clone())
46 }
47}
48
49pub async fn init(profile: &str, cfg: &TelemetryConfig) -> TelemetryGuard {
51 let is_dev = matches!(profile, "dev" | "dev-h2");
52 let filter = build_env_filter(cfg);
53
54 let skywalking = if cfg.enabled && cfg.skywalking.export_traces {
55 match skywalking::init_skywalking(cfg).await {
56 Ok(h) => Some(h),
57 Err(e) => {
58 eprintln!("SkyWalking 初始化失败,继续无 Segment 导出: {e}");
59 None
60 }
61 }
62 } else {
63 None
64 };
65
66 let providers = if cfg.enabled {
67 match otlp::build_providers(profile, cfg) {
68 Ok(p) => Some(p),
69 Err(e) => {
70 eprintln!("OTel Log 初始化失败,继续无 OTLP 日志: {e}");
71 None
72 }
73 }
74 } else {
75 None
76 };
77
78 let mut file_guard = None;
79 let mut fmt_layer = tracing_subscriber::fmt::layer()
80 .with_target(false)
81 .with_ansi(is_dev)
82 .event_format(OtelLogFormat)
83 .boxed();
84
85 if !is_dev && cfg.log_file.non_blocking {
86 let path = Path::new(&cfg.log_file.path);
87 if let Some(dir) = path.parent() {
88 let _ = std::fs::create_dir_all(dir);
89 }
90 let file_name = path
91 .file_name()
92 .and_then(|s| s.to_str())
93 .unwrap_or("app.log");
94 let dir = path
95 .parent()
96 .map(|p| p.to_string_lossy().into_owned())
97 .unwrap_or_else(|| "./logs".into());
98 let file_appender = tracing_appender::rolling::daily(&dir, file_name);
99 let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
100 file_guard = Some(guard);
101 fmt_layer = tracing_subscriber::fmt::layer()
102 .with_target(false)
103 .with_ansi(false)
104 .event_format(OtelLogFormat)
105 .with_writer(non_blocking)
106 .boxed();
107 }
108
109 let sw_log_layer: skywalking::SwLogLayer = skywalking
110 .as_ref()
111 .filter(|_| cfg.skywalking.export_native_logs)
112 .map(|h| {
113 let min = parse_level(&cfg.log_level);
114 skywalking::SwLogLayer::Active(skywalking::SkyWalkingLogLayer::new(
115 h.logger.clone(),
116 min,
117 ))
118 })
119 .unwrap_or(skywalking::SwLogLayer::noop());
120
121 let registry = Registry::default()
122 .with(filter)
123 .with(fmt_layer)
124 .with(sw_log_layer);
125
126 if let Some(ref p) = providers {
127 match (logs::export_enabled(cfg), is_dev) {
128 (true, true) => registry
129 .with(OpenTelemetryTracingBridge::new(&p.logger_provider))
130 .with(DevProfileLayer::new())
131 .init(),
132 (true, false) => registry
133 .with(OpenTelemetryTracingBridge::new(&p.logger_provider))
134 .init(),
135 (false, true) => registry.with(DevProfileLayer::new()).init(),
136 (false, false) => registry.init(),
137 }
138 } else if is_dev {
139 registry.with(DevProfileLayer::new()).init();
140 } else {
141 registry.init();
142 }
143
144 TelemetryGuard {
145 providers,
146 skywalking,
147 _file_guard: file_guard,
148 }
149}
150
151fn parse_level(s: &str) -> Level {
152 match s.to_lowercase().as_str() {
153 "trace" => Level::TRACE,
154 "debug" => Level::DEBUG,
155 "warn" => Level::WARN,
156 "error" => Level::ERROR,
157 _ => Level::INFO,
158 }
159}
160
161fn build_env_filter(cfg: &TelemetryConfig) -> EnvFilter {
163 let default = format!(
164 "tdm_server_rust={},tower_http=warn,sqlx=warn",
165 cfg.log_level
166 );
167 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default))
168}
169
170struct OtelLogFormat;
172
173impl<S, N> FormatEvent<S, N> for OtelLogFormat
174where
175 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
176 N: for<'a> FormatFields<'a> + 'static,
177{
178 fn format_event(
179 &self,
180 ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
181 mut writer: Writer<'_>,
182 event: &tracing::Event<'_>,
183 ) -> std::fmt::Result {
184 use tracing_subscriber::fmt::time::SystemTime;
185
186 let fmt_time = SystemTime;
187 fmt_time.format_time(&mut writer)?;
188
189 write!(writer, " ")?;
190 let level = *event.metadata().level();
191 write!(writer, "{level:>5} ")?;
192
193 if let Some(trace_id) = skywalking::current_trace_id() {
194 write!(writer, "trace_id={trace_id} ")?;
195 }
196
197 ctx.field_format().format_fields(writer.by_ref(), event)?;
198 writeln!(writer)
199 }
200}