You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
3.9 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations

from io import BytesIO
from typing import BinaryIO, Dict, Optional
from urllib.parse import quote

from .interface import StorageInterface
class AliyunOSSStorage(StorageInterface):
    """Storage adapter backed by Alibaba Cloud (Aliyun) OSS.

    All operations are delegated to an ``oss2.Bucket`` built from the
    credentials, endpoint and bucket name found in the config dict.
    """

    storage_type = "aliyun_oss"

    # Maps this adapter's thumbnail mode names to OSS ``image/resize`` modes.
    _THUMBNAIL_MODES = {
        "fit": "lfit",
        "fill": "fill",
        "pad": "pad",
    }

    def __init__(self, config: dict) -> None:
        """Create the OSS bucket client from ``config``.

        Args:
            config: Mapping that must provide non-empty ``access_key_id``,
                ``access_key_secret``, ``endpoint`` and ``bucket`` entries.

        Raises:
            ImportError: If the ``oss2`` package is not installed.
            ValueError: If any required config entry is missing or empty.
        """
        # Imported lazily so the module can be loaded without oss2 installed.
        try:
            import oss2
        except ImportError as exc:
            # Chain the original error so the real import failure stays visible.
            raise ImportError("请先安装 oss2: pip install oss2") from exc

        self.access_key_id = config.get("access_key_id")
        self.access_key_secret = config.get("access_key_secret")
        self.endpoint = config.get("endpoint")
        self.bucket_name = config.get("bucket")

        required = (
            self.access_key_id,
            self.access_key_secret,
            self.endpoint,
            self.bucket_name,
        )
        if not all(required):
            raise ValueError("阿里云OSS配置不完整需要: access_key_id, access_key_secret, endpoint, bucket")

        auth = oss2.Auth(self.access_key_id, self.access_key_secret)
        self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)

    def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
        """Upload ``file_stream`` to OSS under ``key``.

        The stream object itself is handed to ``put_object`` rather than
        being read into memory here first.

        Args:
            file_stream: Binary stream holding the content to store.
            key: Object key to write.
            mime_type: Optional value for the ``Content-Type`` header.

        Returns:
            Dict with the ``etag`` and ``request_id`` of the put result.
        """
        headers = {}
        if mime_type:
            headers["Content-Type"] = mime_type

        # Pass the file object through; do not call read() on it here.
        result = self.bucket.put_object(key, file_stream, headers=headers)
        return {
            "etag": result.etag,
            "request_id": result.request_id,
        }

    def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
        """Append one chunk to the object ``key`` at position ``offset``.

        The chunk is read fully into memory and sent with ``append_object``,
        which is used for the first write as well as for later ones.

        Args:
            key: Object key to append to.
            chunk_stream: Binary stream holding the chunk.
            offset: Position passed to ``append_object``.
                NOTE(review): presumably must equal the object's current
                length - confirm against the OSS append semantics.
        """
        chunk_data = chunk_stream.read()
        self.bucket.append_object(key, offset, chunk_data)

    def download(self, key: str) -> BinaryIO:
        """Download the object ``key`` and return it as an in-memory stream.

        The whole object body is read into a ``BytesIO`` before returning.
        """
        result = self.bucket.get_object(key)
        return BytesIO(result.read())

    def delete(self, key: str) -> None:
        """Delete the object stored under ``key``."""
        self.bucket.delete_object(key)

    def exists(self, key: str) -> bool:
        """Return whether an object is stored under ``key``."""
        return self.bucket.object_exists(key)

    def get_url(self, key: str, expires: int = 3600) -> str:
        """Return an access URL for the object ``key``.

        Args:
            key: Object key.
            expires: Lifetime handed to ``sign_url`` for the signed URL.
                ``0`` requests an unsigned permanent URL instead, which is
                intended for public-read buckets.

        Returns:
            A signed temporary URL, or the plain ``https`` URL when
            ``expires`` is ``0``.
        """
        if expires == 0:
            # Strip the scheme and any trailing slash so the bucket name can
            # be prefixed as a sub-domain without producing "host//key".
            host = self.endpoint.replace("http://", "").replace("https://", "").rstrip("/")
            # Percent-encode the key (keeping "/" separators) so spaces,
            # non-ASCII characters, "#", "?" and "%" yield a valid URL.
            return f"https://{self.bucket_name}.{host}/{quote(key, safe='/')}"

        # Signed, time-limited URL generated by the SDK.
        return self.bucket.sign_url("GET", key, expires)

    # Preview and thumbnails

    def get_preview_url(self, key: str, expires: int = 3600) -> str:
        """Return the preview URL, which is the same as the access URL."""
        return self.get_url(key, expires)

    def get_thumbnail_url(
        self,
        key: str,
        width: int = 200,
        height: int = 200,
        mode: str = "fit",
        expires: int = 3600,
    ) -> str:
        """Return a signed URL that serves a resized PNG rendition of ``key``.

        The ``x-oss-process`` image-processing parameter has to take part in
        the signature, so it is supplied through ``sign_url``'s ``params``.

        Args:
            key: Object key of the source image.
            width: Target width; converted with ``int`` and raised to at
                least 1.
            height: Target height; converted with ``int`` and raised to at
                least 1.
            mode: One of ``"fit"``, ``"fill"`` or ``"pad"``. Any other value
                falls back to the OSS ``lfit`` mode.
            expires: Lifetime handed to ``sign_url``.

        Returns:
            The signed URL carrying the ``x-oss-process`` parameter.
        """
        oss_mode = self._THUMBNAIL_MODES.get(mode, "lfit")
        safe_width = max(1, int(width))
        safe_height = max(1, int(height))

        # Request an explicit output format instead of overriding
        # Content-Type (avoids OSS error 0017-00000902); OSS derives the
        # response Content-Type from the requested format.
        process = f"image/resize,m_{oss_mode},w_{safe_width},h_{safe_height}/format,png"

        # Only x-oss-process is added to the signed parameters.
        return self.bucket.sign_url("GET", key, expires, params={"x-oss-process": process})