
# Middleware

> Intercept and extend agent behavior at key lifecycle points

## Overview

Agent middleware is the mechanism for injecting custom logic — logging, tracing, input rewriting, access control — into key points of the agent execution pipeline, without modifying the agent or model code.

AgentScope exposes 6 hook positions plus a tool-provider hook, covering the full path from the outer reply process down to the raw model API call:

| Position              | Type        | Description                                                                                                                                                                                             |
| --------------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `on_reply`            | Onion       | Wraps a complete reply, covering all ReAct rounds, tool executions, and the final output                                                                                                                |
| `on_reasoning`        | Onion       | Wraps a single ReAct round's reasoning step (input assembly → model call → stream decoding)                                                                                                             |
| `on_acting`           | Onion       | Wraps a single tool call execution                                                                                                                                                                      |
| `on_model_call`       | Onion       | Wraps the underlying `ChatModel` API call — the closest to the model                                                                                                                                    |
| `on_compress_context` | Onion       | Wraps `Agent.compress_context()` — fires before each reasoning step when the agent decides whether to compress its context                                                                              |
| `on_system_prompt`    | Transformer | Fires every time the system prompt is assembled; multiple middlewares chain in sequence, each transforming the previous one's output                                                                    |
| `list_tools`          | Tool source | Optional. Returns a `list[ToolBase]` that the middleware contributes. **Not invoked automatically** — the caller assembling the agent's toolkit decides whether to call it and how to merge the result. |

<Note>
  These hooks operate at the **agent** level. For per-tool onion hooks that fire on every invocation of a specific tool — regardless of whether it's called inside or outside an agent — see [Tool Middleware](/versions/2.0.3/en/building-blocks/tool#tool-middleware).
</Note>

The three types differ as follows:

* **Onion** — middleware wraps the next handler, allowing logic before/after `next_handler()` and observation of the intermediate event stream.
* **Transformer** — middlewares form a pipeline; the previous one's output feeds into the next one. There is no "inner layer" concept.
* **Tool source** — not a hook on the runtime path. `Agent.__init__` does not call `list_tools()`; you opt in explicitly by collecting the tools from your middlewares and passing them into the toolkit yourself.

The diagram below shows how these hooks nest within the agent lifecycle. `on_system_prompt` is embedded inside `on_reasoning` because it fires when the reasoning step assembles the system prompt; `on_compress_context` sits at the top of each ReAct round, before reasoning:

<Tree>
  <Tree.Folder name="on_reply" defaultOpen>
    <Tree.Folder name="ReAct loop (per round)" defaultOpen>
      <Tree.File name="on_compress_context (context compression decision)" />

      <Tree.Folder name="on_reasoning" defaultOpen>
        <Tree.File name="on_system_prompt (system prompt assembly)" />

        <Tree.File name="on_model_call (model API call)" />
      </Tree.Folder>

      <Tree.Folder name="on_acting (once per tool call)" />
    </Tree.Folder>
  </Tree.Folder>
</Tree>

<Note>
  `on_acting` currently wraps only tool execution inside the agent runtime; tools dispatched outside the agent via external execution are not tracked by `on_acting`.
</Note>

## Equip Middleware

AgentScope packages a set of hooks into a class — a single middleware class can implement any subset of the 6 hook positions (plus the optional `list_tools` tool-provider hook) at the same time. Pass instances to `Agent(middlewares=[...])` to equip them:

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
```

At construction time the agent scans each middleware instance, checks which hooks it actually implements, and routes it into the matching position-specific execution lists. Unimplemented positions are skipped automatically with no call overhead.
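
One way such a scan could work is to compare each hook on the middleware's class against the attribute inherited from the base class. The sketch below is an illustration only: `HookBase` is a stand-in rather than AgentScope's real `MiddlewareBase`, `route_middlewares` is a hypothetical helper, and the detection logic inside `Agent` may differ.

```python
from collections import defaultdict

HOOK_NAMES = (
    "on_reply",
    "on_reasoning",
    "on_acting",
    "on_model_call",
    "on_compress_context",
    "on_system_prompt",
)


class HookBase:
    """Stand-in base class (not agentscope's MiddlewareBase); hooks are no-ops."""

    async def on_reply(self, *args, **kwargs): ...
    async def on_reasoning(self, *args, **kwargs): ...
    async def on_acting(self, *args, **kwargs): ...
    async def on_model_call(self, *args, **kwargs): ...
    async def on_compress_context(self, *args, **kwargs): ...
    async def on_system_prompt(self, *args, **kwargs): ...


def route_middlewares(middlewares):
    """Map each hook name to the middlewares whose class overrides it."""
    routed = defaultdict(list)
    for mw in middlewares:
        for name in HOOK_NAMES:
            # A hook counts as implemented only if the subclass replaced it.
            if getattr(type(mw), name) is not getattr(HookBase, name):
                routed[name].append(mw)
    return dict(routed)


class OnlyModelCall(HookBase):
    async def on_model_call(self, agent, input_kwargs, next_handler):
        return await next_handler(**input_kwargs)


routed = route_middlewares([OnlyModelCall()])
print(sorted(routed))  # names of the hooks this middleware was routed into
```

Because positions without an override never appear in `routed`, nothing is called for them at runtime.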

## Built-in Middleware

### TracingMiddleware

`TracingMiddleware` wires the full agent lifecycle to [OpenTelemetry](https://opentelemetry.io/docs/specs/semconv/gen-ai/) tracing. It instruments `on_reply`, `on_model_call`, and `on_acting`, producing hierarchical spans.

Before using it, register a `TracerProvider` and an OTLP exporter in the process:

```python theme={null}
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")),
)
trace.set_tracer_provider(provider)
```

Then attach `TracingMiddleware` to the agent:

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
```

Each reply produces a nested span tree. The key attributes captured at each level are:

<Tabs>
  <Tab title="Agent Reply Span">
    From `on_reply`:

    * Agent name, session ID, reply ID
    * Input messages and the final output message
    * HITL pending tool calls
    * External execution pending tool calls
  </Tab>

  <Tab title="Model Call Span">
    From `on_model_call`:

    * Model name, provider, input/output token counts
    * Request and response message content
    * Wraps streaming responses, writing attributes onto the final chunk
  </Tab>

  <Tab title="Tool Execution Span">
    From `on_acting`:

    * Tool name, call ID, input arguments
    * Tool execution result
  </Tab>
</Tabs>

When no `TracerProvider` is configured, every hook short-circuits directly to `next_handler()` — no spans are created, no attributes are computed — making the overhead negligible.

<Info>
  When the agent receives an `ExternalExecutionResultEvent` (a tool executed outside the agent), `TracingMiddleware` synthesizes a compensating span for each external execution result, preserving full observability for tools run by external systems.
</Info>

#### Add Additional Spans

To trace custom operations within the agent lifecycle, use the [standard OpenTelemetry Python SDK](https://opentelemetry.io/docs/languages/python/) directly. Obtain a tracer scoped to AgentScope and wrap any target code in a span:

```python theme={null}
from opentelemetry import trace
from agentscope import __version__

tracer = trace.get_tracer("agentscope", __version__)

with tracer.start_as_current_span(
    name="your_span_name",
    attributes={
        # Optional key-value pairs attached to the span,
        # e.g. function name, input arguments, or any custom metadata.
    },
    end_on_exit=True,
) as span:
    # your code here
    pass
```

These custom spans are emitted alongside AgentScope's built-in spans and delivered to the same OTLP collector configured in the `TracerProvider`.

### ReplyBudgetControlMiddleware

`ReplyBudgetControlMiddleware` enforces a **weighted token budget per reply**. It tracks cumulative token usage across all reasoning steps within a single reply and, once the budget is exhausted, instructs the agent to wrap up immediately without invoking any further tools. This is useful for capping the cost or latency of long, tool-heavy ReAct loops.

The weighted cost is computed on every model call as:

```
cost = input_token_weight * input_tokens + output_token_weight * output_tokens
```

Once the accumulated cost reaches `token_budget`, the middleware:

1. Appends a `HintBlock` to the last assistant message in the agent's context (or creates a new `AssistantMsg` if needed), reminding the model to produce a final concluding response.
2. Overrides `tool_choice` to `ToolChoice(mode="none")` for the next reasoning step, preventing any further tool calls.
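
To make the accounting concrete, the sketch below accumulates the weighted cost over two model calls and reports when the budget is reached. `BudgetTracker` and its `record` method are hypothetical names that only mirror the formula above; they are not part of AgentScope and do not touch the agent's context or `tool_choice`.

```python
from dataclasses import dataclass


@dataclass
class BudgetTracker:
    """Hypothetical helper that mirrors the weighted-cost formula."""

    token_budget: float
    input_token_weight: float = 1.0
    output_token_weight: float = 1.0
    spent: float = 0.0

    def record(self, input_tokens: int, output_tokens: int) -> bool:
        """Add one model call's cost; return True once the budget is reached."""
        self.spent += (
            self.input_token_weight * input_tokens
            + self.output_token_weight * output_tokens
        )
        return self.spent >= self.token_budget


tracker = BudgetTracker(token_budget=10000, output_token_weight=2.0)

# First call: 1.0 * 3000 + 2.0 * 500 = 4000, still under budget.
exhausted_after_first = tracker.record(input_tokens=3000, output_tokens=500)

# Second call: 1.0 * 4500 + 2.0 * 800 = 6100, total 10100, budget reached.
exhausted_after_second = tracker.record(input_tokens=4500, output_tokens=800)
```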

Attach it like any other middleware:

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import ReplyBudgetControlMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[
        ReplyBudgetControlMiddleware(
            token_budget=10000,
            input_token_weight=1.0,
            output_token_weight=2.0,
        ),
    ],
)
```

The constructor accepts the following parameters:

<ParamField path="token_budget" type="float" required>
  Maximum weighted token cost allowed per reply. Once the accumulated cost reaches this threshold, the agent is instructed to wrap up without calling any more tools.
</ParamField>

<ParamField path="input_token_weight" type="float" default="1">
  Multiplier applied to input tokens when computing the weighted cost.
</ParamField>

<ParamField path="output_token_weight" type="float" default="1">
  Multiplier applied to output tokens when computing the weighted cost. Set this higher than `input_token_weight` to reflect that output tokens are typically more expensive.
</ParamField>

<ParamField path="hint_message" type="str">
  The message injected into the agent's context when the budget is exceeded. Defaults to a built-in wrap-up prompt that asks the model to provide a final concluding response without invoking any tools.
</ParamField>

<Tip>
  The middleware is **stateless on the instance itself** — all runtime state lives in `agent.state.middle_context`, keyed by the middleware key and the current `reply_id`. This means the same middleware instance can safely be shared across multiple agents, and budget state persists across human-in-the-loop (HITL) interruptions and resumptions. State is automatically cleaned up when the reply ends.
</Tip>

<Note>
  The budget is scoped **per reply**, not per agent lifetime. Each new reply starts with a fresh counter, so the limit applies independently to every call to `agent(...)`.
</Note>

### TTSMiddleware

`TTSMiddleware` intercepts the agent's text output and synthesizes speech audio, injecting `DataBlockStartEvent` / `DataBlockDeltaEvent` / `DataBlockEndEvent` into the event stream alongside the text. It hooks into `on_reply` to observe every `TextBlockDeltaEvent` and `TextBlockEndEvent`.

```python theme={null}
from agentscope.agent import Agent
from agentscope.middleware import TTSMiddleware
from agentscope.tts import DashScopeTTSModel
from agentscope.credential import DashScopeCredential

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[
        TTSMiddleware(
            DashScopeTTSModel(
                credential=DashScopeCredential(api_key="..."),
                model="qwen3-tts-flash",
                parameters=DashScopeTTSModel.Parameters(voice="Cherry"),
                stream=True,
            ),
        ),
    ],
)
```

The middleware automatically adapts to the TTS model's mode:

| TTS Mode                        | Behavior                                                                                                                                                         |
| ------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Non-realtime (`realtime=False`) | Accumulates text until `TextBlockEndEvent`, then calls `synthesize(text)` and emits the full audio as one data block                                             |
| Realtime (`realtime=True`)      | Pushes each `TextBlockDeltaEvent.delta` via `push()`, emitting audio chunks as they arrive; calls `synthesize()` on `TextBlockEndEvent` to flush remaining audio |

The output event stream differs by mode:

**Non-realtime** — audio follows after the text block completes:

```
TextBlockStartEvent
TextBlockDeltaEvent (text)
TextBlockDeltaEvent (text)
TextBlockEndEvent
DataBlockStartEvent           ← audio synthesized after text ends
DataBlockDeltaEvent (audio)
DataBlockDeltaEvent (audio)
DataBlockEndEvent
```

**Realtime** — audio chunks arrive interleaved with text as synthesis runs concurrently:

```
TextBlockStartEvent
TextBlockDeltaEvent (text)
DataBlockStartEvent           ← audio begins during text stream
DataBlockDeltaEvent (audio)
TextBlockDeltaEvent (text)
DataBlockDeltaEvent (audio)
TextBlockEndEvent
DataBlockDeltaEvent (audio)   ← remaining audio from synthesize()
DataBlockEndEvent
```

Each `DataBlockDeltaEvent.data` carries an incremental base64-encoded audio chunk; the full audio is the concatenation of every delta's decoded bytes, keyed by `block_id`.
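
A consumer can rebuild the audio by decoding each chunk on its own and appending the bytes to a per-block buffer. The sketch below assumes each delta exposes `block_id` and `data` attributes; `AudioDelta` is a stand-in for the real event class and `assemble_audio` is a hypothetical helper.

```python
import base64
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class AudioDelta:
    """Stand-in for DataBlockDeltaEvent with only the fields used here."""

    block_id: str
    data: str  # base64-encoded audio chunk


def assemble_audio(deltas):
    """Decode every chunk separately and concatenate the bytes per block_id."""
    buffers = defaultdict(bytearray)
    for delta in deltas:
        # Decode chunk by chunk: joining the base64 strings first could
        # break if an intermediate chunk carries its own padding.
        buffers[delta.block_id] += base64.b64decode(delta.data)
    return {block_id: bytes(buf) for block_id, buf in buffers.items()}


def encode(raw: bytes) -> str:
    """Test helper that produces a base64 string from raw bytes."""
    return base64.b64encode(raw).decode("ascii")


deltas = [
    AudioDelta("block-a", encode(b"he")),
    AudioDelta("block-b", encode(b"xy")),
    AudioDelta("block-a", encode(b"llo")),
]
audio = assemble_audio(deltas)
```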

### Long-Term Memory

AgentScope implements long-term memory as middleware, so an agent can persist and recall durable facts across sessions. A memory backend hooks `on_reply` (pre-reply search + post-reply write-back), `on_system_prompt` (advertise memory tools), and contributes agent-callable memory tools (such as `search_memory` / `add_memory`) via `list_tools`. The currently available backend is `Mem0Middleware`, powered by [mem0](https://github.com/mem0ai/mem0). See [Long-Term Memory](/versions/2.0.3/en/building-blocks/long-term-memory) for installation, control modes, and construction paths.

## Custom Middleware

Subclass `MiddlewareBase` and implement only the hooks you need — leave the rest alone.

The example below covers every position except `on_acting`, which follows the same onion pattern as the other wrapping hooks. Each onion hook receives an `input_kwargs` dict carrying the fields that flow into the wrapped layer; forward it unchanged with `next_handler(**input_kwargs)`, or pass keyword arguments to override specific fields:

```python theme={null}
from typing import AsyncGenerator, Awaitable, Callable

from agentscope.agent import Agent
from agentscope.event import AgentEvent
from agentscope.message import Msg
from agentscope.middleware import MiddlewareBase
from agentscope.model import ChatResponse
from agentscope.tool import ToolBase


class FullObservabilityMiddleware(MiddlewareBase):
    """Observe the agent-level hooks (all but ``on_acting``) and expose ``list_tools``."""

    async def on_reply(
        self,
        agent: Agent,
        # {"inputs": Msg | list[Msg] | UserConfirmResultEvent | ExternalExecutionResultEvent | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent | Msg, None]],
    ) -> AsyncGenerator[AgentEvent | Msg, None]:
        print(f"[reply] start for {agent.name}")
        async for item in next_handler(**input_kwargs):
            yield item
        print(f"[reply] end for {agent.name}")

    async def on_reasoning(
        self,
        agent: Agent,
        # {"tool_choice": ToolChoice | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent, None]],
    ) -> AsyncGenerator[AgentEvent, None]:
        print("[reasoning] start")
        async for event in next_handler(**input_kwargs):
            yield event
        print("[reasoning] end")

    async def on_model_call(
        self,
        agent: Agent,
        # {"messages": list[Msg], "tools": list[dict], "tool_choice": ToolChoice | None, "current_model": ChatModelBase}
        input_kwargs: dict,
        next_handler: Callable[
            ..., Awaitable[ChatResponse | AsyncGenerator[ChatResponse, None]]
        ],
    ) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
        print(f"[model_call] {input_kwargs['current_model'].model}")
        result = await next_handler(**input_kwargs)
        print("[model_call] done")
        return result

    async def on_compress_context(
        self,
        agent: Agent,
        # {"context_config": ContextConfig | None}
        input_kwargs: dict,
        next_handler: Callable[..., Awaitable[None]],
    ) -> None:
        print(f"[compress_context] checking context for {agent.name}")
        await next_handler(**input_kwargs)
        print("[compress_context] done")

    async def on_system_prompt(
        self,
        agent: Agent,
        current_prompt: str,
    ) -> str:
        print(f"[system_prompt] length={len(current_prompt)}")
        return current_prompt

    async def list_tools(self) -> list[ToolBase]:
        # Optional hook. Not invoked automatically by ``Agent.__init__``;
        # if you want these tools available to the agent, collect them
        # from your middlewares yourself and pass them into the toolkit.
        return []
```

### Execution Order

For onion hooks (`on_reply`, `on_reasoning`, `on_acting`, `on_model_call`, `on_compress_context`), **the first middleware in the list is the outermost layer**:

```python theme={null}
middlewares = [mw1, mw2]
# Call order:
# mw1 pre → mw2 pre → inner logic → mw2 post → mw1 post
```

For streaming / event-yielding hooks, the inner middleware sees each yielded event first:

```
mw1_pre → mw2_pre → mw2_event → mw1_event → ... → mw2_post → mw1_post
```
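
The ordering above can be reproduced with plain async generators. The sketch below does not use AgentScope; `make_layer` and `compose` are hypothetical helpers that imitate how onion hooks wrap one another, with the hook signature reduced to a single `next_handler` argument.

```python
import asyncio
import functools


def make_layer(name, log):
    """Build an onion-style streaming hook that records when it runs."""

    async def hook(next_handler):
        log.append(f"{name}_pre")
        async for event in next_handler():
            log.append(f"{name}_event")
            yield event
        log.append(f"{name}_post")

    return hook


def compose(hooks, core):
    """Wrap core so that hooks[0] ends up as the outermost layer."""
    handler = core
    for hook in reversed(hooks):
        handler = functools.partial(hook, handler)
    return handler


async def core():
    """Innermost logic: emits a single event."""
    yield "event"


async def run():
    log = []
    pipeline = compose([make_layer("mw1", log), make_layer("mw2", log)], core)
    async for _ in pipeline():
        pass
    return log


order = asyncio.run(run())
print(order)
```

The inner layer logs its event first because it is the one that receives the event from the core and re-yields it outward.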

Transformer hooks (`on_system_prompt`) — middlewares **chain left to right**:

```python theme={null}
middlewares = [mw1, mw2]
# original_prompt → mw1.on_system_prompt() → mw2.on_system_prompt() → final
```
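
A transformer chain is a left-to-right fold over the middleware list. The sketch below imitates it with two toy middlewares; `AddPolicy`, `AddLocale`, and `build_prompt` are hypothetical names, and the agent argument is passed as `None` because the toy hooks never read it.

```python
import asyncio


class AddPolicy:
    async def on_system_prompt(self, agent, current_prompt):
        return current_prompt + "\nAlways cite sources."


class AddLocale:
    async def on_system_prompt(self, agent, current_prompt):
        return current_prompt + "\nReply in English."


async def build_prompt(agent, base_prompt, middlewares):
    """Feed each middleware the previous middleware's output, in list order."""
    prompt = base_prompt
    for mw in middlewares:
        prompt = await mw.on_system_prompt(agent, prompt)
    return prompt


final_prompt = asyncio.run(
    build_prompt(None, "You are a helpful assistant.", [AddPolicy(), AddLocale()]),
)
print(final_prompt)
```

Swapping the two middlewares in the list swaps the order of the appended lines, which is the practical meaning of chaining left to right.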

The overall execution order of all hooks within a single reply follows the agent lifecycle:

```
on_reply
  └── per ReAct round:
        ├── on_compress_context → compress_context()
        │     └── on_system_prompt (token counting before compression)
        ├── on_reasoning
        │     ├── _prepare_model_input() → on_system_prompt
        │     └── on_model_call
        └── on_acting (once per tool call in this round)
```

`list_tools` is not part of the per-reply execution path and is not invoked automatically by the agent — it is a convenience interface so a middleware can advertise its own tools. The caller assembling the toolkit decides whether to collect them.
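
Collecting the contributed tools can be a short loop over the middleware list. The sketch below uses stand-in classes instead of AgentScope types: `EchoTool` is a placeholder for a `ToolBase` subclass and `collect_tools` is a hypothetical helper. How the resulting list is registered with a toolkit depends on the toolkit API and is not shown.

```python
import asyncio


class EchoTool:
    """Placeholder standing in for a ToolBase subclass."""

    name = "echo"


class WithTools:
    """Toy middleware that contributes one tool."""

    async def list_tools(self):
        return [EchoTool()]


class WithoutTools:
    """Toy middleware that does not define list_tools at all."""


async def collect_tools(middlewares):
    """Gather the tools of every middleware that offers list_tools()."""
    tools = []
    for mw in middlewares:
        list_tools = getattr(mw, "list_tools", None)
        if list_tools is not None:
            tools.extend(await list_tools())
    return tools


tools = asyncio.run(collect_tools([WithTools(), WithoutTools()]))
print([tool.name for tool in tools])
```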

## Practical Examples

### Timing middleware

The middleware below records the elapsed time of every model call:

```python theme={null}
import time
from agentscope.middleware import MiddlewareBase

class TimingMiddleware(MiddlewareBase):
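    async def on_model_call(self, agent, input_kwargs, next_handler):
        model_name = input_kwargs["current_model"].model
        # perf_counter() is monotonic, so clock adjustments cannot skew the result.
        start = time.perf_counter()

        # If the model streams, this returns an async generator and the timer
        # covers only the time until it is handed back, not its consumption.
        result = await next_handler()

        elapsed = time.perf_counter() - start
        print(f"[timing] {agent.name} → {model_name}: {elapsed:.2f}s")
        return result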
```
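
When the model call returns an async generator, timing the `await` alone does not include the time spent consuming the stream. One possible approach is to wrap the generator and report the duration once it is exhausted. The sketch below is independent of AgentScope; `timed_stream` is a hypothetical helper and `fake_stream` stands in for a streaming model response.

```python
import asyncio
import time


async def timed_stream(stream, on_done):
    """Re-yield every item and report the elapsed time when the stream ends."""
    start = time.perf_counter()
    try:
        async for item in stream:
            yield item
    finally:
        # Runs on normal exhaustion and also if the stream raises.
        on_done(time.perf_counter() - start)


async def fake_stream():
    """Stand-in for a streaming response that yields three chunks."""
    for chunk in ("a", "b", "c"):
        await asyncio.sleep(0.01)
        yield chunk


async def main():
    durations = []
    chunks = [c async for c in timed_stream(fake_stream(), durations.append)]
    return chunks, durations


chunks, durations = asyncio.run(main())
print(chunks, f"{durations[0]:.3f}s")
```

Inside `on_model_call`, a middleware could return `timed_stream(result, callback)` when `result` is an async generator and return `result` unchanged otherwise.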

### Rate-limiting middleware

The middleware below enforces a minimum interval between two model calls:

```python theme={null}
import asyncio
import time
from agentscope.middleware import MiddlewareBase

class RateLimitMiddleware(MiddlewareBase):
    def __init__(self, min_interval: float = 1.0):
        # -inf guarantees that the very first call is never delayed.
        self._last_call = float("-inf")
        self._min_interval = min_interval

    async def on_model_call(self, agent, input_kwargs, next_handler):
        # monotonic() is unaffected by system clock adjustments.
        now = time.monotonic()
        wait = self._min_interval - (now - self._last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        self._last_call = time.monotonic()
        return await next_handler()
```
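
If one middleware instance serves several concurrent model calls, each caller can read the same last-call timestamp and sleep for the same duration, so the calls still go out together. A lock around the check is one way to serialize them. The sketch below is independent of AgentScope; `MinIntervalGate` is a hypothetical helper, created inside the running event loop so the lock binds to that loop.

```python
import asyncio
import time


class MinIntervalGate:
    """Let callers pass one at a time, at least min_interval seconds apart."""

    def __init__(self, min_interval: float):
        self._min_interval = min_interval
        self._last_pass = None
        self._lock = asyncio.Lock()

    async def wait(self) -> float:
        """Block until this caller may proceed; return the pass timestamp."""
        async with self._lock:
            if self._last_pass is not None:
                elapsed = time.monotonic() - self._last_pass
                remaining = self._min_interval - elapsed
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last_pass = time.monotonic()
            return self._last_pass


async def main():
    gate = MinIntervalGate(0.1)
    # Three callers arrive at the same moment.
    return await asyncio.gather(*(gate.wait() for _ in range(3)))


stamps = sorted(asyncio.run(main()))
gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
print([f"{gap:.2f}" for gap in gaps])
```

A middleware could call `await gate.wait()` at the top of `on_model_call` before forwarding to `next_handler`.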

### Dynamic system prompt middleware

The middleware below injects real-time context into the system prompt:

```python theme={null}
from datetime import datetime
from agentscope.agent import Agent
from agentscope.middleware import MiddlewareBase

class DynamicContextMiddleware(MiddlewareBase):
    def __init__(self, context_fn):
        self._context_fn = context_fn

    async def on_system_prompt(self, agent, current_prompt):
        context = self._context_fn()
        return f"{current_prompt}\n\n## Current Context\n{context}"

agent = Agent(
    ...,  # placeholder for the remaining Agent arguments
    middlewares=[
        DynamicContextMiddleware(
            lambda: f"Time: {datetime.now().isoformat()}"
        ),
    ],
)
```

### Model fallback middleware

The middleware below switches to a fallback model when the primary one fails:

```python theme={null}
from agentscope.middleware import MiddlewareBase

class ModelFallbackMiddleware(MiddlewareBase):
    def __init__(self, fallback_model):
        self._fallback = fallback_model

    async def on_model_call(self, agent, input_kwargs, next_handler):
        try:
            return await next_handler()
        except Exception as e:
            print(f"Primary model failed: {e}, switching to fallback")
            return await next_handler(
                current_model=self._fallback,
            )
```
