> ## Documentation Index
> Fetch the complete documentation index at: https://docs.agentscope.io/llms.txt
> Use this file to discover all available pages before exploring further.

# 中间件

> 在 agent 生命周期的关键位置拦截并扩展行为

## 概述

Agent middleware 是在不修改 agent 或 model 代码的前提下，向 agent 执行流程中的关键位置注入自定义逻辑（日志、追踪、输入改写、访问控制等）的机制。

AgentScope 暴露了 6 个 hook 位置外加一个 tool-provider hook，覆盖了从外层 reply 流程一路下沉到底层模型 API 调用的全链路：

| 位置                    | 类型          | 说明                                                                                         |
| --------------------- | ----------- | ------------------------------------------------------------------------------------------ |
| `on_reply`            | Onion       | 包裹一次完整的 reply 流程，覆盖其中所有 ReAct 轮次、工具执行与最终输出                                                 |
| `on_reasoning`        | Onion       | 包裹一轮 ReAct 中的推理步骤（输入组装 → 模型调用 → 流式解码）                                                      |
| `on_acting`           | Onion       | 包裹一次工具调用的执行                                                                                |
| `on_model_call`       | Onion       | 包裹一次底层 `ChatModel` API 调用，最贴近模型                                                            |
| `on_compress_context` | Onion       | 包裹 `Agent.compress_context()` —— 在每次推理步骤前触发，由 agent 决定是否压缩上下文                              |
| `on_system_prompt`    | Transformer | 在每次组装 system prompt 时触发；多个 middleware 串行接力，每一个把上一个的输出再做一次变换                                |
| `list_tools`          | Tool source | 可选。返回 middleware 贡献的 `list[ToolBase]`。**不会被自动调用** —— 由组装 agent toolkit 的调用方决定是否调用以及如何合并结果。 |

<Note>
  以上 hook 均作用于 **agent** 层面。若需要在单个工具实例上挂钩 —— 无论该工具是在 agent 内部还是外部被调用 —— 请参见[工具中间件](/versions/2.0.4dev/zh/building-blocks/tool#tool-middleware)。
</Note>

三种类型的差别：

* **Onion**（洋葱式）—— middleware 包裹下一层 handler，可以在 `next_handler()` 前后插入逻辑、观察中间事件流。
* **Transformer**（变换式）—— middleware 之间串成流水线，前一个的输出作为后一个的输入，不存在「内层」概念。
* **Tool source**（工具来源）—— 不在运行时路径上的 hook。`Agent.__init__` 不会调用 `list_tools()`；需要显式从 middleware 中收集工具并自行传入 toolkit。

下图展示这些 hook 在 agent 生命周期中的嵌套关系。`on_system_prompt` 嵌入在 `on_reasoning` 内部，因为它在 reasoning 步骤组装 system prompt 时被触发；`on_compress_context` 位于每轮 ReAct 顶部，在 reasoning 之前：

<Tree>
  <Tree.Folder name="on_reply" defaultOpen>
    <Tree.Folder name="ReAct loop（每一轮）" defaultOpen>
      <Tree.File name="on_compress_context（上下文压缩判定）" />

      <Tree.Folder name="on_reasoning" defaultOpen>
        <Tree.File name="on_system_prompt（组装 system prompt）" />

        <Tree.File name="on_model_call（模型 API 调用）" />
      </Tree.Folder>

      <Tree.Folder name="on_acting（每个工具调用一次）" />
    </Tree.Folder>
  </Tree.Folder>
</Tree>

<Note>
  当前 `on_acting` 只包裹 agent 运行时内部的工具执行；通过 external execution 在 agent 外部执行的工具不会被 `on_acting` 追踪到。
</Note>

## 装备 Middleware

AgentScope 把一组 hook 装在一个类里 —— 同一个 middleware 类可以同时实现 6 个 hook 位置（外加可选的 `list_tools` tool-provider hook）中的任意子集。把实例传入 `Agent(middlewares=[...])` 即可装备：

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
```

构造时 agent 会扫描每个 middleware 实例，按它实际实现了哪些 hook，把它分配到对应位置的执行列表中。未实现的位置自动跳过，不产生任何调用开销。

## 内置 Middleware

### TracingMiddleware

`TracingMiddleware` 为 agent 全生命周期接入 [OpenTelemetry](https://opentelemetry.io/docs/specs/semconv/gen-ai/) 追踪。它在 `on_reply`、`on_model_call`、`on_acting` 三个位置打点，按层级生成 span。

使用前先在进程中注册 `TracerProvider` 与 OTLP 导出器：

```python theme={null}
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")),
)
trace.set_tracer_provider(provider)
```

随后把 `TracingMiddleware` 装到 agent 上即可：

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
```

