Skip to main content

tdm_server_rust/telemetry/skywalking/
reporter.rs

1//! SkyWalking GrpcReporter 初始化与全局 Tracer / Logger
2
3use crate::config::TelemetryConfig;
4use ::skywalking::{
5    logging::logger::Logger,
6    reporter::grpc::GrpcReporter,
7    trace::tracer::{set_global_tracer, Tracer},
8};
9use std::sync::{Arc, OnceLock};
10use tokio::task::JoinHandle;
11
/// Marker recording whether the SkyWalking agent has completed `init`.
/// It stays unset (i.e. "not ready") in test environments that never run init.
static SW_READY: OnceLock<()> = OnceLock::new();
14
15/// 标记 SkyWalking 已初始化
16pub fn mark_ready() {
17    let _ = SW_READY.set(());
18}
19
20/// 是否已初始化(middleware 未 init 时跳过 Segment 避免 panic)
21pub fn is_ready() -> bool {
22    SW_READY.get().is_some()
23}
24
/// Handle to the SkyWalking background reporter.
pub struct SkyWalkingHandle {
    /// Join handle of the spawned gRPC reporting task.
    _report_task: JoinHandle<()>,
    /// Global tracer (shared within the process).
    pub tracer: Arc<Tracer>,
    /// Global logger (shared within the process).
    pub logger: Arc<Logger>,
}
34
35/// 解析 OAP gRPC 端点
36fn resolve_sw_endpoint(cfg: &TelemetryConfig) -> Option<String> {
37    if let Ok(ep) = std::env::var("SW_AGENT_COLLECTOR_BACKEND_SERVICES") {
38        let trimmed = ep.trim();
39        if !trimmed.is_empty() {
40            let url = if trimmed.starts_with("http") {
41                trimmed.to_string()
42            } else {
43                format!("http://{trimmed}")
44            };
45            return Some(url);
46        }
47    }
48    let ep = cfg.skywalking.endpoint.trim();
49    if ep.is_empty() {
50        None
51    } else {
52        Some(ep.to_string())
53    }
54}
55
56/// 解析实例名
57fn resolve_instance_name(cfg: &TelemetryConfig) -> String {
58    if !cfg.skywalking.instance_name.trim().is_empty() {
59        return cfg.skywalking.instance_name.trim().to_string();
60    }
61    std::env::var("HOSTNAME")
62        .or_else(|_| std::env::var("COMPUTERNAME"))
63        .unwrap_or_else(|_| "tdm-server-rust".into())
64}
65
66/// 连接 OAP 并启动后台 Segment/Log 上报
67pub async fn init_skywalking(cfg: &TelemetryConfig) -> anyhow::Result<SkyWalkingHandle> {
68    let endpoint = resolve_sw_endpoint(cfg)
69        .ok_or_else(|| anyhow::anyhow!("telemetry.skywalking.endpoint 未配置"))?;
70    let instance = resolve_instance_name(cfg);
71    let service = cfg.service_name.clone();
72
73    let reporter = GrpcReporter::connect(endpoint)
74        .await
75        .map_err(|e| anyhow::anyhow!("SkyWalking connect: {e:?}"))?;
76    let report_task = reporter.reporting().await.spawn();
77
78    let tracer = Tracer::new(service.clone(), instance.clone(), reporter.clone());
79    let logger = Logger::new(service, instance, reporter);
80
81    set_global_tracer(tracer.clone());
82    super::logger::set_global_logger(Arc::new(logger.clone()));
83    mark_ready();
84
85    Ok(SkyWalkingHandle {
86        _report_task: tokio::spawn(async move {
87            let _ = report_task.await;
88        }),
89        tracer: Arc::new(tracer),
90        logger: Arc::new(logger),
91    })
92}