字数 1298,阅读大约需 7 分钟
Fastapi自定义中间件,request上下文对象仅能消费一次的坑及解决办法
在 FastAPI 中,Request
对象的 body 内容确实只能被消费一次,由其底层依赖的 Starlette 框架的设计决定的,主要和异步处理以及内存效率有关。
在不熟悉使用request的时候,经常会犯的错误是消费了request,但又在后面的代码里使用该request。
同类问题文章
Fastapi框架-使用过程填坑篇(6)- 吐槽Request请求上下文对象只能被消费一次的问题和解决思路,竟然不能再中间件里面再消费一次~唉 - 掘金[1]
python - Why does fastapi hang when adding a middleware to print the HTTP request body? - Stack Overflow[2]
中间件请求解析永远挂起 · 问题 #847 · Kludex/starlette[3]
强约束限制:request在fastapi中间件有且仅能被消费一次,在其他中间件不能重复消费此request
问题代码复现
class RequestContext(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
request_id = request_ctx.set(str(uuid4())) # generate uuid to request
body = await request.body()
if body:
logger.info(...) # log request with body
else:
logger.info(...) # log request without body
response = await call_next(request)
response.headers['X-Request-ID'] = request_ctx.get()
logger.info("%s" % (response.status_code))
request_ctx.reset(request_id)
return response
核心问题: 在 dispatch 方法中,这行代码 body = await request.body() 消耗(consumes)了请求体。之后,又将原始的 request 对象传递给了 response = await call_next(request)。
吐槽:FastAPI(以及底层的Starlette)中一个让人头疼的设计问题
关于request的声明
为什么 Request 对象只能被消费一次?
1. 流式处理设计: - Request 的 body 是以流(stream)的形式传输的,类似于文件读取的流式处理 - 当你调用
await request.body()
或await request.json()
时,会读取整个流并将其消耗掉 - 流被消耗后就无法再次读取,这是为了避免内存中保存重复数据2. 异步处理限制: - 在异步环境中,多次读取同一个流会导致状态混乱 - 为了保证数据一致性,设计上只允许消费一次
3. 中间件链的影响: - 如果在某个中间件中消费了 request body,后续的中间件或路由处理函数就无法再获取数据 - 这就是你遇到的"在中间件里消费后,其他地方无法再使用"的问题
解决思路 要在多个地方使用 request body,可以采用"缓存"策略:
from fastapi import FastAPI, Request
from starlette.responses import JSONResponse
app = FastAPI()
@app.middleware("http")
async def cache_request_body(request: Request, call_next):
# 读取并缓存请求体
body = await request.body()
# 将缓存的body存储到request.state中
request.state.body = body
# 继续处理请求
response = await call_next(request)
return response
@app.post("/test")
async def test_endpoint(request: Request):
# 从缓存中获取body
body = request.state.body
# 可以多次使用
print("第一次使用:", body)
print("第二次使用:", body)
return JSONResponse({"message": "成功获取请求体"})
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class CachedRequest(Request):
async def body(self):
if not hasattr(self, "_body"):
self._body = await super().body()
return self._body
app = FastAPI(request_class=CachedRequest)
@app.middleware("http")
async def first_middleware(request: Request, call_next):
# 第一次消费
body = await request.body()
print("中间件1读取:", body)
response = await call_next(request)
return response
@app.middleware("http")
async def second_middleware(request: Request, call_next):
# 第二次消费(使用缓存)
body = await request.body()
print("中间件2读取:", body)
response = await call_next(request)
return response
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def cache_json_body(request: Request, call_next):
if request.method in ["POST", "PUT", "PATCH"] and request.headers.get("content-type") == "application/json":
# 缓存JSON数据
json_body = await request.json()
request.state.json_body = json_body
# 重新构建body(因为json()会消耗body)
from fastapi.encoders import jsonable_encoder
import json
request._body = json.dumps(jsonable_encoder(json_body)).encode("utf-8")
response = await call_next(request)
return response
解决方法:缓存request body
from fastapi import FastAPI, Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest
import json
from typing import Any
class CachedBodyMiddleware(BaseHTTPMiddleware):
"""
缓存请求体的中间件,让后续中间件可以重复读取请求体
"""
async def dispatch(self, request: Request, call_next):
# 读取并缓存请求体
body = await request.body()
# 创建一个新的request对象,包含缓存的body
async def receive():
return {"type": "http.request", "body": body}
# 替换request的receive方法
request._receive = receive
# 将body缓存到request的状态中,供其他中间件使用
request.state.cached_body = body
response = await call_next(request)
return response
class LoggingMiddleware(BaseHTTPMiddleware):
"""
日志中间件,记录请求信息
"""
async def dispatch(self, request: Request, call_next):
# 现在可以安全地读取body了
if hasattr(request.state, 'cached_body'):
body = request.state.cached_body
else:
body = await request.body()
print(f"Request path: {request.url.path}")
print(f"Request method: {request.method}")
# 尝试解析JSON body
if body:
try:
json_body = json.loads(body.decode('utf-8'))
print(f"Request body: {json_body}")
except:
print(f"Request body (raw): {body}")
response = await call_next(request)
return response
class AuthMiddleware(BaseHTTPMiddleware):
"""
认证中间件,也需要读取请求体进行验证
"""
async def dispatch(self, request: Request, call_next):
# 同样可以读取缓存的body
if hasattr(request.state, 'cached_body'):
body = request.state.cached_body
if body:
try:
data = json.loads(body.decode('utf-8'))
# 进行一些认证逻辑
if 'auth_token' in data:
print(f"Auth token found: {data['auth_token']}")
else:
print("No auth token in request")
except:
pass
response = await call_next(request)
return response
# 使用示例
app = FastAPI()
# 注意:中间件的添加顺序很重要!
# 最后添加的中间件最先执行
app.add_middleware(AuthMiddleware)
app.add_middleware(LoggingMiddleware)
app.add_middleware(CachedBodyMiddleware) # 必须最先执行(最后添加)
@app.post("/test")
async def test_endpoint(request: Request):
# 在路由处理函数中也可以正常读取body
body = await request.body()
return {"message": "success", "received": body.decode('utf-8') if body else ""}
# 测试用的根路径
@app.get("/")
async def root():
return {"message": "Hello World"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
引用链接
[1]
Fastapi框架-使用过程填坑篇(6)- 吐槽Request请求上下文对象只能被消费一次的问题和解决思路,竟然不能再中间件里面再消费一次~唉 - 掘金: https://juejin.cn/post/6972031031155097631[2]
python - Why does fastapi hang when adding a middleware to print the HTTP request body? - Stack Overflow: https://stackoverflow.com/questions/70134618/why-does-fastapi-hang-when-adding-a-middleware-to-print-the-http-request-body[3]
中间件请求解析永远挂起 · 问题 #847 · Kludex/starlette: https://github.com/Kludex/starlette/issues/847
评论区