You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/docs/HTTP_RESPONSE_UTILS.md

872 lines
21 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# HTTP 响应包装工具使用指南
## 📖 概述
本工具库提供统一的 API 响应格式包装函数和分页工具,基于 APIFlask 框架设计,简化 API 开发流程。
**核心特性:**
- ✅ 统一返回格式:`{data, code, message}`
- ✅ 智能分页支持:自动识别 SQLAlchemy Pagination 对象
- ✅ 灵活调用方式:支持多种参数传递方式
- ✅ 完整类型提示TypeScript 级别的类型安全
- ✅ 参考 APIFlask 标准:与框架保持一致
---
## 📦 安装位置
```
src/applications/common/utils/http.py
```
---
## 🎯 核心函数
### 1. `success()` - 成功响应
**签名:**
```python
def success(data: Any = None, message: str = '成功', code: int = 200) -> dict
```
**参数:**
- `data`:返回数据(任意类型)
- `message`:提示信息(默认 `'成功'`
- `code`:业务状态码(默认 `200`
**返回格式:**
```json
{
"data": <返回数据>,
"code": 200,
"message": "成功"
}
```
**示例:**
```python
from iti.applications.common.utils import success
from iti.applications.extensions.http import BaseResponse
@app.get('/api/users/<int:user_id>')
@app.output(BaseResponse)
def get_user(user_id):
user = User.query.get(user_id)
if not user:
return fail('用户不存在', code=404)
# 返回单个对象
return success({
'id': user.id,
'name': user.name,
'email': user.email
})
@app.get('/api/products')
@app.output(BaseResponse)
def get_products():
products = Product.query.limit(10).all()
# 返回列表
return success(
[{'id': p.id, 'name': p.name} for p in products],
message='获取产品列表成功'
)
```
---
### 2. `fail()` - 失败响应
**签名:**
```python
def fail(message: str = '操作失败', code: int = 500, data: Any = None) -> dict
```
**参数:**
- `message`:错误信息
- `code`:业务错误码(默认 `500`
- `data`:额外数据(如验证错误详情)
**特点:**
- ⚠️ **HTTP 状态码保持 200**,由前端根据 `code` 字段判断业务状态
- 💡 适用于统一错误处理,避免 HTTP 层面的错误拦截
**返回格式:**
```json
{
"data": null,
"code": 500,
"message": "操作失败"
}
```
**示例:**
```python
from iti.applications.common.utils import fail
@app.post('/api/users')
@app.output(BaseResponse)
def create_user():
username = request.json.get('username')
# 参数验证失败
if not username:
return fail('用户名不能为空', code=400)
# 资源不存在
if User.query.filter_by(username=username).first():
return fail('用户名已存在', code=409)
# 服务器错误
try:
user = User(username=username)
db.session.add(user)
db.session.commit()
except Exception as e:
return fail(f'创建失败: {str(e)}', code=500)
return success(user_to_dict(user), message='创建成功')
@app.post('/api/login')
@app.output(BaseResponse)
def login():
data = request.json
# 验证失败,返回详细错误
errors = validate_login(data)
if errors:
return fail(
message='验证失败',
code=422,
data=errors # {'username': ['必填项'], 'password': ['长度不足']}
)
# ... 登录逻辑
```
---
### 3. `page()` - 分页响应
**签名:**
```python
def page(
items: Union[list, Any],
pagination: Union[dict, Any, None] = None,
message: str = '成功',
code: int = 200
) -> dict
```
**支持三种调用方式:**
#### **方式 1传入 SQLAlchemy Pagination 对象(最简)**
```python
from iti.applications.common.utils import page
@app.get('/api/users')
@app.output(BaseResponse)
def get_users():
page_num = request.args.get('page', 1, type=int)
page_size = request.args.get('size', 10, type=int)
# SQLAlchemy 分页查询
db_pagination = User.query.paginate(page=page_num, per_page=page_size)
# ✅ 直接传入 Pagination 对象,自动解析
return page(db_pagination, message='获取用户列表成功')
```
**返回格式:**
```json
{
"data": {
"items": [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"}
],
"page": {
"page": 1,
"size": 10,
"pages": 10,
"total": 100,
"current": "http://api.example.com/api/users?page=1&size=10",
"next": "http://api.example.com/api/users?page=2&size=10",
"prev": null,
"first": "http://api.example.com/api/users?page=1&size=10",
"last": "http://api.example.com/api/users?page=10&size=10"
}
},
"code": 200,
"message": "获取用户列表成功"
}
```
---
#### **方式 2传入数据列表 + Pagination 对象**
适用于需要先序列化数据的场景:
```python
from iti.applications.common.utils import page
from iti.applications.schemas.user import UserSchema
@app.get('/api/users')
@app.output(BaseResponse)
def get_users():
db_pagination = User.query.paginate(page=1, per_page=10)
# 先序列化数据(使用 Marshmallow Schema
users = UserSchema(many=True).dump(db_pagination.items)
# ✅ 传入序列化后的数据 + Pagination 对象
return page(users, db_pagination, message='获取用户列表成功')
```
---
#### **方式 3手动构建分页信息**
适用于非 SQLAlchemy 数据源(如 Redis、外部 API
```python
from iti.applications.common.utils import page, pagination_builder
@app.get('/api/stats')
@app.output(BaseResponse)
def get_stats():
# 从 Redis 获取数据
all_data = redis_client.lrange('stats', 0, -1)
page_num = request.args.get('page', 1, type=int)
page_size = request.args.get('size', 20, type=int)
# 手动分页
start = (page_num - 1) * page_size
end = start + page_size
items = all_data[start:end]
# ✅ 手动构建分页信息
pagination_info = pagination_builder(
None,
page=page_num,
size=page_size,
total=len(all_data)
)
return page(items, pagination_info, message='获取统计数据成功')
```
---
## 🔧 辅助函数
### `pagination_builder()` - 分页信息构建器
**签名:**
```python
def pagination_builder(
pagination: Any,
*,
page: Optional[int] = None,
size: Optional[int] = None,
total: Optional[int] = None,
pages: Optional[int] = None,
) -> dict
```
**参数说明:**
- `pagination`SQLAlchemy Pagination 对象 或 `None`
- `page`:当前页码(手动模式)
- `size`:每页数量(手动模式)
- `total`:总记录数(手动模式)
- `pages`:总页数(可选,自动计算)
**注意:**
- 第一个参数后使用 `*`,强制后续参数必须使用关键字传参
- 参考 APIFlask 的 `helpers.py` 实现
**示例 1自动模式SQLAlchemy Pagination**
```python
from iti.applications.common.utils import pagination_builder
@app.get('/api/products')
def get_products():
db_pagination = Product.query.paginate(page=1, per_page=10)
# ✅ 自动从 Pagination 对象提取信息
pagination_info = pagination_builder(db_pagination)
return {
'items': db_pagination.items,
'pagination': pagination_info
}
```
**示例 2手动模式自定义数据源**
```python
from iti.applications.common.utils import pagination_builder
@app.get('/api/external-data')
def get_external_data():
# 从外部 API 获取数据
response = requests.get('https://api.example.com/data')
total_count = response.headers.get('X-Total-Count', 0)
items = response.json()
# ✅ 手动构建分页信息
pagination_info = pagination_builder(
None, # 第一个参数传 None
page=1,
size=20,
total=int(total_count)
)
return page(items, pagination_info)
```
---
## 📊 Schema 定义
### `PaginationSchema` - 分页信息 Schema
```python
class PaginationSchema(Schema):
"""自定义分页信息 Schema"""
page = Integer() # 当前页码
size = Integer() # 每页数量(重命名自 per_page
pages = Integer() # 总页数
total = Integer() # 总记录数
current = URL() # 当前页URL
next = URL() # 下一页URL
prev = URL() # 上一页URL
first = URL() # 首页URL
last = URL() # 末页URL
```
**特点:**
- ✅ 与 APIFlask 原生 `PaginationSchema` 保持一致
- ✅ 仅将 `per_page` 重命名为 `size`
---
### `PageDataSchema` - 分页数据 Schema
```python
class PageDataSchema(Schema):
"""分页数据包装 Schema"""
items = List(Nested(Field())) # 数据列表
page = Nested(PaginationSchema) # 分页信息(字段名为 page
```
**使用示例:**
```python
from iti.applications.common.utils import PageDataSchema
# 在路由中使用(可选,主要用于文档生成)
@app.get('/api/users')
@app.doc(responses={200: PageDataSchema})
def get_users():
...
```
---
## 🎨 完整示例
### 示例 1用户管理 CRUD
```python
from flask import request
from iti.applications.common.utils import success, fail, page
from iti.applications.extensions.http import BaseResponse
from my_project.models.user import User
from my_project.schemas.user import UserSchema, UserCreateSchema
# ========== 列表(分页) ==========
@app.get('/api/users')
@app.output(BaseResponse)
def get_users():
"""获取用户列表"""
page_num = request.args.get('page', 1, type=int)
page_size = request.args.get('size', 10, type=int)
db_pagination = User.query.paginate(page=page_num, per_page=page_size)
return page(db_pagination, message='获取用户列表成功')
# ========== 详情 ==========
@app.get('/api/users/<int:user_id>')
@app.output(BaseResponse)
def get_user(user_id):
"""获取用户详情"""
user = User.query.get(user_id)
if not user:
return fail('用户不存在', code=404)
return success(UserSchema().dump(user))
# ========== 创建 ==========
@app.post('/api/users')
@app.input(UserCreateSchema)
@app.output(BaseResponse)
def create_user(data):
"""创建用户"""
# 检查用户名是否存在
if User.query.filter_by(username=data['username']).first():
return fail('用户名已存在', code=409)
try:
user = User(**data)
db.session.add(user)
db.session.commit()
return success(
UserSchema().dump(user),
message='创建成功',
code=201
)
except Exception as e:
db.session.rollback()
return fail(f'创建失败: {str(e)}', code=500)
# ========== 更新 ==========
@app.patch('/api/users/<int:user_id>')
@app.input(UserCreateSchema)
@app.output(BaseResponse)
def update_user(user_id, data):
"""更新用户"""
user = User.query.get(user_id)
if not user:
return fail('用户不存在', code=404)
try:
for key, value in data.items():
setattr(user, key, value)
db.session.commit()
return success(UserSchema().dump(user), message='更新成功')
except Exception as e:
db.session.rollback()
return fail(f'更新失败: {str(e)}', code=500)
# ========== 删除 ==========
@app.delete('/api/users/<int:user_id>')
@app.output(BaseResponse)
def delete_user(user_id):
"""删除用户"""
user = User.query.get(user_id)
if not user:
return fail('用户不存在', code=404)
try:
db.session.delete(user)
db.session.commit()
return success(None, message='删除成功')
except Exception as e:
db.session.rollback()
return fail(f'删除失败: {str(e)}', code=500)
```
---
### 示例 2复杂查询与筛选
```python
from sqlalchemy import and_, or_
from iti.applications.common.utils import page, pagination_builder
@app.get('/api/products')
@app.output(BaseResponse)
def get_products():
"""获取产品列表(支持筛选、搜索、排序)"""
# 获取查询参数
page_num = request.args.get('page', 1, type=int)
page_size = request.args.get('size', 20, type=int)
category = request.args.get('category')
search = request.args.get('search')
sort_by = request.args.get('sort', 'created_at')
order = request.args.get('order', 'desc')
# 构建查询
query = Product.query
# 筛选条件
if category:
query = query.filter(Product.category == category)
# 搜索条件
if search:
query = query.filter(
or_(
Product.name.contains(search),
Product.description.contains(search)
)
)
# 排序
if order == 'desc':
query = query.order_by(getattr(Product, sort_by).desc())
else:
query = query.order_by(getattr(Product, sort_by).asc())
# 分页
db_pagination = query.paginate(page=page_num, per_page=page_size)
# 序列化
products = ProductSchema(many=True).dump(db_pagination.items)
return page(products, db_pagination, message='获取产品列表成功')
```
---
### 示例 3聚合统计非 ORM 分页)
```python
from sqlalchemy import func
from iti.applications.common.utils import page, pagination_builder
@app.get('/api/stats/daily')
@app.output(BaseResponse)
def get_daily_stats():
"""获取每日统计数据"""
page_num = request.args.get('page', 1, type=int)
page_size = request.args.get('size', 30, type=int)
# 聚合查询(不使用 ORM 分页)
query = db.session.query(
func.date(Order.created_at).label('date'),
func.count(Order.id).label('order_count'),
func.sum(Order.amount).label('total_amount')
).group_by(func.date(Order.created_at))
# 获取总数
total = query.count()
# 手动分页
offset = (page_num - 1) * page_size
items = query.offset(offset).limit(page_size).all()
# 格式化数据
stats = [
{
'date': str(item.date),
'order_count': item.order_count,
'total_amount': float(item.total_amount or 0)
}
for item in items
]
# 手动构建分页信息
pagination_info = pagination_builder(
None,
page=page_num,
size=page_size,
total=total
)
return page(stats, pagination_info, message='获取统计数据成功')
```
---
## ⚙️ 配置说明
### BaseResponse Schema
`applications/extensions/http.py` 中定义:
```python
from apiflask import Schema
from apiflask.fields import Integer, String, Field
class BaseResponse(Schema):
"""统一响应格式 Schema"""
data = Field()
code = Integer()
message = String()
def init_http(app):
# 配置 APIFlask 使用自定义响应格式
app.config["BASE_RESPONSE_SCHEMA"] = BaseResponse
app.config["BASE_RESPONSE_DATA_KEY"] = "data"
```
---
## 🔍 URL 生成规则
分页 URL 自动生成逻辑:
1. **获取当前请求的 `base_url`**`http://api.example.com/api/users`
2. **复制查询参数**`request.args.copy()`
3. **更新分页参数**
- `page`: 页码
- `size`: 每页数量(注意:使用 `size` 而非 `per_page`
4. **构建完整 URL**`base_url + ? + urlencode(args)`
**示例:**
请求 `/api/users?category=admin&page=2&size=10` 时生成的 URL
```json
{
"current": "http://api.example.com/api/users?category=admin&page=2&size=10",
"next": "http://api.example.com/api/users?category=admin&page=3&size=10",
"prev": "http://api.example.com/api/users?category=admin&page=1&size=10",
"first": "http://api.example.com/api/users?category=admin&page=1&size=10",
"last": "http://api.example.com/api/users?category=admin&page=10&size=10"
}
```
---
## 📝 最佳实践
### 1. 统一错误码规范
建议定义错误码常量:
```python
# applications/common/constants.py
class ErrorCode:
"""业务错误码"""
SUCCESS = 200
BAD_REQUEST = 400
UNAUTHORIZED = 401
FORBIDDEN = 403
NOT_FOUND = 404
CONFLICT = 409
UNPROCESSABLE_ENTITY = 422
INTERNAL_SERVER_ERROR = 500
# 使用
from iti.applications.common.utils import fail
from iti.applications.common.constants import ErrorCode
return fail('用户不存在', code=ErrorCode.NOT_FOUND)
```
---
### 2. 结合 Marshmallow Schema
```python
from iti.applications.schemas.user import UserSchema
from iti.applications.common.utils import success
@app.get('/api/users/<int:user_id>')
@app.output(BaseResponse)
def get_user(user_id):
user = User.query.get_or_404(user_id)
# ✅ 使用 Schema 序列化
user_data = UserSchema().dump(user)
return success(user_data)
```
---
### 3. 分页参数验证
```python
from iti.applications.common.utils import fail, page
@app.get('/api/users')
@app.output(BaseResponse)
def get_users():
page_num = request.args.get('page', 1, type=int)
page_size = request.args.get('size', 10, type=int)
# 验证分页参数
if page_num < 1:
return fail('页码必须大于 0', code=400)
if page_size < 1 or page_size > 100:
return fail('每页数量必须在 1-100 之间', code=400)
db_pagination = User.query.paginate(page=page_num, per_page=page_size)
return page(db_pagination)
```
---
### 4. 异常统一处理
```python
from iti.applications.common.utils import fail
@app.errorhandler(404)
def handle_404(error):
return fail('资源不存在', code=404)
@app.errorhandler(500)
def handle_500(error):
return fail('服务器内部错误', code=500)
@app.errorhandler(Exception)
def handle_exception(error):
app.logger.error(f'未处理的异常: {error}')
return fail(str(error), code=500)
```
---
## 🆚 对比 APIFlask 原生实现
| 特性 | APIFlask 原生 | 本工具库 |
|------|--------------|---------|
| 分页参数名 | `per_page` | `size` ✅ |
| 返回格式 | 灵活 | 统一 `{data, code, message}` ✅ |
| 错误处理 | HTTP 状态码 | 业务 `code` 字段 ✅ |
| 智能识别 | 需手动处理 | 自动识别 Pagination 对象 ✅ |
| URL 生成 | 手动 | 自动生成 ✅ |
---
## 🧪 测试
### 运行测试
```bash
# 运行 HTTP 工具测试
uv run --extra dev pytest tests/test_http_utils.py -v
# 运行所有测试
uv run --extra dev pytest
# 测试覆盖率
uv run --extra dev pytest --cov=iti --cov-report=html
```
### 测试统计
- **测试用例数量**: 33 个
- **测试分类**:
- `success()` 函数测试: 6 个
- `fail()` 函数测试: 5 个
- `pagination_builder()` 测试: 6 个
- `page()` 函数测试: 7 个
- Schema 定义测试: 2 个
- Flask 集成测试: 4 个
- 边界情况测试: 5 个
### 测试覆盖范围
- ✅ 基础功能测试
- ✅ 参数验证测试
- ✅ 默认值测试
- ✅ 边界情况测试
- ✅ Flask 应用集成测试
- ✅ Schema 定义测试
- ✅ 请求上下文处理测试
### 添加自定义测试
`tests/test_http_utils.py` 中添加测试:
```python
import pytest
from iti.applications.common.utils import success
def test_my_custom_case():
"""测试自定义场景"""
result = success({'key': 'value'})
assert result['code'] == 200
```
---
## 🐛 常见问题
### Q1: 为什么错误也返回 HTTP 200
**A:** 这是一种常见的 API 设计模式,优点:
- 前端统一处理,不需要捕获 HTTP 异常
- 避免浏览器/代理对非 200 状态码的拦截
- 业务状态由 `code` 字段表示,更清晰
如需返回 HTTP 错误状态码,可以手动返回元组:
```python
return fail('未找到', code=404), 404 # HTTP 404
```
---
### Q2: 如何自定义分页 URL 生成逻辑?
**A:** 修改 `_generate_page_url()` 函数:
```python
def _generate_page_url(page_num, page_size):
if page_num is None:
return None
# 自定义逻辑
return f"https://custom-domain.com/api?p={page_num}&s={page_size}"
```
---
### Q3: 分页信息中的 URL 字段可以去掉吗?
**A:** 可以。修改 `pagination_builder()` 返回值:
```python
return {
'page': page,
'size': size,
'pages': pages,
'total': total,
# 注释掉 URL 字段
# 'current': current_url,
# 'next': next_url,
# ...
}
```
---
## 📚 参考资料
- [APIFlask 官方文档](https://apiflask.com/)
- [APIFlask helpers.py 源码](https://github.com/apiflask/apiflask/blob/main/src/apiflask/helpers.py)
- [Flask-SQLAlchemy Pagination](https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/pagination/)
---
## 📅 更新日志
- **v1.0.0** (2024-10-14)
- 初始版本
- 实现 `success()`, `fail()`, `page()` 函数
- 实现 `pagination_builder()` 分页构建器
- 支持智能识别 SQLAlchemy Pagination 对象
- 自动生成分页 URL
- 参数重命名:`per_page` → `size`
---
**编写人员:** AI Assistant
**最后更新:** 2024-10-14
**版本:** 1.0.0