Skip to main content

tdm_server_rust/repository/
oss_repo.rs

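//! OSS object-storage data access layer (OSS repository).
//!
//! Wraps record management for the `oss` table and builds upload/download URLs for
//! Tencent COS (STS temporary credentials + presigned URLs, or CDN-signed download URLs).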
4
5use crate::config::AppConfig;
6use crate::entity::oss::{OssCredential, OssDto};
7use crate::utils::{agent_debug, cos_cdn, cos_presign, cos_sts};
8use sqlx::{MySql, Pool, Row};
9use uuid::Uuid;
10
11/// OSS 仓储
12pub struct OssRepository {
13    /// 连接池
14    pool: Pool<MySql>,
15    /// 应用配置
16    config: AppConfig,
17}
18
19impl OssRepository {
20    /// 构造仓储
21    pub fn new(pool: Pool<MySql>, config: AppConfig) -> Self {
22        Self { pool, config }
23    }
24
25    /// 新增或更新 OSS 记录,绑定到话数岗位列。
26    ///
27    /// 若该话数岗位已有 OSS 记录则 UPDATE,否则 INSERT。
28    ///
29    /// # 返回值
30    ///
31    /// 返回 OSS 记录 ID。
32    ///
33    /// # Errors
34    ///
35    /// - `AppError::business("话数不存在喵")` — episode_id 不存在
36    /// - `AppError::Database` — 写入失败
37    #[tracing::instrument(skip_all, level = "debug")]
38    pub async fn upsert_oss(&self, dto: &OssDto, member_id: i32) -> crate::error::ApiResult<i32> {
39        let bucket = &self.config.tencent.bucket;
40        let region = &self.config.tencent.region;
41        let manga_id = self.get_episode_manga_id(dto.episode_id).await?;
42        let object_key = dto
43            .object_key
44            .clone()
45            .unwrap_or_else(|| {
46                self.build_object_key(manga_id, dto.episode_id, &dto.post_name, &dto.filename)
47            });
48        let url = format!("https://{bucket}.cos.{region}.myqcloud.com/{object_key}");
49
50        let existing_oss_id = self.get_episode_oss_id(dto.episode_id, &dto.post_name).await?;
51        let oss_id = if let Some(id) = existing_oss_id {
52            sqlx::query(
53                "UPDATE oss SET filename = ?, object_key = ?, url = ?, bucket_name = ?, region = ?, update_time = NOW(), update_by = ? WHERE id = ?",
54            )
55            .bind(&dto.filename)
56            .bind(&object_key)
57            .bind(&url)
58            .bind(bucket)
59            .bind(region)
60            .bind(member_id)
61            .bind(id)
62            .execute(&self.pool)
63            .await?;
64            id
65        } else {
66            let r = sqlx::query(
67                "INSERT INTO oss(filename, object_key, url, bucket_name, region, create_time, create_by, update_time, update_by) \
68                 VALUES (?, ?, ?, ?, ?, NOW(), ?, NOW(), ?)",
69            )
70            .bind(&dto.filename)
71            .bind(&object_key)
72            .bind(&url)
73            .bind(bucket)
74            .bind(region)
75            .bind(member_id)
76            .bind(member_id)
77            .execute(&self.pool)
78            .await?;
79            let id = r.last_insert_id() as i32;
80            self.bind_episode_oss_id(dto.episode_id, &dto.post_name, id).await?;
81            id
82        };
83        sqlx::query("UPDATE mangaepisodetb SET updateTime = NOW() WHERE Id = ?")
84            .bind(dto.episode_id)
85            .execute(&self.pool)
86            .await?;
87        Ok(oss_id)
88    }
89
90    /// 删除 OSS 记录
91    #[tracing::instrument(skip_all, level = "debug")]
92    pub async fn delete_oss(&self, oss_id: i32) -> crate::error::ApiResult<Option<String>> {
93        let row = sqlx::query("SELECT object_key FROM oss WHERE id = ?")
94            .bind(oss_id)
95            .fetch_optional(&self.pool)
96            .await?;
97        let key = row.and_then(|r| r.try_get("object_key").ok());
98        sqlx::query("DELETE FROM oss WHERE id = ?")
99            .bind(oss_id)
100            .execute(&self.pool)
101            .await?;
102        Ok(key)
103    }
104
105    /// 获取上传凭证
106    #[tracing::instrument(skip_all, level = "debug")]
107    pub async fn get_upload_credential(
108        &self,
109        episode_id: i32,
110        post_name: &str,
111        filename: &str,
112    ) -> crate::error::ApiResult<OssCredential> {
113        self.ensure_episode_exists(episode_id).await?;
114        self.ensure_ext_allowed(filename, &self.config.tencent.ext_whitelist)?;
115        let manga_id = self.get_episode_manga_id(episode_id).await?;
116        let object_key = self.build_object_key(manga_id, episode_id, post_name, filename);
117        let max_bytes = self.config.tencent.max_file_size * 1024 * 1024;
118        // #region agent log
119        agent_debug::log(
120            "H5",
121            "oss_repo.rs:get_upload_credential",
122            "upload_credential_request",
123            serde_json::json!({
124                "episodeId": episode_id,
125                "postName": post_name,
126                "objectKeyLen": object_key.len(),
127                "hasNonAscii": !object_key.is_ascii(),
128                "maxBytes": max_bytes
129            }),
130        );
131        // #endregion
132        let presigned_url = self
133            .presigned_put_url(&self.config.tencent.bucket, &object_key, max_bytes)
134            .await?;
135        Ok(OssCredential {
136            presigned_url,
137            object_key: Some(object_key),
138            original_filename: Some(filename.to_string()),
139        })
140    }
141
142    /// 获取下载凭证
143    #[tracing::instrument(skip_all, level = "debug")]
144    pub async fn get_download_credential(
145        &self,
146        episode_id: i32,
147        post_name: &str,
148    ) -> crate::error::ApiResult<OssCredential> {
149        let oss_id: i32 = self
150            .get_episode_oss_id(episode_id, post_name)
151            .await?
152            .ok_or_else(|| {
153                // #region agent log
154                agent_debug::log(
155                    "H2",
156                    "oss_repo.rs:get_download_credential",
157                    "oss_id_missing",
158                    serde_json::json!({
159                        "episodeId": episode_id,
160                        "postName": post_name
161                    }),
162                );
163                // #endregion
164                crate::error::AppError::Oss {
165                    code: None,
166                    msg: format!("当前单话尚未上传 [{post_name}] 文件"),
167                }
168            })?;
169        let row = sqlx::query("SELECT object_key, filename FROM oss WHERE id = ?")
170            .bind(oss_id)
171            .fetch_optional(&self.pool)
172            .await?
173            .ok_or_else(|| crate::error::AppError::Oss {
174                code: None,
175                msg: format!("OSS 记录 {oss_id} 不存在"),
176            })?;
177        let object_key: String = row.get("object_key");
178        let original_filename: Option<String> = row.try_get("filename").ok();
179        let presigned_url = if let Some(cdn_url) = cos_cdn::generate_cdn_url(
180            &self.config.tencent.cdn_domain,
181            &self.config.tencent.cdn_key,
182            &object_key,
183        ) {
184            // #region agent log
185            agent_debug::log(
186                "H1",
187                "oss_repo.rs:get_download_credential",
188                "cdn_signed_url",
189                serde_json::json!({
190                    "hasSign": cdn_url.contains("sign="),
191                    "hasT": cdn_url.contains("&t="),
192                    "objectKeyLen": object_key.len()
193                }),
194            );
195            // #endregion
196            cdn_url
197        } else {
198            // #region agent log
199            agent_debug::log(
200                "H1",
201                "oss_repo.rs:get_download_credential",
202                "fallback_cos_presign",
203                serde_json::json!({
204                    "cdnDomainEmpty": self.config.tencent.cdn_domain.is_empty(),
205                    "cdnKeyEmpty": self.config.tencent.cdn_key.is_empty()
206                }),
207            );
208            // #endregion
209            self.presigned_get_url(&self.config.tencent.bucket, &object_key)
210                .await?
211        };
212        Ok(OssCredential {
213            presigned_url,
214            object_key: Some(object_key),
215            original_filename,
216        })
217    }
218
219    /// 获取图片上传凭证(对齐 Java generateImageObjectKey + imageBucket)
220    #[tracing::instrument(skip_all, level = "debug")]
221    pub async fn get_image_upload_credential(
222        &self,
223        image_type: &str,
224        filename: &str,
225    ) -> crate::error::ApiResult<OssCredential> {
226        let image_bucket = self.config.tencent.image_bucket.trim();
227        if image_bucket.is_empty() {
228            return Err(crate::error::AppError::Oss {
229                code: None,
230                msg: "图片存储桶未配置".into(),
231            });
232        }
233        self.ensure_ext_allowed(filename, &self.config.tencent.image_ext_whitelist)?;
234        let object_key = self.build_image_object_key(image_type, filename);
235        let max_bytes = self.config.tencent.image_max_file_size * 1024 * 1024;
236        let presigned_url = self
237            .presigned_put_url(image_bucket, &object_key, max_bytes)
238            .await?;
239        Ok(OssCredential {
240            presigned_url,
241            object_key: Some(object_key),
242            original_filename: Some(filename.to_string()),
243        })
244    }
245
246    /// STS + 预签名 PUT URL
247    #[tracing::instrument(skip_all, level = "debug")]
248    async fn presigned_put_url(
249        &self,
250        bucket: &str,
251        object_key: &str,
252        max_bytes: u64,
253    ) -> crate::error::ApiResult<String> {
254        let policy = cos_sts::build_put_policy(bucket, &self.config.tencent.region, object_key, max_bytes)?;
255        let creds = cos_sts::get_federation_token(
256            &self.config.tencent.secret_id,
257            &self.config.tencent.secret_key,
258            &self.config.tencent.region,
259            policy,
260            self.config.tencent.duration_seconds,
261        )
262        .await?;
263        cos_presign::presigned_url(
264            &creds.tmp_secret_id,
265            &creds.tmp_secret_key,
266            bucket,
267            &self.config.tencent.region,
268            object_key,
269            "PUT",
270            self.config.tencent.duration_seconds,
271            &creds.session_token,
272        )
273    }
274
275    /// STS + 预签名 GET URL
276    #[tracing::instrument(skip_all, level = "debug")]
277    async fn presigned_get_url(
278        &self,
279        bucket: &str,
280        object_key: &str,
281    ) -> crate::error::ApiResult<String> {
282        let policy = cos_sts::build_get_policy(bucket, &self.config.tencent.region, object_key)?;
283        let creds = cos_sts::get_federation_token(
284            &self.config.tencent.secret_id,
285            &self.config.tencent.secret_key,
286            &self.config.tencent.region,
287            policy,
288            self.config.tencent.duration_seconds,
289        )
290        .await?;
291        cos_presign::presigned_url(
292            &creds.tmp_secret_id,
293            &creds.tmp_secret_key,
294            bucket,
295            &self.config.tencent.region,
296            object_key,
297            "GET",
298            self.config.tencent.duration_seconds,
299            &creds.session_token,
300        )
301    }
302
303    /// 按 ID 查询 objectKey
304    #[tracing::instrument(skip_all, level = "debug")]
305    pub async fn get_object_key(&self, oss_id: i32) -> crate::error::ApiResult<Option<String>> {
306        let row = sqlx::query("SELECT object_key FROM oss WHERE id = ?")
307            .bind(oss_id)
308            .fetch_optional(&self.pool)
309            .await?;
310        Ok(row.and_then(|r| r.try_get("object_key").ok()))
311    }
312
313    /// 检测话数是否存在
314    #[tracing::instrument(skip_all, level = "debug")]
315    async fn ensure_episode_exists(&self, episode_id: i32) -> crate::error::ApiResult<()> {
316        let row = sqlx::query("SELECT Id FROM mangaepisodetb WHERE Id = ?")
317            .bind(episode_id)
318            .fetch_optional(&self.pool)
319            .await?;
320        if row.is_none() {
321            return Err(crate::error::AppError::business(format!("漫画单话 {episode_id} 不存在")));
322        }
323        Ok(())
324    }
325
326    /// 查询话数岗位绑定的 OSS ID
327    #[tracing::instrument(skip_all, level = "debug")]
328    async fn get_episode_oss_id(&self, episode_id: i32, post_name: &str) -> crate::error::ApiResult<Option<i32>> {
329        let col = post_oss_column(post_name)?;
330        let sql = format!("SELECT {col} AS ossId FROM mangaepisodetb WHERE Id = ?");
331        let row = sqlx::query(&sql)
332            .bind(episode_id)
333            .fetch_optional(&self.pool)
334            .await?;
335        Ok(row.and_then(|r| r.try_get("ossId").ok()))
336    }
337
338    /// 绑定话数岗位 OSS ID
339    #[tracing::instrument(skip_all, level = "debug")]
340    async fn bind_episode_oss_id(
341        &self,
342        episode_id: i32,
343        post_name: &str,
344        oss_id: i32,
345    ) -> crate::error::ApiResult<()> {
346        let col = post_oss_column(post_name)?;
347        let sql = format!("UPDATE mangaepisodetb SET {col} = ?, updateTime = NOW() WHERE Id = ?");
348        sqlx::query(&sql)
349            .bind(oss_id)
350            .bind(episode_id)
351            .execute(&self.pool)
352            .await?;
353        Ok(())
354    }
355
356    /// 查询话数所属漫画 ID
357    #[tracing::instrument(skip_all, level = "debug")]
358    async fn get_episode_manga_id(&self, episode_id: i32) -> crate::error::ApiResult<i32> {
359        let row = sqlx::query("SELECT mangaId FROM mangaepisodetb WHERE Id = ?")
360            .bind(episode_id)
361            .fetch_optional(&self.pool)
362            .await?;
363        row.map(|r| r.get("mangaId"))
364            .ok_or_else(|| crate::error::AppError::business(format!("漫画单话 {episode_id} 不存在")))
365    }
366
367    /// 生成话数稿件 objectKey(对齐 Java generateObjectKey)
368    fn build_object_key(
369        &self,
370        manga_id: i32,
371        episode_id: i32,
372        post_name: &str,
373        filename: &str,
374    ) -> String {
375        let file_type_dir = match post_name.to_lowercase().as_str() {
376            "provider" => "manga-raw",
377            "translator" => "translation",
378            "proofreader" => "proofread",
379            "letterer" => "manga-cooked",
380            "timer" => "timing",
381            _ => "other",
382        };
383        format!("manga_{manga_id}/episode_{episode_id}/{file_type_dir}/{filename}")
384    }
385
386    /// 生成图片 objectKey(benefits/uuid.ext)
387    fn build_image_object_key(&self, image_type: &str, filename: &str) -> String {
388        let mut normalized = image_type.trim().trim_matches('/').to_string();
389        if normalized.is_empty() {
390            normalized = "misc".into();
391        }
392        let ext = file_ext(filename).unwrap_or_else(|| "jpg".into());
393        format!("{normalized}/{}.{}", Uuid::new_v4(), ext)
394    }
395
396    /// 校验后缀白名单
397    fn ensure_ext_allowed(&self, filename: &str, whitelist: &[String]) -> crate::error::ApiResult<()> {
398        let ext = file_ext(filename).ok_or_else(|| crate::error::AppError::Oss {
399            code: None,
400            msg: format!("无法识别文件后缀: {filename}"),
401        })?;
402        if !whitelist.iter().any(|w| w.eq_ignore_ascii_case(&ext)) {
403            return Err(crate::error::AppError::Oss {
404                code: None,
405                msg: format!("非法文件后缀: {ext}"),
406            });
407        }
408        Ok(())
409    }
410}
411
412/// 岗位对应 OSS 字段名
413fn post_oss_column(post_name: &str) -> crate::error::ApiResult<&'static str> {
414    match post_name.to_lowercase().as_str() {
415        "provider" => Ok("provider_file_oss_id"),
416        "translator" => Ok("translator_file_oss_id"),
417        "proofreader" => Ok("proofreader_file_oss_id"),
418        "letterer" => Ok("letterer_file_oss_id"),
419        "timer" => Ok("timer_file_oss_id"),
420        _ => Err(crate::error::AppError::Oss {
421            code: None,
422            msg: format!("不支持的岗位类型:{post_name}"),
423        }),
424    }
425}
426
/// Returns the lower-cased extension of `filename`, if it has one.
///
/// The extension is the text after the last `.`. `None` is returned when the
/// name contains no `.` at all, or when nothing follows the last `.` (for example
/// `"name."`), so callers can tell "no extension" apart from a real one.
fn file_ext(filename: &str) -> Option<String> {
    filename
        .rsplit_once('.')
        .map(|(_, ext)| ext)
        // A trailing dot leaves an empty suffix, which is not a usable extension.
        .filter(|ext| !ext.is_empty())
        .map(str::to_lowercase)
}