You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

112 lines
3.6 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import os
from io import BytesIO
from typing import BinaryIO, Dict, Optional
from .interface import StorageInterface
class TencentCOSStorage(StorageInterface):
"""腾讯云COS存储适配器"""
storage_type = "tencent_cos"
def __init__(self, config: dict):
try:
from qcloud_cos import CosConfig, CosS3Client
except ImportError:
raise ImportError("请先安装 cos-python-sdk-v5: pip install cos-python-sdk-v5")
self.secret_id = config.get("secret_id")
self.secret_key = config.get("secret_key")
self.region = config.get("region")
self.bucket_name = config.get("bucket")
if not all([self.secret_id, self.secret_key, self.region, self.bucket_name]):
raise ValueError("腾讯云COS配置不完整需要: secret_id, secret_key, region, bucket")
cos_config = CosConfig(
Region=self.region,
SecretId=self.secret_id,
SecretKey=self.secret_key,
)
self.client = CosS3Client(cos_config)
def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
"""上传文件到腾讯云COS"""
response = self.client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=file_stream.read(),
ContentType=mime_type or "application/octet-stream",
)
return {
"etag": response.get("ETag", "").strip('"'),
"version_id": response.get("VersionId"),
}
def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
"""
追加写入数据块
注意COS不支持原生append需要先下载再上传或使用分片上传
这里采用简单的读-改-写策略
"""
chunk_data = chunk_stream.read()
# 如果文件已存在,先下载
existing_data = b""
if self.exists(key):
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=key)
existing_data = response["Body"].read()
except Exception:
pass
# 合并数据
if offset == 0:
new_data = chunk_data
else:
new_data = existing_data[:offset] + chunk_data
# 重新上传
self.client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=new_data,
)
def download(self, key: str) -> BinaryIO:
"""从腾讯云COS下载文件"""
response = self.client.get_object(Bucket=self.bucket_name, Key=key)
return BytesIO(response["Body"].read())
def delete(self, key: str) -> None:
"""删除COS文件"""
self.client.delete_object(Bucket=self.bucket_name, Key=key)
def exists(self, key: str) -> bool:
"""检查COS文件是否存在"""
try:
self.client.head_object(Bucket=self.bucket_name, Key=key)
return True
except Exception:
return False
def get_url(self, key: str, expires: int = 3600) -> str:
"""获取COS文件访问URL带签名"""
if expires == 0:
# 永久URL适用于公共读bucket
return f"https://{self.bucket_name}.cos.{self.region}.myqcloud.com/{key}"
# 生成带签名的临时URL
url = self.client.get_presigned_url(
Method="GET",
Bucket=self.bucket_name,
Key=key,
Expired=expires,
)
return url