每次 reply 会产出一棵嵌套 span 树，每个层级捕获的关键属性如下：

<Tabs>
  <Tab title="Agent Reply Span">
    来自 `on_reply`：

    * Agent 名称、session ID、reply ID
    * 输入消息与最终输出消息
    * HITL 等待中的工具调用
    * External execution 等待中的工具调用
  </Tab>

  <Tab title="Model Call Span">
    来自 `on_model_call`：

    * 模型名、提供商、输入 / 输出 token 数
    * 请求与响应的消息内容
    * 对流式响应做包装，在最终 chunk 上写入属性
  </Tab>

  <Tab title="Tool Execution Span">
    来自 `on_acting`：

    * 工具名、调用 ID、入参
    * 工具执行结果
  </Tab>
</Tabs>

未配置 `TracerProvider` 时，所有 hook 会直接短路到 `next_handler()`，不创建 span 也不计算属性 —— 几乎零开销。

<Info>
  Agent 收到 `ExternalExecutionResultEvent`（外部执行的工具结果）时，`TracingMiddleware` 会为每条外部执行结果合成一个补偿 span，让外部系统执行的工具也能保留完整可观测性。
</Info>

#### 添加自定义 Span

如需在 agent 生命周期内追踪自定义操作，可直接使用 [OpenTelemetry Python SDK](https://opentelemetry.io/docs/languages/python/)。获取一个作用域为 AgentScope 的 tracer，并将目标代码包裹在 span 中：

```python theme={null}
from opentelemetry import trace
from agentscope import __version__

tracer = trace.get_tracer("agentscope", __version__)

with tracer.start_as_current_span(
    name="your_span_name",
    attributes={
        # 附加到 span 上的可选键值对，
        # 例如函数名、入参或任何自定义元数据。
    },
    end_on_exit=True,
) as span:
    # 你的代码
```

这些自定义 span 会与 AgentScope 内置的 span 一并上报，发送到 `TracerProvider` 中配置的同一个 OTLP collector。

### 预算限制中间件（ReplyBudgetControlMiddleware）

`ReplyBudgetControlMiddleware` 为**单次 reply 强制一个加权 token 预算**。它累计统计一次 reply 内所有 reasoning 步骤的 token 消耗，一旦预算耗尽，便指示 agent 立即收尾、不再调用任何工具。适用于为长链路、tool 密集的 ReAct 循环设定成本或时延上限。

每次模型调用的加权成本按如下公式计算：

```
cost = input_token_weight * input_tokens + output_token_weight * output_tokens
```

当累计成本达到 `token_budget` 时，middleware 会：

1. 在 agent 上下文最后一条 assistant 消息中追加一个 `HintBlock`（若不存在则新建一条 `AssistantMsg`），提醒模型给出最终结论性回复。
2. 把下一步 reasoning 的 `tool_choice` 强制覆写为 `ToolChoice(mode="none")`，阻止继续调用工具。

像其他 middleware 一样挂载：

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import ReplyBudgetControlMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[
        ReplyBudgetControlMiddleware(
            token_budget=10000,
            input_token_weight=1.0,
            output_token_weight=2.0,
        ),
    ],
)
```

构造参数如下：

<ParamField path="token_budget" type="float" required>
  单次 reply 允许的最大加权 token 成本。累计成本达到此阈值后，agent 会被指示立刻收尾、不再调用工具。
</ParamField>

<ParamField path="input_token_weight" type="float" default="1">
  计算加权成本时应用于 input tokens 的乘数。
</ParamField>

<ParamField path="output_token_weight" type="float" default="1">
  计算加权成本时应用于 output tokens 的乘数。通常将其设为高于 `input_token_weight`，以体现 output tokens 实际成本更高。
</ParamField>

<ParamField path="hint_message" type="str">
  预算超出时注入 agent 上下文的提示消息。默认是内置的收尾 prompt，要求模型给出最终结论性回复、不再调用任何工具。
</ParamField>

<Tip>
  该 middleware **实例自身无状态** —— 所有运行时状态存放在 `agent.state.middle_context` 中，以 middleware key 与当前 `reply_id` 作为索引。这意味着同一个 middleware 实例可以安全地在多个 agent 间共享，且预算状态在 human-in-the-loop（HITL）中断与恢复中保持有效。reply 结束时状态会被自动清理。
</Tip>

<Note>
  预算的作用域是**单次 reply**，而非 agent 生命周期。每次新的 reply 都会从零计数，因此该限制对每次 `agent(...)` 调用独立生效。
</Note>

### 文本转语音中间件（TTSMiddleware）

`TTSMiddleware` 拦截 agent 的文本输出并合成语音，将 `DataBlockStartEvent` / `DataBlockDeltaEvent` / `DataBlockEndEvent` 音频事件注入到事件流中。它在 `on_reply` 阶段拦截，观察每个 `TextBlockDeltaEvent` 和 `TextBlockEndEvent`。

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import TTSMiddleware
from agentscope.tts import DashScopeTTSModel
from agentscope.credential import DashScopeCredential

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[
        TTSMiddleware(
            DashScopeTTSModel(
                credential=DashScopeCredential(api_key="..."),
                model="qwen3-tts-flash",
                parameters=DashScopeTTSModel.Parameters(voice="Cherry"),
                stream=True,
            ),
        ),
    ],
)
```

