Skip to main content

tdm_server_rust/telemetry/skywalking/
middleware.rs

1//! 全路由 SkyWalking Entry span 中间件
2
3use super::{context, reporter, sw_propagation};
4use ::skywalking::trace::{span::HandleSpanObject, tracer::create_trace_context};
5use axum::{
6    body::Body,
7    extract::MatchedPath,
8    http::{Request, Response},
9    middleware::Next,
10};
11use tracing::Instrument;
12
13/// 未 init SW 时直通(集成测试不连 OAP)
14async fn passthrough(req: Request<Body>, next: Next) -> Response<Body> {
15    next.run(req).await
16}
17
18/// 全路由 HTTP Entry 追踪(SW 原生 Segment,驱动 Overview/Endpoint 指标)
19pub async fn sw_entry_middleware(req: Request<Body>, next: Next) -> Response<Body> {
20    if !reporter::is_ready() {
21        return passthrough(req, next).await;
22    }
23    let method = req.method().clone();
24    let uri = req.uri().clone();
25    let path = uri.path().to_string();
26    let route = req
27        .extensions()
28        .get::<MatchedPath>()
29        .map(|m| m.as_str().to_string())
30        .unwrap_or_else(|| path.clone());
31    let endpoint = format!("{}:{route}", method.as_str());
32    let peer = req
33        .headers()
34        .get("x-forwarded-for")
35        .and_then(|v| v.to_str().ok())
36        .unwrap_or("-")
37        .to_string();
38
39    let mut ctx = create_trace_context();
40    let propagation = sw_propagation::extract_inbound(req.headers());
41
42    let mut entry = match propagation {
43        Some(prop) => ctx.create_entry_span_with_propagation(&endpoint, &prop),
44        None => ctx.create_entry_span(&endpoint),
45    };
46    entry.add_tag("http.method", method.as_str());
47    entry.add_tag("url", &path);
48
49    let span = tracing::info_span!(
50        "http.request",
51        http.method = %method,
52        url.path = %path,
53        http.route = %route,
54        http.status_code = tracing::field::Empty,
55    );
56
57    context::with_context(ctx, async move {
58        let mut entry = entry;
59        let mut response = next.run(req).instrument(span.clone()).await;
60        let status = response.status().as_u16();
61        span.record("http.status_code", status);
62
63        entry.add_tag("http.status_code", status.to_string());
64
65        if let Some(matched) = response.extensions().get::<MatchedPath>() {
66            span.record("http.route", matched.as_str());
67        }
68
69        context::with_current(|c| {
70            sw_propagation::inject_sw8(response.headers_mut(), c, &endpoint, &peer);
71            sw_propagation::inject_traceparent(response.headers_mut(), c.trace_id());
72        });
73
74        response
75    })
76    .await
77}