from __future__ import annotations from io import BytesIO from typing import BinaryIO, Dict, Optional from .interface import StorageInterface class AliyunOSSStorage(StorageInterface): """阿里云OSS存储适配器""" storage_type = "aliyun_oss" def __init__(self, config: dict): try: import oss2 except ImportError: raise ImportError("请先安装 oss2: pip install oss2") self.access_key_id = config.get("access_key_id") self.access_key_secret = config.get("access_key_secret") self.endpoint = config.get("endpoint") self.bucket_name = config.get("bucket") if not all([self.access_key_id, self.access_key_secret, self.endpoint, self.bucket_name]): raise ValueError("阿里云OSS配置不完整,需要: access_key_id, access_key_secret, endpoint, bucket") auth = oss2.Auth(self.access_key_id, self.access_key_secret) self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name) def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict: """上传文件到阿里云OSS(流式上传,避免内存溢出)""" headers = {} if mime_type: headers["Content-Type"] = mime_type # OSS SDK 的 put_object 支持传入文件对象,会自动流式上传 # 不需要调用 read() 一次性读取到内存 result = self.bucket.put_object(key, file_stream, headers=headers) return { "etag": result.etag, "request_id": result.request_id, } def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None: """追加写入数据块(OSS原生支持追加写)""" chunk_data = chunk_stream.read() # 第一次写入时使用append_object,后续也用append_object self.bucket.append_object(key, offset, chunk_data) def download(self, key: str) -> BinaryIO: """从阿里云OSS下载文件""" result = self.bucket.get_object(key) return BytesIO(result.read()) def delete(self, key: str) -> None: """删除OSS文件""" self.bucket.delete_object(key) def exists(self, key: str) -> bool: """检查OSS文件是否存在""" return self.bucket.object_exists(key) def get_url(self, key: str, expires: int = 3600) -> str: """获取OSS文件访问URL(带签名)""" if expires == 0: # 永久URL(适用于公共读bucket) return f"https://{self.bucket_name}.{self.endpoint.replace('http://', '').replace('https://', '')}/{key}" # 生成带签名的临时URL return self.bucket.sign_url("GET", key, expires) # 预览与缩略图 def get_preview_url(self, key: str, expires: int = 3600) -> str: """预览URL(等同于访问URL)""" return self.get_url(key, expires) def get_thumbnail_url( self, key: str, width: int = 200, height: int = 200, mode: str = "fit", expires: int = 3600, ) -> str: """使用 x-oss-process 图片处理签名URL。 注意:图片处理参数必须参与签名,故通过 SDK 的 sign_url params 参数生成。 """ # 模式映射:fit -> lfit, fill -> fill, pad -> pad mode_map = { "fit": "lfit", "fill": "fill", "pad": "pad", } oss_mode = mode_map.get(mode, "lfit") # 使用图片处理 + 指定格式,避免覆盖 Content-Type(避免 0017-00000902 错误) # OSS 会依据 format 自动返回相应的 Content-Type process = ( f"image/resize,m_{oss_mode},w_{max(1, int(width))},h_{max(1, int(height))}/format,png" ) # 仅携带 x-oss-process 参与签名 return self.bucket.sign_url("GET", key, expires, params={"x-oss-process": process})