中间件根据 TTS 模型的模式自动适配：

| TTS 模式                  | 行为                                                                                                        |
| ----------------------- | --------------------------------------------------------------------------------------------------------- |
| 标准模式 (`realtime=False`) | 累积文本直到 `TextBlockEndEvent`，然后调用 `synthesize(text)` 一次性合成，将完整音频作为一个 data block 输出                          |
| 实时模式 (`realtime=True`)  | 每个 `TextBlockDeltaEvent.delta` 通过 `push()` 推送，音频 chunk 即时输出；`TextBlockEndEvent` 时调用 `synthesize()` 刷新剩余音频 |

输出事件流因模式而异：

**标准模式** — 音频在文本块结束后输出：

```
TextBlockStartEvent
TextBlockDeltaEvent (文本)
TextBlockDeltaEvent (文本)
TextBlockEndEvent
DataBlockStartEvent           ← 文本结束后合成音频
DataBlockDeltaEvent (音频)
DataBlockDeltaEvent (音频)
DataBlockEndEvent
```

**实时模式** — 音频 chunk 与文本交替到达（并发合成）：

```
TextBlockStartEvent
TextBlockDeltaEvent (文本)
DataBlockStartEvent           ← 文本流期间音频开始
DataBlockDeltaEvent (音频)
TextBlockDeltaEvent (文本)
DataBlockDeltaEvent (音频)
TextBlockEndEvent
DataBlockDeltaEvent (音频)   ← synthesize() 刷新的剩余音频
DataBlockEndEvent
```

每个 `DataBlockDeltaEvent.data` 携带增量 base64 编码的音频 chunk；完整音频为所有 delta 解码后的字节拼接，按 `block_id` 关联。

### 长期记忆中间件

