Overview

Agent middleware is the mechanism for injecting custom logic — logging, tracing, input rewriting, access control — into key points of the agent execution pipeline, without modifying the agent or model code. AgentScope exposes 6 hook positions plus a tool-provider hook, covering the full path from the outer reply process down to the raw model API call:
| Position | Type | Description |
| --- | --- | --- |
| on_reply | Onion | Wraps a complete reply, covering all ReAct rounds, tool executions, and the final output |
| on_reasoning | Onion | Wraps a single ReAct round’s reasoning step (input assembly → model call → stream decoding) |
| on_acting | Onion | Wraps a single tool call execution |
| on_model_call | Onion | Wraps the underlying ChatModel API call (the hook closest to the model) |
| on_compress_context | Onion | Wraps Agent.compress_context(); fires before each reasoning step, when the agent decides whether to compress its context |
| on_system_prompt | Transformer | Fires every time the system prompt is assembled; multiple middlewares chain in sequence, each transforming the previous one’s output |
| list_tools | Tool source | Optional. Returns a list[ToolBase] that the middleware contributes. Not invoked automatically: the caller assembling the agent’s toolkit decides whether to call it and how to merge the result. |
These hooks operate at the agent level. For per-tool onion hooks that fire on every invocation of a specific tool — regardless of whether it’s called inside or outside an agent — see Tool Middleware.
The three types differ as follows:
  • Onion — middleware wraps the next handler, allowing logic before/after next_handler() and observation of the intermediate event stream.
  • Transformer — middlewares form a pipeline; the previous one’s output feeds into the next one. There is no “inner layer” concept.
  • Tool source — not a hook on the runtime path. Agent.__init__ does not call list_tools(); you opt in explicitly by collecting the tools from your middlewares and passing them into the toolkit yourself.
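The opt-in step for tool-source middlewares can be sketched roughly as follows. The classes here (FakeTool, MemoryLikeMiddleware, LoggingOnlyMiddleware) and the collect_tools helper are hypothetical stand-ins so the snippet runs on its own; in a real project the middlewares would subclass MiddlewareBase, and the collected list would be registered in whatever toolkit you pass to Agent.

```python
import asyncio


class FakeTool:
    """Hypothetical stand-in for a ToolBase instance."""

    def __init__(self, name: str) -> None:
        self.name = name


class MemoryLikeMiddleware:
    """Hypothetical middleware that contributes two tools."""

    async def list_tools(self) -> list[FakeTool]:
        return [FakeTool("search_memory"), FakeTool("add_memory")]


class LoggingOnlyMiddleware:
    """Hypothetical middleware that contributes no tools at all."""


async def collect_tools(middlewares: list[object]) -> list[FakeTool]:
    """Gather tools from every middleware that offers list_tools()."""
    tools: list[FakeTool] = []
    for mw in middlewares:
        list_tools = getattr(mw, "list_tools", None)
        if list_tools is not None:
            tools.extend(await list_tools())
    return tools


middlewares = [MemoryLikeMiddleware(), LoggingOnlyMiddleware()]
tool_names = [t.name for t in asyncio.run(collect_tools(middlewares))]
print(tool_names)
```

Keeping this step in caller code makes it explicit which middleware-provided tools the agent can actually call.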
The diagram below shows how these hooks nest within the agent lifecycle. on_system_prompt is embedded inside on_reasoning because it fires when the reasoning step assembles the system prompt; on_compress_context sits at the top of each ReAct round, before reasoning:
on_reply
  └── ReAct loop (per round)
        ├── on_compress_context (context compression decision)
        ├── on_reasoning
        │     ├── on_system_prompt (system prompt assembly)
        │     └── on_model_call (model API call)
        └── on_acting (once per tool call)
on_acting currently wraps only tool execution inside the agent runtime; tools dispatched outside the agent via external execution are not tracked by on_acting.

Equip Middleware

AgentScope packages a set of hooks into a class — a single middleware class can implement any subset of the 6 hook positions (plus the optional list_tools tool-provider hook) at the same time. Pass instances to Agent(middlewares=[...]) to equip them:
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
At construction time the agent scans each middleware instance, checks which hooks it actually implements, and routes it into the matching position-specific execution lists. Unimplemented positions are skipped automatically with no call overhead.
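One way such construction-time routing could work is to compare each hook on the middleware's class with the base-class default. The sketch below is an assumption about the general technique, not AgentScope's actual implementation; BaseMiddleware, HOOK_NAMES, implemented_hooks, and route are hypothetical names introduced for illustration.

```python
class BaseMiddleware:
    """Hypothetical base class with do-nothing default hooks."""

    async def on_reply(self, agent, input_kwargs, next_handler): ...
    async def on_model_call(self, agent, input_kwargs, next_handler): ...
    async def on_system_prompt(self, agent, current_prompt): ...


HOOK_NAMES = ("on_reply", "on_model_call", "on_system_prompt")


def implemented_hooks(mw: BaseMiddleware) -> list[str]:
    """Return the hooks that the middleware's class actually overrides."""
    return [
        name
        for name in HOOK_NAMES
        if getattr(type(mw), name) is not getattr(BaseMiddleware, name)
    ]


def route(middlewares: list[BaseMiddleware]) -> dict[str, list[BaseMiddleware]]:
    """Build one execution list per hook position."""
    routed: dict[str, list[BaseMiddleware]] = {name: [] for name in HOOK_NAMES}
    for mw in middlewares:
        for name in implemented_hooks(mw):
            routed[name].append(mw)
    return routed


class Timing(BaseMiddleware):
    async def on_model_call(self, agent, input_kwargs, next_handler):
        return await next_handler(**input_kwargs)


class PromptTag(BaseMiddleware):
    async def on_system_prompt(self, agent, current_prompt):
        return current_prompt + " [tagged]"


routed = route([Timing(), PromptTag()])
summary = {name: [type(m).__name__ for m in mws] for name, mws in routed.items()}
print(summary)
```

Because the lists are built once, a position with an empty list can be skipped at runtime without touching any middleware.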

Built-in Middleware

TracingMiddleware

TracingMiddleware wires the full agent lifecycle to OpenTelemetry tracing. It instruments on_reply, on_model_call, and on_acting, producing hierarchical spans. Before using it, register a TracerProvider and an OTLP exporter in the process:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")),
)
trace.set_tracer_provider(provider)
Then attach TracingMiddleware to the agent:
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
Each reply produces a nested span tree. The key attributes captured at each level are:
From on_reply:
  • Agent name, session ID, reply ID
  • Input messages and the final output message
  • HITL pending tool calls
  • External execution pending tool calls
When no TracerProvider is configured, every hook short-circuits directly to next_handler() — no spans are created, no attributes are computed — making the overhead negligible.
When the agent receives an ExternalExecutionResultEvent (a tool executed outside the agent), TracingMiddleware synthesizes a compensating span for each external execution result, preserving full observability for tools run by external systems.

Add Additional Spans

To trace custom operations within the agent lifecycle, use the standard OpenTelemetry Python SDK directly. Obtain a tracer scoped to AgentScope and wrap any target code in a span:
from opentelemetry import trace
from agentscope import __version__

tracer = trace.get_tracer("agentscope", __version__)

with tracer.start_as_current_span(
    name="your_span_name",
    attributes={
        # Optional key-value pairs attached to the span,
        # e.g. function name, input arguments, or any custom metadata.
    },
    end_on_exit=True,
) as span:
    # your code here
    pass
These custom spans are emitted alongside AgentScope’s built-in spans and delivered to the same OTLP collector configured in the TracerProvider.

ReplyBudgetControlMiddleware

ReplyBudgetControlMiddleware enforces a weighted token budget per reply. It tracks cumulative token usage across all reasoning steps within a single reply and, once the budget is exhausted, instructs the agent to wrap up immediately without invoking any further tools. This is useful for capping the cost or latency of long, tool-heavy ReAct loops. The weighted cost is computed on every model call as:
cost = input_token_weight * input_tokens + output_token_weight * output_tokens
Once the accumulated cost reaches token_budget, the middleware:
  1. Appends a HintBlock to the last assistant message in the agent’s context (or creates a new AssistantMsg if needed), reminding the model to produce a final concluding response.
  2. Overrides tool_choice to ToolChoice(mode="none") for the next reasoning step, preventing any further tool calls.
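To make the accounting concrete, here is a minimal sketch of the weighted-cost bookkeeping described above. BudgetTracker is a hypothetical helper written for this illustration, not the middleware's internal class; only the formula and the "reaches token_budget" threshold come from the description.

```python
from dataclasses import dataclass


@dataclass
class BudgetTracker:
    """Accumulates weighted token cost for one reply (illustrative only)."""

    token_budget: float
    input_token_weight: float = 1.0
    output_token_weight: float = 1.0
    spent: float = 0.0

    def record(self, input_tokens: int, output_tokens: int) -> bool:
        """Add one model call's cost; return True once the budget is reached."""
        self.spent += (
            self.input_token_weight * input_tokens
            + self.output_token_weight * output_tokens
        )
        return self.spent >= self.token_budget


tracker = BudgetTracker(token_budget=10000, output_token_weight=2.0)
first = tracker.record(input_tokens=3000, output_tokens=500)    # cost 4000
second = tracker.record(input_tokens=4000, output_tokens=1000)  # cost 6000
print(first, second, tracker.spent)
```

With an output weight of 2.0, the second call brings the accumulated cost to exactly 10000, which is the point at which the wrap-up behaviour would be triggered.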
Attach it like any other middleware:
from agentscope.agent import Agent
from agentscope.middleware import ReplyBudgetControlMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[
        ReplyBudgetControlMiddleware(
            token_budget=10000,
            input_token_weight=1.0,
            output_token_weight=2.0,
        ),
    ],
)
The constructor accepts the following parameters:
  • token_budget (float, required): Maximum weighted token cost allowed per reply. Once the accumulated cost reaches this threshold, the agent is instructed to wrap up without calling any more tools.
  • input_token_weight (float, default: 1): Multiplier applied to input tokens when computing the weighted cost.
  • output_token_weight (float, default: 1): Multiplier applied to output tokens when computing the weighted cost. Set this higher than input_token_weight to reflect that output tokens are typically more expensive.
  • hint_message (str, optional): The message injected into the agent’s context when the budget is exceeded. Defaults to a built-in wrap-up prompt that asks the model to provide a final concluding response without invoking any tools.
The middleware is stateless on the instance itself — all runtime state lives in agent.state.middle_context, keyed by the middleware key and the current reply_id. This means the same middleware instance can safely be shared across multiple agents, and budget state persists across human-in-the-loop (HITL) interruptions and resumptions. State is automatically cleaned up when the reply ends.
The budget is scoped per reply, not per agent lifetime. Each new reply starts with a fresh counter, so the limit applies independently to every call to agent(...).

TTSMiddleware

TTSMiddleware intercepts the agent’s text output and synthesizes speech audio, injecting DataBlockStartEvent / DataBlockDeltaEvent / DataBlockEndEvent into the event stream alongside the text. It hooks into on_reply to observe every TextBlockDeltaEvent and TextBlockEndEvent.
from agentscope.agent import Agent
from agentscope.middleware import TTSMiddleware
from agentscope.tts import DashScopeTTSModel
from agentscope.credential import DashScopeCredential

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[
        TTSMiddleware(
            DashScopeTTSModel(
                credential=DashScopeCredential(api_key="..."),
                model="qwen3-tts-flash",
                parameters=DashScopeTTSModel.Parameters(voice="Cherry"),
                stream=True,
            ),
        ),
    ],
)
The middleware automatically adapts to the TTS model’s mode:
| TTS Mode | Behavior |
| --- | --- |
| Non-realtime (realtime=False) | Accumulates text until TextBlockEndEvent, then calls synthesize(text) and emits the full audio as one data block |
| Realtime (realtime=True) | Pushes each TextBlockDeltaEvent.delta via push(), emitting audio chunks as they arrive; calls synthesize() on TextBlockEndEvent to flush remaining audio |
The output event stream differs by mode.
Non-realtime (audio follows after the text block completes):
TextBlockStartEvent
TextBlockDeltaEvent (text)
TextBlockDeltaEvent (text)
TextBlockEndEvent
DataBlockStartEvent           ← audio synthesized after text ends
DataBlockDeltaEvent (audio)
DataBlockDeltaEvent (audio)
DataBlockEndEvent
Realtime — audio chunks arrive interleaved with text as synthesis runs concurrently:
TextBlockStartEvent
TextBlockDeltaEvent (text)
DataBlockStartEvent           ← audio begins during text stream
DataBlockDeltaEvent (audio)
TextBlockDeltaEvent (text)
DataBlockDeltaEvent (audio)
TextBlockEndEvent
DataBlockDeltaEvent (audio)   ← remaining audio from synthesize()
DataBlockEndEvent
Each DataBlockDeltaEvent.data carries an incremental base64-encoded audio chunk; the full audio is the concatenation of every delta’s decoded bytes, keyed by block_id.
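On the consuming side, reassembling the audio could look like the sketch below. FakeDataDelta is a hypothetical stand-in carrying only the two fields mentioned above (block_id and data), so the snippet runs without AgentScope; a real consumer would read the same fields from DataBlockDeltaEvent. Each chunk is decoded on its own before concatenation, since joining the base64 strings first would break on padding.

```python
import base64
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeDataDelta:
    """Hypothetical stand-in for DataBlockDeltaEvent (block_id + base64 data)."""

    block_id: str
    data: str


def assemble_audio(events: list[FakeDataDelta]) -> dict[str, bytes]:
    """Decode each delta and append it to the buffer for its block_id."""
    buffers: dict[str, bytearray] = defaultdict(bytearray)
    for event in events:
        buffers[event.block_id] += base64.b64decode(event.data)
    return {block_id: bytes(buf) for block_id, buf in buffers.items()}


chunks = [b"\x00\x01", b"\x02\x03", b"\x04"]
events = [FakeDataDelta("audio-1", base64.b64encode(c).decode()) for c in chunks]
audio = assemble_audio(events)
print(audio["audio-1"])
```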

Long-Term Memory

AgentScope implements long-term memory as middleware, so an agent can persist and recall durable facts across sessions. A memory backend hooks on_reply (pre-reply search + post-reply write-back), on_system_prompt (advertise memory tools), and contributes agent-callable memory tools (such as search_memory / add_memory) via list_tools. The currently available backend is Mem0Middleware, powered by mem0. See Long-Term Memory for installation, control modes, and construction paths.

Custom Middleware

Subclass MiddlewareBase and implement only the hooks you need; leave the rest alone. The example below implements every position except on_acting in a single middleware, plus the optional list_tools hook. Each onion hook receives an input_kwargs dict carrying the fields that flow into the wrapped layer; forward it with next_handler(**input_kwargs), or pass keyword arguments to override specific fields:
from typing import AsyncGenerator, Awaitable, Callable

from agentscope.agent import Agent
from agentscope.event import AgentEvent
from agentscope.message import Msg
from agentscope.middleware import MiddlewareBase
from agentscope.model import ChatResponse
from agentscope.tool import ToolBase


class FullObservabilityMiddleware(MiddlewareBase):
    """Observe most middleware positions at once, plus contribute a tool."""

    async def on_reply(
        self,
        agent: Agent,
        # {"inputs": Msg | list[Msg] | UserConfirmResultEvent | ExternalExecutionResultEvent | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent | Msg, None]],
    ) -> AsyncGenerator[AgentEvent | Msg, None]:
        print(f"[reply] start for {agent.name}")
        async for item in next_handler(**input_kwargs):
            yield item
        print(f"[reply] end for {agent.name}")

    async def on_reasoning(
        self,
        agent: Agent,
        # {"tool_choice": ToolChoice | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent, None]],
    ) -> AsyncGenerator[AgentEvent, None]:
        print("[reasoning] start")
        async for event in next_handler(**input_kwargs):
            yield event
        print("[reasoning] end")

    async def on_model_call(
        self,
        agent: Agent,
        # {"messages": list[Msg], "tools": list[dict], "tool_choice": ToolChoice | None, "current_model": ChatModelBase}
        input_kwargs: dict,
        next_handler: Callable[
            ..., Awaitable[ChatResponse | AsyncGenerator[ChatResponse, None]]
        ],
    ) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
        print(f"[model_call] {input_kwargs['current_model'].model}")
        result = await next_handler(**input_kwargs)
        print("[model_call] done")
        return result

    async def on_compress_context(
        self,
        agent: Agent,
        # {"context_config": ContextConfig | None}
        input_kwargs: dict,
        next_handler: Callable[..., Awaitable[None]],
    ) -> None:
        print(f"[compress_context] checking context for {agent.name}")
        await next_handler(**input_kwargs)
        print("[compress_context] done")

    async def on_system_prompt(
        self,
        agent: Agent,
        current_prompt: str,
    ) -> str:
        print(f"[system_prompt] length={len(current_prompt)}")
        return current_prompt

    async def list_tools(self) -> list[ToolBase]:
        # Optional hook. Not invoked automatically by ``Agent.__init__``;
        # if you want these tools available to the agent, collect them
        # from your middlewares yourself and pass them into the toolkit.
        return []

Execution Order

For onion hooks (on_reply, on_reasoning, on_acting, on_model_call, on_compress_context), the first middleware in the list is the outermost layer:
middlewares = [mw1, mw2]
# Call order:
# mw1 pre → mw2 pre → inner logic → mw2 post → mw1 post
For streaming / event-yielding hooks, the inner middleware sees each yielded event first:
mw1_pre → mw2_pre → mw2_event → mw1_event → ... → mw2_post → mw1_post
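The ordering above can be reproduced with plain async generators. This is a toy model under simplifying assumptions: the hooks take only next_handler (real hooks also receive agent and input_kwargs), and make_middleware and compose are hypothetical helpers, not AgentScope functions.

```python
import asyncio
from typing import AsyncGenerator, Callable

log: list[str] = []


async def inner() -> AsyncGenerator[str, None]:
    """Innermost handler: the wrapped logic, yielding a single event."""
    log.append("inner")
    yield "event"


def make_middleware(name: str) -> Callable:
    """Build a toy onion hook that logs before, per event, and after."""

    async def hook(next_handler) -> AsyncGenerator[str, None]:
        log.append(f"{name}_pre")
        async for event in next_handler():
            log.append(f"{name}_event")
            yield event
        log.append(f"{name}_post")

    return hook


def compose(hooks: list[Callable], core: Callable) -> Callable:
    """Wrap core so that hooks[0] ends up as the outermost layer."""
    handler = core
    for hook in reversed(hooks):
        # Bind the current values as defaults to avoid late-binding closures.
        handler = lambda hook=hook, nxt=handler: hook(nxt)
    return handler


async def main() -> None:
    pipeline = compose([make_middleware("mw1"), make_middleware("mw2")], inner)
    async for _ in pipeline():
        pass


asyncio.run(main())
print(log)
```

The inner layer logs its event first because each event has to pass through mw2's loop body before mw2 yields it outward to mw1.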
Transformer hooks (on_system_prompt) — middlewares chain left to right:
middlewares = [mw1, mw2]
# original_prompt → mw1.on_system_prompt() → mw2.on_system_prompt() → final
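A transformer chain is simpler than an onion: it is a left-to-right fold over the middleware list. The sketch below assumes the on_system_prompt(agent, current_prompt) signature used elsewhere on this page; AddDate, AddPolicy, and build_prompt are hypothetical names, and None is passed in place of a real agent.

```python
import asyncio


class AddDate:
    """Hypothetical transformer that appends a fixed date line."""

    async def on_system_prompt(self, agent, current_prompt: str) -> str:
        return current_prompt + "\nDate: 2024-01-01"


class AddPolicy:
    """Hypothetical transformer that appends a policy line."""

    async def on_system_prompt(self, agent, current_prompt: str) -> str:
        return current_prompt + "\nPolicy: be concise"


async def build_prompt(agent, base_prompt: str, middlewares: list) -> str:
    """Feed each middleware's output into the next one, left to right."""
    prompt = base_prompt
    for mw in middlewares:
        prompt = await mw.on_system_prompt(agent, prompt)
    return prompt


final_prompt = asyncio.run(
    build_prompt(None, "You are a helpful assistant.", [AddDate(), AddPolicy()])
)
print(final_prompt)
```

Swapping the two middlewares swaps the order of the appended lines, which is why list order matters for transformer hooks as well.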
The overall execution order of all hooks within a single reply follows the agent lifecycle:
on_reply
  └── per ReAct round:
        ├── on_compress_context → compress_context()
        │     └── on_system_prompt (token counting before compression)
        ├── on_reasoning
        │     ├── _prepare_model_input() → on_system_prompt
        │     └── on_model_call
        └── on_acting (once per tool call in this round)
list_tools is not part of the per-reply execution path and is not invoked automatically by the agent — it is a convenience interface so a middleware can advertise its own tools. The caller assembling the toolkit decides whether to collect them.

Practical Examples

Timing middleware

The middleware below records the elapsed time of every model call:
import time
from agentscope.middleware import MiddlewareBase

class TimingMiddleware(MiddlewareBase):
    async def on_model_call(self, agent, input_kwargs, next_handler):
        model_name = input_kwargs["current_model"].model
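        # perf_counter() is monotonic, which makes it safer than time.time()
        # for measuring durations.
        start = time.perf_counter()

        # For a streaming response this returns the stream object, so the
        # measurement covers time-to-stream rather than the full generation.
        result = await next_handler()

        elapsed = time.perf_counter() - start
        print(f"[timing] {agent.name} ({model_name}): {elapsed:.2f}s")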
        return result

Rate-limiting middleware

The middleware below enforces a minimum interval between two model calls:
import asyncio
import time
from agentscope.middleware import MiddlewareBase

class RateLimitMiddleware(MiddlewareBase):
    def __init__(self, min_interval: float = 1.0):
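        # -inf means "never called yet", so the first call is not delayed.
        self._last_call = float("-inf")
        self._min_interval = min_interval

    async def on_model_call(self, agent, input_kwargs, next_handler):
        # time.monotonic() is unaffected by wall-clock adjustments.
        now = time.monotonic()
        wait = self._min_interval - (now - self._last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        self._last_call = time.monotonic()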
        return await next_handler()

Dynamic system prompt middleware

The middleware below injects real-time context into the system prompt:
from datetime import datetime
from agentscope.agent import Agent
from agentscope.middleware import MiddlewareBase

class DynamicContextMiddleware(MiddlewareBase):
    def __init__(self, context_fn):
        self._context_fn = context_fn

    async def on_system_prompt(self, agent, current_prompt):
        context = self._context_fn()
        return f"{current_prompt}\n\n## Current Context\n{context}"

agent = Agent(
    # ... (other Agent arguments elided)
    middlewares=[
        DynamicContextMiddleware(
            lambda: f"Time: {datetime.now().isoformat()}"
        ),
    ],
)

Model fallback middleware

The middleware below switches to a fallback model when the primary one fails:
from agentscope.middleware import MiddlewareBase

class ModelFallbackMiddleware(MiddlewareBase):
    def __init__(self, fallback_model):
        self._fallback = fallback_model

    async def on_model_call(self, agent, input_kwargs, next_handler):
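        try:
            return await next_handler()
        except Exception as e:
            # Only errors raised while awaiting next_handler() are caught here;
            # a failure in the middle of a streamed response is not retried.
            print(f"Primary model failed: {e}; switching to fallback")
            # Call the inner layers again, overriding only current_model.
            return await next_handler(
                current_model=self._fallback,
            )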