from __future__ import annotations

import os
from io import BytesIO
from typing import BinaryIO, Dict, Optional
from urllib.parse import quote

from .interface import StorageInterface


class TencentCOSStorage(StorageInterface):
    """Storage adapter backed by Tencent Cloud COS (Cloud Object Storage)."""

    storage_type = "tencent_cos"

    def __init__(self, config: dict):
        """Build a COS client from ``config``.

        Args:
            config: Mapping that must provide non-empty ``secret_id``,
                ``secret_key``, ``region`` and ``bucket`` entries.

        Raises:
            ImportError: If the ``cos-python-sdk-v5`` package is not installed.
            ValueError: If any of the required entries is missing or empty.
        """
        # Imported lazily so the module can be loaded without the COS SDK.
        try:
            from qcloud_cos import CosConfig, CosS3Client
        except ImportError as exc:
            raise ImportError(
                "请先安装 cos-python-sdk-v5: pip install cos-python-sdk-v5"
            ) from exc

        self.secret_id = config.get("secret_id")
        self.secret_key = config.get("secret_key")
        self.region = config.get("region")
        self.bucket_name = config.get("bucket")

        if not all([self.secret_id, self.secret_key, self.region, self.bucket_name]):
            raise ValueError("腾讯云COS配置不完整,需要: secret_id, secret_key, region, bucket")

        cos_config = CosConfig(
            Region=self.region,
            SecretId=self.secret_id,
            SecretKey=self.secret_key,
        )
        self.client = CosS3Client(cos_config)

    def _read_object(self, key: str) -> bytes:
        """Return the complete content of the object stored under ``key``."""
        response = self.client.get_object(Bucket=self.bucket_name, Key=key)
        # The SDK's StreamBody.read() is chunk-oriented (it takes a chunk_size
        # argument) and is absent from older SDK releases, so the whole payload
        # is read from the raw stream instead, as the SDK documentation shows.
        return response["Body"].get_raw_stream().read()

    def _read_existing(self, key: str) -> bytes:
        """Return the current content of ``key``, or ``b""`` if it does not exist.

        Only an HTTP 404 from the service is treated as "no existing data";
        every other failure propagates so the caller never overwrites an
        object it failed to read.
        """
        from qcloud_cos import CosServiceError

        try:
            return self._read_object(key)
        except CosServiceError as exc:
            if exc.get_status_code() == 404:
                return b""
            raise

    def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
        """Upload the content of ``file_stream`` to COS under ``key``.

        Args:
            file_stream: Binary stream; it is read to the end.
            key: Object key inside the configured bucket.
            mime_type: Content type to store; defaults to
                ``application/octet-stream`` when omitted or empty.

        Returns:
            A dict with ``etag`` (surrounding double quotes stripped) and
            ``version_id`` (``None`` when the response carries no version).
        """
        response = self.client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=file_stream.read(),
            ContentType=mime_type or "application/octet-stream",
        )
        return {
            "etag": response.get("ETag", "").strip('"'),
            "version_id": response.get("VersionId"),
        }

    def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
        """Write a chunk at ``offset`` using a read-modify-write cycle.

        The object is rewritten as ``existing[:offset] + chunk``: the current
        object is downloaded, truncated to ``offset`` bytes, extended with the
        chunk and uploaded again as a whole. With ``offset == 0`` the chunk
        simply replaces the object and nothing is downloaded.

        NOTE: if ``offset`` is larger than the current object size, no padding
        is inserted; the chunk is placed directly after the existing bytes.

        Args:
            key: Object key inside the configured bucket.
            chunk_stream: Binary stream holding the chunk; it is read to the end.
            offset: Byte position at which the chunk starts; must be >= 0.

        Raises:
            ValueError: If ``offset`` is negative.
        """
        if offset < 0:
            # A negative slice bound would silently drop bytes from the tail.
            raise ValueError(f"offset 不能为负数: {offset}")

        chunk_data = chunk_stream.read()

        if offset == 0:
            # Existing content would be discarded anyway; skip the download.
            new_data = chunk_data
        else:
            new_data = self._read_existing(key)[:offset] + chunk_data

        self.client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=new_data,
        )

    def download(self, key: str) -> BinaryIO:
        """Download the object under ``key`` into an in-memory ``BytesIO``."""
        return BytesIO(self._read_object(key))

    def delete(self, key: str) -> None:
        """Delete the object stored under ``key``."""
        self.client.delete_object(Bucket=self.bucket_name, Key=key)

    def exists(self, key: str) -> bool:
        """Return ``True`` if a HEAD request for ``key`` succeeds.

        Any exception raised by the request is reported as ``False``, so a
        ``False`` result does not distinguish a missing object from a failed
        request.
        """
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=key)
            return True
        except Exception:
            return False

    def get_url(self, key: str, expires: int = 3600) -> str:
        """Return an access URL for the object under ``key``.

        Args:
            key: Object key inside the configured bucket.
            expires: Value passed to the SDK as ``Expired`` for the presigned
                URL. ``0`` requests an unsigned, permanent URL instead, which
                is only usable with a public-read bucket.

        Returns:
            The unsigned URL when ``expires == 0``, otherwise a presigned GET URL.
        """
        if expires == 0:
            # Percent-encode the key so spaces, non-ASCII characters, '?' and
            # '#' yield a valid URL; '/' is kept as the path separator.
            encoded_key = quote(key, safe="/")
            return f"https://{self.bucket_name}.cos.{self.region}.myqcloud.com/{encoded_key}"

        return self.client.get_presigned_url(
            Method="GET",
            Bucket=self.bucket_name,
            Key=key,
            Expired=expires,
        )