AgentScope 以中间件的形式实现长期记忆，让 agent 能够在多个会话间持久保存并调取信息。记忆后端挂接 `on_reply`（reply 前检索 + reply 后写回）、`on_system_prompt`（声明记忆工具），并通过 `list_tools` 提供供 agent 调用的 memory 相关工具（例如 `search_memory` / `add_memory`）。当前可用的后端为 `Mem0Middleware`，由 [mem0](https://github.com/mem0ai/mem0) 驱动。安装方式、控制模式与构造方式详见[长期记忆](/versions/2.0.4dev/zh/building-blocks/long-term-memory)。

## 自定义中间件

继承 `MiddlewareBase`，只实现需要的 hook 即可，其它的不用管。

下面的示例用一个 middleware 覆盖了所有位置。每个 onion hook 都会收到一个 `input_kwargs` 字典，承载流入下一层的字段，通过 `next_handler(**input_kwargs)` 透传，也可以用关键字参数覆写其中某些字段：

```python theme={null}
from typing import AsyncGenerator, Awaitable, Callable

from agentscope.agent import Agent
from agentscope.event import AgentEvent
from agentscope.message import Msg
from agentscope.middleware import MiddlewareBase
from agentscope.model import ChatResponse
from agentscope.tool import ToolBase


class FullObservabilityMiddleware(MiddlewareBase):
    """同时观察所有 middleware 位置，并贡献一个工具。"""

    async def on_reply(
        self,
        agent: Agent,
        # {"inputs": Msg | list[Msg] | UserConfirmResultEvent | ExternalExecutionResultEvent | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent | Msg, None]],
    ) -> AsyncGenerator[AgentEvent | Msg, None]:
        print(f"[reply] start for {agent.name}")
        async for item in next_handler(**input_kwargs):
            yield item
        print(f"[reply] end for {agent.name}")

    async def on_reasoning(
        self,
        agent: Agent,
        # {"tool_choice": ToolChoice | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent, None]],
    ) -> AsyncGenerator[AgentEvent, None]:
        print("[reasoning] start")
        async for event in next_handler(**input_kwargs):
            yield event
        print("[reasoning] end")

    async def on_model_call(
        self,
        agent: Agent,
        # {"messages": list[Msg], "tools": list[dict], "tool_choice": ToolChoice | None, "current_model": ChatModelBase}
        input_kwargs: dict,
        next_handler: Callable[
            ..., Awaitable[ChatResponse | AsyncGenerator[ChatResponse, None]]
        ],
    ) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
        print(f"[model_call] {input_kwargs['current_model'].model}")
        result = await next_handler(**input_kwargs)
        print("[model_call] done")
        return result

    async def on_compress_context(
        self,
        agent: Agent,
        # {"context_config": ContextConfig | None}
        input_kwargs: dict,
        next_handler: Callable[..., Awaitable[None]],
    ) -> None:
        print(f"[compress_context] checking context for {agent.name}")
        await next_handler(**input_kwargs)
        print("[compress_context] done")

    async def on_system_prompt(
        self,
        agent: Agent,
        current_prompt: str,
    ) -> str:
        print(f"[system_prompt] length={len(current_prompt)}")
        return current_prompt

    async def list_tools(self) -> list[ToolBase]:
        # 可选 hook。``Agent.__init__`` 不会自动调用；
        # 如要让这些工具可被 agent 使用，需要自行从 middleware 中收集
        # 并传入 toolkit。
        return []
```

### 执行顺序

Onion 类 hook（`on_reply`、`on_reasoning`、`on_acting`、`on_model_call`）—— **列表中第一个 middleware 处于最外层**：

```python theme={null}
middlewares = [mw1, mw2]
# 调用顺序：
# mw1 前 → mw2 前 → 内部逻辑 → mw2 后 → mw1 后
```

对于流式 / 产出事件的 hook，内层 middleware 先看到每一个 yield 出的事件：

```
mw1_pre → mw2_pre → mw2_event → mw1_event → ... → mw2_post → mw1_post
```

Transformer 类 hook（`on_system_prompt`）—— middleware **从左到右串行接力**：

```python theme={null}
middlewares = [mw1, mw2]
# original_prompt → mw1.on_system_prompt() → mw2.on_system_prompt() → final
```

一次 reply 中各 hook 的整体执行顺序遵循 agent 生命周期：

```
on_reply
  └── 每一轮 ReAct：
        ├── on_compress_context → compress_context()
        │     └── on_system_prompt（压缩前 token 计数）
        ├── on_reasoning
        │     ├── _prepare_model_input() → on_system_prompt
        │     └── on_model_call
        └── on_acting（本轮每个工具调用一次）
```

`list_tools` 不在每次 reply 的执行路径上，agent 不会自动调用它 —— 它是一个便捷接口，让 middleware 可以声明自己的工具。是否收集这些工具由组装 toolkit 的调用方决定。

## 实用示例

### 计时中间件

下面的 middleware 记录每次模型调用的耗时：

```python theme={null}
import time
from agentscope.middleware import MiddlewareBase

class TimingMiddleware(MiddlewareBase):
    async def on_model_call(self, agent, input_kwargs, next_handler):
        model_name = input_kwargs["current_model"].model
        start = time.time()

        result = await next_handler()

        elapsed = time.time() - start
        print(f"[timing] {agent.name} → {model_name}: {elapsed:.2f}s")
        return result
```

### 限速中间件

下面的 middleware 在两次模型调用之间强制留出最小间隔：

```python theme={null}
import asyncio
import time
from agentscope.middleware import MiddlewareBase

class RateLimitMiddleware(MiddlewareBase):
    def __init__(self, min_interval: float = 1.0):
        self._last_call = 0.0
        self._min_interval = min_interval

    async def on_model_call(self, agent, input_kwargs, next_handler):
        now = time.time()
        wait = self._min_interval - (now - self._last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        self._last_call = time.time()
        return await next_handler()
```

### 动态 system prompt 中间件

下面的 middleware 在 system prompt 中注入实时上下文：

```python theme={null}
from datetime import datetime
from agentscope.middleware import MiddlewareBase

class DynamicContextMiddleware(MiddlewareBase):
    def __init__(self, context_fn):
        self._context_fn = context_fn

    async def on_system_prompt(self, agent, current_prompt):
        context = self._context_fn()
        return f"{current_prompt}\n\n## Current Context\n{context}"

agent = Agent(
    ...
    middlewares=[
        DynamicContextMiddleware(
            lambda: f"Time: {datetime.now().isoformat()}"
        ),
    ],
)
```

### 模型回退中间件

下面的中间件在主模型失败时切换到备用模型：

```python theme={null}
from agentscope.middleware import MiddlewareBase

class ModelFallbackMiddleware(MiddlewareBase):
    def __init__(self, fallback_model):
        self._fallback = fallback_model

    async def on_model_call(self, agent, input_kwargs, next_handler):
        try:
            return await next_handler()
        except Exception as e:
            print(f"Primary model failed: {e}, switching to fallback")
            return await next_handler(
                current_model=self._fallback,
            )
```
