目 录CONTENT

文章目录

Fastapi自定义中间件,request上下文对象仅能消费一次的坑及解决办法

Administrator
2025-09-23 / 0 评论 / 0 点赞 / 2 阅读 / 0 字

 

字数 1298,阅读大约需 7 分钟

Fastapi自定义中间件,request上下文对象仅能消费一次的坑及解决办法

在 FastAPI 中,Request 对象的 body 内容确实只能被消费一次,由其底层依赖的 Starlette 框架的设计决定的,主要和异步处理以及内存效率有关。
在不熟悉使用request的时候,经常会犯的错误是消费了request,但又在后面的代码里使用该request

同类问题文章

Fastapi框架-使用过程填坑篇(6)- 吐槽Request请求上下文对象只能被消费一次的问题和解决思路,竟然不能再中间件里面再消费一次~唉 - 掘金[1]

python - Why does fastapi hang when adding a middleware to print the HTTP request body? - Stack Overflow[2]

中间件请求解析永远挂起 · 问题 #847 · Kludex/starlette[3]

强约束限制:request在fastapi中间件有且仅能被消费一次,在其他中间件不能重复消费此request

问题代码复现

class RequestContext(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
        request_id = request_ctx.set(str(uuid4()))  # generate uuid to request
        body = await request.body()
        if body:
            logger.info(...)  # log request with body
        else:
            logger.info(...)  # log request without body
 
        response = await call_next(request)
        response.headers['X-Request-ID'] = request_ctx.get()
        logger.info("%s" % (response.status_code))
        request_ctx.reset(request_id)

        return response

核心问题: 在 dispatch 方法中,这行代码 body = await request.body() 消耗(consumes)了请求体。之后,又将原始的 request 对象传递给了 response = await call_next(request)。
吐槽:FastAPI(以及底层的Starlette)中一个让人头疼的设计问题

关于request的声明

为什么 Request 对象只能被消费一次?

  1. 1. 流式处理设计: - Request 的 body 是以流(stream)的形式传输的,类似于文件读取的流式处理 - 当你调用 await request.body()await request.json() 时,会读取整个流并将其消耗掉 - 流被消耗后就无法再次读取,这是为了避免内存中保存重复数据

  2. 2. 异步处理限制: - 在异步环境中,多次读取同一个流会导致状态混乱 - 为了保证数据一致性,设计上只允许消费一次

  3. 3. 中间件链的影响: - 如果在某个中间件中消费了 request body,后续的中间件或路由处理函数就无法再获取数据 - 这就是你遇到的"在中间件里消费后,其他地方无法再使用"的问题

解决思路 要在多个地方使用 request body,可以采用"缓存"策略:

from fastapi import FastAPI, Request
from starlette.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def cache_request_body(request: Request, call_next):
    # 读取并缓存请求体
    body = await request.body()
    # 将缓存的body存储到request.state中
    request.state.body = body
    
    # 继续处理请求
    response = await call_next(request)
    return response

@app.post("/test")
async def test_endpoint(request: Request):
    # 从缓存中获取body
    body = request.state.body
    # 可以多次使用
    print("第一次使用:", body)
    print("第二次使用:", body)
    return JSONResponse({"message": "成功获取请求体"})
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

class CachedRequest(Request):
    async def body(self):
        if not hasattr(self, "_body"):
            self._body = await super().body()
        return self._body

app = FastAPI(request_class=CachedRequest)

@app.middleware("http")
async def first_middleware(request: Request, call_next):
    # 第一次消费
    body = await request.body()
    print("中间件1读取:", body)
    response = await call_next(request)
    return response

@app.middleware("http")
async def second_middleware(request: Request, call_next):
    # 第二次消费(使用缓存)
    body = await request.body()
    print("中间件2读取:", body)
    response = await call_next(request)
    return response
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def cache_json_body(request: Request, call_next):
    if request.method in ["POST", "PUT", "PATCH"] and request.headers.get("content-type") == "application/json":
        # 缓存JSON数据
        json_body = await request.json()
        request.state.json_body = json_body
        # 重新构建body(因为json()会消耗body)
        from fastapi.encoders import jsonable_encoder
        import json
        request._body = json.dumps(jsonable_encoder(json_body)).encode("utf-8")
    
    response = await call_next(request)
    return response

解决方法:缓存request body

from fastapi import FastAPI, Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest
import json
from typing import Any

class CachedBodyMiddleware(BaseHTTPMiddleware):
    """
    缓存请求体的中间件,让后续中间件可以重复读取请求体
    """
    
    async def dispatch(self, request: Request, call_next):
        # 读取并缓存请求体
        body = await request.body()
        
        # 创建一个新的request对象,包含缓存的body
        async def receive():
            return {"type": "http.request", "body": body}
        
        # 替换request的receive方法
        request._receive = receive
        
        # 将body缓存到request的状态中,供其他中间件使用
        request.state.cached_body = body
        
        response = await call_next(request)
        return response

class LoggingMiddleware(BaseHTTPMiddleware):
    """
    日志中间件,记录请求信息
    """
    
    async def dispatch(self, request: Request, call_next):
        # 现在可以安全地读取body了
        if hasattr(request.state, 'cached_body'):
            body = request.state.cached_body
        else:
            body = await request.body()
        
        print(f"Request path: {request.url.path}")
        print(f"Request method: {request.method}")
        
        # 尝试解析JSON body
        if body:
            try:
                json_body = json.loads(body.decode('utf-8'))
                print(f"Request body: {json_body}")
            except:
                print(f"Request body (raw): {body}")
        
        response = await call_next(request)
        return response

class AuthMiddleware(BaseHTTPMiddleware):
    """
    认证中间件,也需要读取请求体进行验证
    """
    
    async def dispatch(self, request: Request, call_next):
        # 同样可以读取缓存的body
        if hasattr(request.state, 'cached_body'):
            body = request.state.cached_body
            if body:
                try:
                    data = json.loads(body.decode('utf-8'))
                    # 进行一些认证逻辑
                    if 'auth_token' in data:
                        print(f"Auth token found: {data['auth_token']}")
                    else:
                        print("No auth token in request")
                except:
                    pass
        
        response = await call_next(request)
        return response

# 使用示例
app = FastAPI()

# 注意:中间件的添加顺序很重要!
# 最后添加的中间件最先执行
app.add_middleware(AuthMiddleware)
app.add_middleware(LoggingMiddleware)
app.add_middleware(CachedBodyMiddleware)  # 必须最先执行(最后添加)

@app.post("/test")
async def test_endpoint(request: Request):
    # 在路由处理函数中也可以正常读取body
    body = await request.body()
    return {"message": "success", "received": body.decode('utf-8') if body else ""}

# 测试用的根路径
@app.get("/")
async def root():
    return {"message": "Hello World"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

引用链接

[1] Fastapi框架-使用过程填坑篇(6)- 吐槽Request请求上下文对象只能被消费一次的问题和解决思路,竟然不能再中间件里面再消费一次~唉 - 掘金: https://juejin.cn/post/6972031031155097631
[2] python - Why does fastapi hang when adding a middleware to print the HTTP request body? - Stack Overflow: https://stackoverflow.com/questions/70134618/why-does-fastapi-hang-when-adding-a-middleware-to-print-the-http-request-body
[3] 中间件请求解析永远挂起 · 问题 #847 · Kludex/starlette: https://github.com/Kludex/starlette/issues/847

 

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区