You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

107 lines
3.5 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import os
from io import BytesIO
from typing import BinaryIO, Dict, Optional
from .interface import StorageInterface
class HuaweiOBSStorage(StorageInterface):
"""华为云OBS存储适配器"""
storage_type = "huawei_obs"
def __init__(self, config: dict):
try:
from obs import ObsClient
except ImportError:
raise ImportError("请先安装 esdk-obs-python: pip install esdk-obs-python")
self.access_key_id = config.get("access_key_id")
self.secret_access_key = config.get("secret_access_key")
self.server = config.get("server") # 例如obs.cn-north-4.myhuaweicloud.com
self.bucket_name = config.get("bucket")
if not all([self.access_key_id, self.secret_access_key, self.server, self.bucket_name]):
raise ValueError("华为云OBS配置不完整需要: access_key_id, secret_access_key, server, bucket")
self.client = ObsClient(
access_key_id=self.access_key_id,
secret_access_key=self.secret_access_key,
server=self.server,
)
def upload(self, file_stream: BinaryIO, key: str, mime_type: Optional[str] = None) -> Dict:
"""上传文件到华为云OBS"""
data = file_stream.read()
resp = self.client.putContent(
bucketName=self.bucket_name,
objectKey=key,
content=data,
contentType=mime_type,
)
if resp.status >= 300:
raise Exception(f"华为云OBS上传失败: {resp.errorMessage}")
return {
"etag": resp.body.etag,
"version_id": resp.body.versionId,
}
def append_chunk(self, key: str, chunk_stream: BinaryIO, offset: int) -> None:
"""
追加写入数据块OBS支持追加上传
"""
chunk_data = chunk_stream.read()
resp = self.client.appendObject(
bucketName=self.bucket_name,
objectKey=key,
content=chunk_data,
position=offset,
)
if resp.status >= 300:
raise Exception(f"华为云OBS追加失败: {resp.errorMessage}")
def download(self, key: str) -> BinaryIO:
"""从华为云OBS下载文件"""
resp = self.client.getObject(bucketName=self.bucket_name, objectKey=key)
if resp.status >= 300:
raise FileNotFoundError(f"文件不存在: {key}")
return BytesIO(resp.body.buffer)
def delete(self, key: str) -> None:
"""删除OBS文件"""
resp = self.client.deleteObject(bucketName=self.bucket_name, objectKey=key)
if resp.status >= 300 and resp.status != 404:
raise Exception(f"删除文件失败: {resp.errorMessage}")
def exists(self, key: str) -> bool:
"""检查OBS文件是否存在"""
resp = self.client.getObjectMetadata(bucketName=self.bucket_name, objectKey=key)
return resp.status < 300
def get_url(self, key: str, expires: int = 3600) -> str:
"""获取OBS文件访问URL带签名"""
if expires == 0:
# 永久URL适用于公共读bucket
return f"https://{self.bucket_name}.{self.server}/{key}"
# 生成带签名的临时URL
resp = self.client.createSignedUrl(
method="GET",
bucketName=self.bucket_name,
objectKey=key,
expires=expires,
)
return resp.signedUrl