"""MinIO 对象存储适配器""" from __future__ import annotations from typing import BinaryIO, Dict, Optional from io import BytesIO from .interface import StorageInterface class MinIOStorage(StorageInterface): """MinIO 对象存储适配器""" storage_type = "minio" def __init__(self, config: dict): """ 初始化 MinIO 存储 Args: config: 配置字典,包含以下字段: - endpoint: MinIO 服务地址(如 localhost:9000) - access_key: Access Key - secret_key: Secret Key - bucket: 存储桶名称 - secure: 是否使用 HTTPS(默认 False) - region: 区域(可选) """ try: from minio import Minio except ImportError: raise ImportError( "MinIO 存储需要安装 minio 库: pip install minio" ) self.endpoint = config.get("endpoint") self.access_key = config.get("access_key") self.secret_key = config.get("secret_key") self.bucket = config.get("bucket") self.secure = config.get("secure", False) self.region = config.get("region") if not all([self.endpoint, self.access_key, self.secret_key, self.bucket]): raise ValueError( "MinIO 存储需要配置: endpoint, access_key, secret_key, bucket" ) # 创建 MinIO 客户端 self.client = Minio( self.endpoint, access_key=self.access_key, secret_key=self.secret_key, secure=self.secure, region=self.region, ) # 确保存储桶存在 if not self.client.bucket_exists(self.bucket): self.client.make_bucket(self.bucket, location=self.region) def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict: """上传文件到 MinIO(流式上传,避免内存溢出)""" # 移除存储类型前缀 key = key.lstrip(self.storage_type + ":") # 获取文件大小(如果无法获取,使用 -1 让 MinIO 自动处理) try: current_pos = file_stream.tell() file_stream.seek(0, 2) # 移动到文件末尾 file_size = file_stream.tell() file_stream.seek(current_pos) # 恢复原位置 except (OSError, IOError): # 如果流不支持 seek,使用 -1(MinIO 会使用分片上传) file_size = -1 # 上传文件(MinIO SDK 会自动流式上传,不会一次性读取到内存) result = self.client.put_object( self.bucket, key, file_stream, file_size, content_type=mime_type or "application/octet-stream", ) return { "bucket": self.bucket, "key": key, "etag": result.etag, "version_id": result.version_id, } def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None: """ 追加写入数据块 注意:MinIO 不支持追加写入,此方法用于分片上传的临时实现 实际使用时建议在内存或本地临时文件中合并后再上传 """ raise NotImplementedError( "MinIO 不支持追加写入,请使用分片上传后合并的方式" ) def download(self, key: str) -> BinaryIO: """从 MinIO 下载文件""" key = key.lstrip(self.storage_type + ":") try: response = self.client.get_object(self.bucket, key) data = response.read() response.close() response.release_conn() return BytesIO(data) except Exception as e: raise FileNotFoundError(f"文件不存在: {key}") from e def delete(self, key: str) -> None: """从 MinIO 删除文件""" key = key.lstrip(self.storage_type + ":") try: self.client.remove_object(self.bucket, key) except Exception as e: raise FileNotFoundError(f"文件不存在: {key}") from e def exists(self, key: str) -> bool: """检查文件是否存在""" key = key.lstrip(self.storage_type + ":") try: self.client.stat_object(self.bucket, key) return True except Exception: return False def get_url(self, key: str, expires: int = 3600) -> str: """ 获取文件访问 URL(预签名 URL) Args: key: 对象键 expires: 过期时间(秒),0 表示永久(实际会使用最大值 7 天) Returns: 预签名 URL """ from datetime import timedelta key = key.lstrip(self.storage_type + ":") # MinIO 预签名 URL 最长 7 天 if expires == 0 or expires > 7 * 24 * 3600: expires = 7 * 24 * 3600 url = self.client.presigned_get_object( self.bucket, key, expires=timedelta(seconds=expires), ) return url def get_preview_url(self, key: str, expires: int = 3600) -> str: """获取预览 URL(与 get_url 相同)""" return self.get_url(key, expires) def get_thumbnail_url( self, key: str, width: int = 200, height: int = 200, mode: str = "fit", expires: int = 3600, ) -> Optional[str]: """ 获取缩略图 URL 注意:MinIO 本身不支持图片处理,返回原图 URL 如需缩略图功能,建议: 1. 使用后端生成缩略图 2. 或在 MinIO 前端配置图片处理服务(如 thumbor) """ return self.get_url(key, expires)