|
|
from __future__ import annotations

import logging
import os
from io import BytesIO
from typing import BinaryIO, Dict, Optional
from urllib.parse import quote

from .interface import StorageInterface
|
|
|
|
|
|
|
|
|
class TencentCOSStorage(StorageInterface):
    """Storage adapter backed by Tencent Cloud COS (via cos-python-sdk-v5).

    The adapter is configured with a dict holding ``secret_id``,
    ``secret_key``, ``region`` and ``bucket`` and talks to COS through a
    ``CosS3Client`` stored on ``self.client``.
    """

    storage_type = "tencent_cos"

    # Chunk size used when draining a COS response body into memory.
    _READ_CHUNK_SIZE = 1024 * 1024

    _logger = logging.getLogger(__name__)

    def __init__(self, config: dict):
        """Validate *config* and build the COS client.

        Args:
            config: Mapping with the keys ``secret_id``, ``secret_key``,
                ``region`` and ``bucket``; all four must be non-empty.

        Raises:
            ImportError: If the ``qcloud_cos`` package is not installed.
            ValueError: If any of the four required settings is missing.
        """
        # Imported lazily so the module can be loaded without the SDK.
        try:
            from qcloud_cos import CosConfig, CosS3Client
        except ImportError as exc:
            raise ImportError("请先安装 cos-python-sdk-v5: pip install cos-python-sdk-v5") from exc

        self.secret_id = config.get("secret_id")
        self.secret_key = config.get("secret_key")
        self.region = config.get("region")
        self.bucket_name = config.get("bucket")

        if not all([self.secret_id, self.secret_key, self.region, self.bucket_name]):
            raise ValueError("腾讯云COS配置不完整,需要: secret_id, secret_key, region, bucket")

        cos_config = CosConfig(
            Region=self.region,
            SecretId=self.secret_id,
            SecretKey=self.secret_key,
        )
        self.client = CosS3Client(cos_config)

    @staticmethod
    def _is_not_found(exc: BaseException) -> bool:
        """Return True when *exc* exposes ``get_status_code()`` equal to 404.

        Duck-typed on purpose so the SDK exception classes do not have to be
        imported at module level.
        """
        get_status = getattr(exc, "get_status_code", None)
        return callable(get_status) and get_status() == 404

    def _read_all(self, body) -> bytes:
        """Drain a ``get_object`` response body and return all of its bytes.

        NOTE(review): the body's ``read()`` is not used because, as far as
        the SDK source shows, it returns a single chunk (1024 bytes by
        default) and is missing in older releases; ``get_stream()`` iterates
        the whole payload. Confirm against the installed SDK version.
        """
        return b"".join(body.get_stream(chunk_size=self._READ_CHUNK_SIZE))

    def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
        """Upload *file_stream* to COS under *key*.

        The stream object is handed to the SDK as-is instead of being read
        into memory here.

        Args:
            file_stream: Binary stream holding the object content.
            key: Object key inside the configured bucket.
            mime_type: Content type to store; defaults to
                ``application/octet-stream`` when empty.

        Returns:
            Dict with ``etag`` (surrounding quotes stripped) and
            ``version_id`` (``None`` when the response carries none).
        """
        response = self.client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=file_stream,
            ContentType=mime_type or "application/octet-stream",
        )

        # The SDK returns the raw response headers, where the version is
        # exposed as "x-cos-version-id"; "VersionId" is kept as a fallback.
        version_id = response.get("x-cos-version-id") or response.get("VersionId")
        return {
            "etag": (response.get("ETag") or "").strip('"'),
            "version_id": version_id,
        }

    def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
        """Write *chunk_stream* into the object *key* starting at *offset*.

        Implemented as read-modify-write: the current object is downloaded,
        cut at *offset*, the new chunk is appended and the result is uploaded
        again with ``put_object``. Every call therefore transfers the whole
        object, so this is only suitable for small files.

        Args:
            key: Object key inside the configured bucket.
            chunk_stream: Binary stream whose full content is the new chunk.
            offset: Byte position at which the chunk starts. ``0`` replaces
                the object with the chunk.

        Raises:
            ValueError: If *offset* is negative or lies beyond the end of the
                data currently stored (which would leave a gap).
            Exception: Any SDK error other than "object not found" raised
                while reading or writing the object is propagated, so a
                failed download can no longer overwrite existing data.
        """
        if offset < 0:
            raise ValueError(f"offset 不能为负数: {offset}")

        chunk_data = chunk_stream.read()

        if offset == 0:
            # Nothing of the old object is kept, so skip the download.
            new_data = chunk_data
        else:
            try:
                response = self.client.get_object(Bucket=self.bucket_name, Key=key)
            except Exception as exc:
                # Only a missing object counts as "no existing data".
                if not self._is_not_found(exc):
                    raise
                existing_data = b""
            else:
                existing_data = self._read_all(response["Body"])

            if offset > len(existing_data):
                raise ValueError(
                    f"offset({offset}) 超出已有数据长度({len(existing_data)}): {key}"
                )
            new_data = existing_data[:offset] + chunk_data

        self.client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=new_data,
        )

    def download(self, key: str) -> BinaryIO:
        """Download the object *key* and return it as an in-memory stream.

        The whole object is buffered in a ``BytesIO`` positioned at 0.
        Errors raised by the SDK (including a missing object) propagate.
        """
        response = self.client.get_object(Bucket=self.bucket_name, Key=key)
        return BytesIO(self._read_all(response["Body"]))

    def delete(self, key: str) -> None:
        """Delete the object *key* from the configured bucket."""
        self.client.delete_object(Bucket=self.bucket_name, Key=key)

    def exists(self, key: str) -> bool:
        """Return True when a HEAD request for *key* succeeds.

        Any failure yields ``False``. Failures other than HTTP 404 (for
        example bad credentials or network problems) are logged, because
        they do not actually prove that the object is absent.
        """
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=key)
        except Exception as exc:
            if not self._is_not_found(exc):
                self._logger.warning(
                    "COS head_object failed for key %r; treating as missing: %s",
                    key,
                    exc,
                )
            return False
        return True

    def get_url(self, key: str, expires: int = 3600) -> str:
        """Return an access URL for the object *key*.

        Args:
            key: Object key inside the configured bucket.
            expires: Lifetime of the signed URL, passed to the SDK as
                ``Expired``. ``0`` returns an unsigned, permanent URL
                instead, which is only usable on a public-read bucket.

        Returns:
            The unsigned URL when ``expires == 0``, otherwise the presigned
            GET URL produced by the SDK.
        """
        if expires == 0:
            # Percent-encode the key (keeping "/" separators) so keys with
            # spaces, non-ASCII characters, "?" or "#" still form a valid URL.
            encoded_key = quote(key, safe="/")
            return f"https://{self.bucket_name}.cos.{self.region}.myqcloud.com/{encoded_key}"

        return self.client.get_presigned_url(
            Method="GET",
            Bucket=self.bucket_name,
            Key=key,
            Expired=expires,
        )
|
|
|
|