|
|
from __future__ import annotations
|
|
|
from io import BytesIO
|
|
|
from typing import BinaryIO, Dict, Optional
|
|
|
|
|
|
from .interface import StorageInterface
|
|
|
|
|
|
|
|
|
class AliyunOSSStorage(StorageInterface):
|
|
|
"""阿里云OSS存储适配器"""
|
|
|
|
|
|
storage_type = "aliyun_oss"
|
|
|
|
|
|
def __init__(self, config: dict):
|
|
|
try:
|
|
|
import oss2
|
|
|
except ImportError:
|
|
|
raise ImportError("请先安装 oss2: pip install oss2")
|
|
|
|
|
|
self.access_key_id = config.get("access_key_id")
|
|
|
self.access_key_secret = config.get("access_key_secret")
|
|
|
self.endpoint = config.get("endpoint")
|
|
|
self.bucket_name = config.get("bucket")
|
|
|
|
|
|
if not all([self.access_key_id, self.access_key_secret, self.endpoint, self.bucket_name]):
|
|
|
raise ValueError("阿里云OSS配置不完整,需要: access_key_id, access_key_secret, endpoint, bucket")
|
|
|
|
|
|
auth = oss2.Auth(self.access_key_id, self.access_key_secret)
|
|
|
self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
|
|
|
|
|
|
def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
|
|
|
"""上传文件到阿里云OSS(流式上传,避免内存溢出)"""
|
|
|
headers = {}
|
|
|
if mime_type:
|
|
|
headers["Content-Type"] = mime_type
|
|
|
|
|
|
# OSS SDK 的 put_object 支持传入文件对象,会自动流式上传
|
|
|
# 不需要调用 read() 一次性读取到内存
|
|
|
result = self.bucket.put_object(key, file_stream, headers=headers)
|
|
|
|
|
|
return {
|
|
|
"etag": result.etag,
|
|
|
"request_id": result.request_id,
|
|
|
}
|
|
|
|
|
|
def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
|
|
|
"""追加写入数据块(OSS原生支持追加写)"""
|
|
|
chunk_data = chunk_stream.read()
|
|
|
|
|
|
# 第一次写入时使用append_object,后续也用append_object
|
|
|
self.bucket.append_object(key, offset, chunk_data)
|
|
|
|
|
|
def download(self, key: str) -> BinaryIO:
|
|
|
"""从阿里云OSS下载文件"""
|
|
|
result = self.bucket.get_object(key)
|
|
|
return BytesIO(result.read())
|
|
|
|
|
|
def delete(self, key: str) -> None:
|
|
|
"""删除OSS文件"""
|
|
|
self.bucket.delete_object(key)
|
|
|
|
|
|
def exists(self, key: str) -> bool:
|
|
|
"""检查OSS文件是否存在"""
|
|
|
return self.bucket.object_exists(key)
|
|
|
|
|
|
def get_url(self, key: str, expires: int = 3600) -> str:
|
|
|
"""获取OSS文件访问URL(带签名)"""
|
|
|
if expires == 0:
|
|
|
# 永久URL(适用于公共读bucket)
|
|
|
return f"https://{self.bucket_name}.{self.endpoint.replace('http://', '').replace('https://', '')}/{key}"
|
|
|
|
|
|
# 生成带签名的临时URL
|
|
|
return self.bucket.sign_url("GET", key, expires)
|
|
|
|
|
|
# 预览与缩略图
|
|
|
def get_preview_url(self, key: str, expires: int = 3600) -> str:
|
|
|
"""预览URL(等同于访问URL)"""
|
|
|
return self.get_url(key, expires)
|
|
|
|
|
|
def get_thumbnail_url(
|
|
|
self,
|
|
|
key: str,
|
|
|
width: int = 200,
|
|
|
height: int = 200,
|
|
|
mode: str = "fit",
|
|
|
expires: int = 3600,
|
|
|
) -> str:
|
|
|
"""使用 x-oss-process 图片处理签名URL。
|
|
|
|
|
|
注意:图片处理参数必须参与签名,故通过 SDK 的 sign_url params 参数生成。
|
|
|
"""
|
|
|
# 模式映射:fit -> lfit, fill -> fill, pad -> pad
|
|
|
mode_map = {
|
|
|
"fit": "lfit",
|
|
|
"fill": "fill",
|
|
|
"pad": "pad",
|
|
|
}
|
|
|
oss_mode = mode_map.get(mode, "lfit")
|
|
|
|
|
|
# 使用图片处理 + 指定格式,避免覆盖 Content-Type(避免 0017-00000902 错误)
|
|
|
# OSS 会依据 format 自动返回相应的 Content-Type
|
|
|
process = (
|
|
|
f"image/resize,m_{oss_mode},w_{max(1, int(width))},h_{max(1, int(height))}/format,png"
|
|
|
)
|
|
|
|
|
|
# 仅携带 x-oss-process 参与签名
|
|
|
return self.bucket.sign_url("GET", key, expires, params={"x-oss-process": process})
|
|
|
|