from __future__ import annotations

import os
from io import BytesIO
from typing import BinaryIO, Dict, Optional
from urllib.parse import quote

from .interface import StorageInterface


class HuaweiOBSStorage(StorageInterface):
    """Storage adapter backed by Huawei Cloud OBS (Object Storage Service)."""

    storage_type = "huawei_obs"

    def __init__(self, config: dict):
        """Validate the configuration and create the OBS client.

        Args:
            config: Mapping with the keys ``access_key_id``,
                ``secret_access_key``, ``server`` (the endpoint, e.g.
                ``obs.cn-north-4.myhuaweicloud.com``) and ``bucket``.

        Raises:
            ImportError: If the ``esdk-obs-python`` package is not installed.
            ValueError: If any of the required keys is missing or empty.
        """
        # Imported lazily so the package stays importable without the SDK.
        try:
            from obs import ObsClient
        except ImportError as exc:
            raise ImportError(
                "请先安装 esdk-obs-python: pip install esdk-obs-python"
            ) from exc

        self.access_key_id = config.get("access_key_id")
        self.secret_access_key = config.get("secret_access_key")
        self.server = config.get("server")  # e.g. obs.cn-north-4.myhuaweicloud.com
        self.bucket_name = config.get("bucket")

        required = (
            self.access_key_id,
            self.secret_access_key,
            self.server,
            self.bucket_name,
        )
        if not all(required):
            raise ValueError(
                "华为云OBS配置不完整,需要: access_key_id, secret_access_key, server, bucket"
            )

        self.client = ObsClient(
            access_key_id=self.access_key_id,
            secret_access_key=self.secret_access_key,
            server=self.server,
        )

    def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
        """Upload a readable binary stream to OBS under ``key``.

        The stream object itself is handed to the SDK instead of being read
        into a single bytes object first, so this adapter never holds the
        whole payload in memory.

        Args:
            file_stream: Readable binary stream; uploaded from its current
                position.
            key: Object key inside the configured bucket.
            mime_type: Optional Content-Type to store with the object.

        Returns:
            Dict with the keys ``etag`` and ``version_id`` taken from the
            SDK response body.

        Raises:
            Exception: If OBS answers with a status of 300 or above.
        """
        # The SDK takes the Content-Type through a PutObjectHeader, not as a
        # direct keyword argument of putContent.
        headers = None
        if mime_type:
            from obs import PutObjectHeader

            headers = PutObjectHeader()
            headers.contentType = mime_type

        # NOTE(review): depending on the SDK version putContent may close the
        # stream when it is done (autoClose) - confirm if callers reuse it.
        resp = self.client.putContent(
            bucketName=self.bucket_name,
            objectKey=key,
            content=file_stream,
            headers=headers,
        )
        if resp.status >= 300:
            raise Exception(f"华为云OBS上传失败: {resp.errorMessage}")

        return {
            "etag": resp.body.etag,
            "version_id": resp.body.versionId,
        }

    def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
        """Append one chunk of data to an object using OBS append upload.

        Args:
            key: Object key inside the configured bucket.
            chunk_stream: Readable binary stream holding the chunk; it is
                read completely.
            offset: Position in the object at which the chunk is appended.

        Raises:
            Exception: If OBS answers with a status of 300 or above.
        """
        from obs import AppendObjectContent

        # appendObject expects payload and position wrapped in an
        # AppendObjectContent rather than passed as separate keywords.
        payload = AppendObjectContent()
        payload.content = chunk_stream.read()
        payload.position = offset

        resp = self.client.appendObject(
            bucketName=self.bucket_name,
            objectKey=key,
            content=payload,
        )
        if resp.status >= 300:
            raise Exception(f"华为云OBS追加失败: {resp.errorMessage}")

    def download(self, key: str) -> BinaryIO:
        """Download an object and return its content as an in-memory stream.

        Args:
            key: Object key inside the configured bucket.

        Returns:
            A ``BytesIO`` holding the object content.

        Raises:
            FileNotFoundError: If OBS answers with a status of 300 or above.
        """
        # Without loadStreamInMemory the SDK leaves resp.body.buffer unset,
        # which would silently yield an empty BytesIO.
        resp = self.client.getObject(
            bucketName=self.bucket_name,
            objectKey=key,
            loadStreamInMemory=True,
        )
        if resp.status >= 300:
            raise FileNotFoundError(f"文件不存在: {key}")
        return BytesIO(resp.body.buffer)

    def delete(self, key: str) -> None:
        """Delete an object; a 404 response is not treated as an error.

        Args:
            key: Object key inside the configured bucket.

        Raises:
            Exception: If OBS answers with a status of 300 or above other
                than 404.
        """
        resp = self.client.deleteObject(bucketName=self.bucket_name, objectKey=key)
        if resp.status >= 300 and resp.status != 404:
            raise Exception(f"删除文件失败: {resp.errorMessage}")

    def exists(self, key: str) -> bool:
        """Return whether fetching the object's metadata succeeds.

        Args:
            key: Object key inside the configured bucket.

        Returns:
            ``True`` if the metadata request returns a status below 300,
            ``False`` for any other status.
        """
        resp = self.client.getObjectMetadata(bucketName=self.bucket_name, objectKey=key)
        return resp.status < 300

    def get_url(self, key: str, expires: int = 3600) -> str:
        """Build an access URL for an object.

        Args:
            key: Object key inside the configured bucket.
            expires: Validity of the signed URL in seconds. ``0`` requests an
                unsigned permanent URL, which only works for publicly
                readable buckets.

        Returns:
            The unsigned permanent URL when ``expires`` is 0, otherwise the
            signed temporary URL generated by the SDK.
        """
        if expires == 0:
            # Percent-encode the key so spaces, non-ASCII characters, "?" and
            # "#" cannot break the URL; "/" stays literal as path separator.
            encoded_key = quote(key, safe="/")
            return f"https://{self.bucket_name}.{self.server}/{encoded_key}"

        # Signed temporary URL.
        resp = self.client.createSignedUrl(
            method="GET",
            bucketName=self.bucket_name,
            objectKey=key,
            expires=expires,
        )
        return resp.signedUrl