Overview

Agent middleware is the mechanism for injecting custom logic — logging, tracing, input rewriting, access control — into key points of the agent execution pipeline, without modifying the agent or model code. AgentScope exposes 6 hook positions plus a tool-provider hook, covering the full path from the outer reply process down to the raw model API call:
| Position | Type | Description |
| --- | --- | --- |
| on_reply | Onion | Wraps a complete reply, covering all ReAct rounds, tool executions, and the final output |
| on_reasoning | Onion | Wraps a single ReAct round’s reasoning step (input assembly → model call → stream decoding) |
| on_acting | Onion | Wraps a single tool call execution |
| on_model_call | Onion | Wraps the underlying ChatModel API call; the hook closest to the model |
| on_compress_context | Onion | Wraps Agent.compress_context(); fires before each reasoning step when the agent decides whether to compress its context |
| on_system_prompt | Transformer | Fires every time the system prompt is assembled; multiple middlewares chain in sequence, each transforming the previous one’s output |
| list_tools | Tool source | Optional. Returns a list[ToolBase] that the middleware contributes. Not invoked automatically; the caller assembling the agent’s toolkit decides whether to call it and how to merge the result. |
The three types differ as follows:
  • Onion — middleware wraps the next handler, allowing logic before/after next_handler() and observation of the intermediate event stream.
  • Transformer — middlewares form a pipeline; the previous one’s output feeds into the next one. There is no “inner layer” concept.
  • Tool source — not a hook on the runtime path. Agent.__init__ does not call list_tools(); you opt in explicitly by collecting the tools from your middlewares and passing them into the toolkit yourself.
The diagram below shows how these hooks nest within the agent lifecycle. on_system_prompt is embedded inside on_reasoning because it fires when the reasoning step assembles the system prompt; on_compress_context sits at the top of each ReAct round, before reasoning:
on_reply
  └── ReAct loop (per round)
        ├── on_compress_context (context compression decision)
        ├── on_reasoning
        │     ├── on_system_prompt (system prompt assembly)
        │     └── on_model_call (model API call)
        └── on_acting (once per tool call)
on_acting currently wraps only tool execution inside the agent runtime; tools dispatched outside the agent via external execution are not tracked by on_acting.

Equip Middleware

AgentScope packages a set of hooks into a class — a single middleware class can implement any subset of the 6 hook positions (plus the optional list_tools tool-provider hook) at the same time. Pass instances to Agent(middlewares=[...]) to equip them:
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
At construction time the agent scans each middleware instance, checks which hooks it actually implements, and routes it into the matching position-specific execution lists. Unimplemented positions are skipped automatically with no call overhead.
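The scan described above can be pictured with a small sketch. It uses a stand-in `MiddlewareBase` with only three hooks and a hypothetical `route_middlewares` helper; neither is the real AgentScope code, and the actual detection logic may differ. The idea shown is simply that a hook counts as implemented when the subclass overrides the base method.

```python
class MiddlewareBase:
    """Stand-in base class for illustration; NOT agentscope.middleware.MiddlewareBase."""

    async def on_reply(self, agent, input_kwargs, next_handler): ...
    async def on_model_call(self, agent, input_kwargs, next_handler): ...
    async def on_system_prompt(self, agent, current_prompt): ...


HOOK_NAMES = ("on_reply", "on_model_call", "on_system_prompt")


def route_middlewares(middlewares):
    """Group middleware instances by the hooks their class overrides (hypothetical helper)."""
    routed = {name: [] for name in HOOK_NAMES}
    for mw in middlewares:
        for name in HOOK_NAMES:
            # A hook is considered implemented only if the subclass overrides it.
            if getattr(type(mw), name) is not getattr(MiddlewareBase, name):
                routed[name].append(mw)
    return routed


class PromptOnly(MiddlewareBase):
    async def on_system_prompt(self, agent, current_prompt):
        return current_prompt + "\nBe concise."


class ModelOnly(MiddlewareBase):
    async def on_model_call(self, agent, input_kwargs, next_handler):
        return await next_handler(**input_kwargs)


routed = route_middlewares([PromptOnly(), ModelOnly()])
summary = {name: [type(mw).__name__ for mw in mws] for name, mws in routed.items()}
print(summary)
```

Because `on_reply` is overridden by neither class, its list stays empty and nothing would be called at that position.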

Built-in Middleware

TracingMiddleware

TracingMiddleware wires the full agent lifecycle to OpenTelemetry tracing. It instruments on_reply, on_model_call, and on_acting, producing hierarchical spans. Before using it, register a TracerProvider and an OTLP exporter in the process:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")),
)
trace.set_tracer_provider(provider)
Then attach TracingMiddleware to the agent:
from agentscope.agent import Agent
from agentscope.middleware import TracingMiddleware

agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    model=model,
    toolkit=toolkit,
    middlewares=[TracingMiddleware()],
)
Each reply produces a nested span tree. The key attributes captured at each level are:
From on_reply:
  • Agent name, session ID, reply ID
  • Input messages and the final output message
  • HITL pending tool calls
  • External execution pending tool calls
When no TracerProvider is configured, every hook short-circuits directly to next_handler() — no spans are created, no attributes are computed — making the overhead negligible.
When the agent receives an ExternalExecutionResultEvent (a tool executed outside the agent), TracingMiddleware synthesizes a compensating span for each external execution result, preserving full observability for tools run by external systems.

Add Additional Spans

To trace custom operations within the agent lifecycle, use the standard OpenTelemetry Python SDK directly. Obtain a tracer scoped to AgentScope and wrap any target code in a span:
from opentelemetry import trace
from agentscope import __version__

tracer = trace.get_tracer("agentscope", __version__)

with tracer.start_as_current_span(
    name="your_span_name",
    attributes={
        # Optional key-value pairs attached to the span,
        # e.g. function name, input arguments, or any custom metadata.
    },
    end_on_exit=True,
) as span:
    pass  # replace with the code you want to trace
These custom spans are emitted alongside AgentScope’s built-in spans and delivered to the same OTLP collector configured in the TracerProvider.

Custom Middleware

Subclass MiddlewareBase and implement only the hooks you need; leave the rest alone. The example below implements every position except on_acting in a single middleware (on_acting is an onion hook and follows the same pattern), plus the optional list_tools hook. Each onion hook receives an input_kwargs dict carrying the fields that flow into the wrapped layer; forward it with next_handler(**input_kwargs), or pass keyword arguments to override specific fields:
from typing import AsyncGenerator, Awaitable, Callable

from agentscope.agent import Agent
from agentscope.event import AgentEvent
from agentscope.message import Msg
from agentscope.middleware import MiddlewareBase
from agentscope.model import ChatResponse
from agentscope.tool import ToolBase


class FullObservabilityMiddleware(MiddlewareBase):
    """Observe every middleware position at once, plus contribute a tool."""

    async def on_reply(
        self,
        agent: Agent,
        # {"inputs": Msg | list[Msg] | UserConfirmResultEvent | ExternalExecutionResultEvent | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent | Msg, None]],
    ) -> AsyncGenerator[AgentEvent | Msg, None]:
        print(f"[reply] start for {agent.name}")
        async for item in next_handler(**input_kwargs):
            yield item
        print(f"[reply] end for {agent.name}")

    async def on_reasoning(
        self,
        agent: Agent,
        # {"tool_choice": ToolChoice | None}
        input_kwargs: dict,
        next_handler: Callable[..., AsyncGenerator[AgentEvent, None]],
    ) -> AsyncGenerator[AgentEvent, None]:
        print("[reasoning] start")
        async for event in next_handler(**input_kwargs):
            yield event
        print("[reasoning] end")

    async def on_model_call(
        self,
        agent: Agent,
        # {"messages": list[Msg], "tools": list[dict], "tool_choice": ToolChoice | None, "current_model": ChatModelBase}
        input_kwargs: dict,
        next_handler: Callable[
            ..., Awaitable[ChatResponse | AsyncGenerator[ChatResponse, None]]
        ],
    ) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
        print(f"[model_call] {input_kwargs['current_model'].model}")
        result = await next_handler(**input_kwargs)
        print("[model_call] done")
        return result

    async def on_compress_context(
        self,
        agent: Agent,
        # {"context_config": ContextConfig | None}
        input_kwargs: dict,
        next_handler: Callable[..., Awaitable[None]],
    ) -> None:
        print(f"[compress_context] checking context for {agent.name}")
        await next_handler(**input_kwargs)
        print("[compress_context] done")

    async def on_system_prompt(
        self,
        agent: Agent,
        current_prompt: str,
    ) -> str:
        print(f"[system_prompt] length={len(current_prompt)}")
        return current_prompt

    async def list_tools(self) -> list[ToolBase]:
        # Optional hook. Not invoked automatically by ``Agent.__init__``;
        # if you want these tools available to the agent, collect them
        # from your middlewares yourself and pass them into the toolkit.
        return []

Execution Order

Onion hooks (on_reply, on_reasoning, on_acting, on_model_call, on_compress_context): the first middleware in the list is the outermost layer:
middlewares = [mw1, mw2]
# Call order:
# mw1 pre → mw2 pre → inner logic → mw2 post → mw1 post
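The call order above can be reproduced with a minimal, framework-free sketch. `Recorder`, `build_chain`, and `_call` are hypothetical names introduced for illustration only; the sketch assumes nothing about how AgentScope builds its chain internally, and only mirrors the hook signature used in this document.

```python
import asyncio
from functools import partial


class Recorder:
    """Hypothetical middleware that logs before and after the wrapped call."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    async def on_model_call(self, agent, input_kwargs, next_handler):
        self.log.append(f"{self.name} pre")
        result = await next_handler(**input_kwargs)
        self.log.append(f"{self.name} post")
        return result


async def _call(mw, next_handler, **kwargs):
    # Adapt keyword arguments into the (agent, input_kwargs, next_handler) hook shape.
    return await mw.on_model_call(None, kwargs, next_handler)


def build_chain(middlewares, inner):
    """Wrap `inner` so that the first middleware in the list ends up outermost."""
    handler = inner
    for mw in reversed(middlewares):
        handler = partial(_call, mw, handler)
    return handler


async def main():
    log = []

    async def inner(**kwargs):
        log.append("inner logic")
        return "response"

    chain = build_chain([Recorder("mw1", log), Recorder("mw2", log)], inner)
    await chain(messages=[])
    return log


log = asyncio.run(main())
print(" → ".join(log))
# mw1 pre → mw2 pre → inner logic → mw2 post → mw1 post
```

Wrapping from the end of the list backwards is what makes the first entry the outermost layer.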
For streaming / event-yielding hooks, the inner middleware sees each yielded event first:
mw1_pre → mw2_pre → mw2_event → mw1_event → ... → mw2_post → mw1_post
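The same ordering for event-yielding hooks can be checked with async generators. This is a sketch with hypothetical names (`StreamRecorder`, `wrap`), not AgentScope code; it only shows why the inner middleware observes each event before the outer one.

```python
import asyncio


class StreamRecorder:
    """Hypothetical middleware that logs around a stream and for each event."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    async def on_reasoning(self, agent, input_kwargs, next_handler):
        self.log.append(f"{self.name}_pre")
        async for event in next_handler(**input_kwargs):
            self.log.append(f"{self.name}_event")
            yield event
        self.log.append(f"{self.name}_post")


def wrap(mw, next_handler):
    # Bind one middleware around the handler beneath it.
    def handler(**kwargs):
        return mw.on_reasoning(None, kwargs, next_handler)

    return handler


async def main():
    log = []

    async def inner(**kwargs):
        yield "event-1"
        yield "event-2"

    handler = inner
    for mw in reversed([StreamRecorder("mw1", log), StreamRecorder("mw2", log)]):
        handler = wrap(mw, handler)

    events = [event async for event in handler()]
    return log, events


log, events = asyncio.run(main())
print(" → ".join(log))
```

Each event travels outward through mw2 and then mw1, while the post steps run only once the inner stream is exhausted.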
Transformer hooks (on_system_prompt) — middlewares chain left to right:
middlewares = [mw1, mw2]
# original_prompt → mw1.on_system_prompt() → mw2.on_system_prompt() → final
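A transformer chain is just a left-to-right fold over the middleware list. The sketch below uses a hypothetical `AppendRule` middleware and a hypothetical `assemble_system_prompt` helper to illustrate this; how AgentScope performs the assembly internally is not shown in this document.

```python
import asyncio


class AppendRule:
    """Hypothetical transformer middleware that appends one rule to the prompt."""

    def __init__(self, rule):
        self.rule = rule

    async def on_system_prompt(self, agent, current_prompt):
        return f"{current_prompt}\n- {self.rule}"


async def assemble_system_prompt(agent, base_prompt, middlewares):
    """Feed each middleware's output into the next one, in list order."""
    prompt = base_prompt
    for mw in middlewares:
        prompt = await mw.on_system_prompt(agent, prompt)
    return prompt


final_prompt = asyncio.run(
    assemble_system_prompt(
        None,
        "You are a helpful assistant.",
        [AppendRule("Answer in English."), AppendRule("Cite sources.")],
    )
)
print(final_prompt)
```

Since there is no inner layer, a transformer cannot run logic after later middlewares; it only sees what earlier ones produced.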
The overall execution order of all hooks within a single reply follows the agent lifecycle:
on_reply
  └── per ReAct round:
        ├── on_compress_context → compress_context()
        │     └── on_system_prompt (token counting before compression)
        ├── on_reasoning
        │     ├── _prepare_model_input() → on_system_prompt
        │     └── on_model_call
        └── on_acting (once per tool call in this round)
list_tools is not part of the per-reply execution path and is not invoked automatically by the agent — it is a convenience interface so a middleware can advertise its own tools. The caller assembling the toolkit decides whether to collect them.
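Collecting the contributed tools yourself could look like the sketch below. The middleware classes and the `collect_tools` helper are hypothetical, and plain strings stand in for ToolBase instances; how the collected tools are then registered with the toolkit depends on the toolkit API and is deliberately left out.

```python
import asyncio


class NoTools:
    """Hypothetical middleware that contributes nothing."""

    async def list_tools(self):
        return []


class ClockTools:
    """Hypothetical middleware that contributes one tool."""

    async def list_tools(self):
        # A string placeholder standing in for a ToolBase instance.
        return ["get_time_tool"]


async def collect_tools(middlewares):
    """Call list_tools() on every middleware and flatten the results."""
    tools = []
    for mw in middlewares:
        tools.extend(await mw.list_tools())
    return tools


tools = asyncio.run(collect_tools([NoTools(), ClockTools()]))
print(tools)
```

The resulting list would be merged into the toolkit before the agent is constructed, since the agent does not do this on its own.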

Practical Examples

Timing middleware

The middleware below records the elapsed time of every model call:
import time
from agentscope.middleware import MiddlewareBase

class TimingMiddleware(MiddlewareBase):
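    async def on_model_call(self, agent, input_kwargs, next_handler):
        model_name = input_kwargs["current_model"].model
        # perf_counter is monotonic, so it is safe for measuring durations.
        start = time.perf_counter()

        # Forward the original arguments unchanged.
        result = await next_handler(**input_kwargs)

        # For streaming responses this covers only the time until the
        # generator is returned, not the time spent consuming it.
        elapsed = time.perf_counter() - start
        print(f"[timing] {agent.name}/{model_name}: {elapsed:.2f}s")
        return result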

Rate-limiting middleware

The middleware below enforces a minimum interval between two model calls:
import asyncio
import time
from agentscope.middleware import MiddlewareBase

class RateLimitMiddleware(MiddlewareBase):
    def __init__(self, min_interval: float = 1.0):
        # -inf guarantees the first call never waits.
        self._last_call = float("-inf")
        self._min_interval = min_interval

    async def on_model_call(self, agent, input_kwargs, next_handler):
        # The monotonic clock is unaffected by system clock adjustments.
        now = time.monotonic()
        wait = self._min_interval - (now - self._last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        self._last_call = time.monotonic()
        return await next_handler(**input_kwargs)

Dynamic system prompt middleware

The middleware below injects real-time context into the system prompt:
from datetime import datetime
from agentscope.agent import Agent
from agentscope.middleware import MiddlewareBase

class DynamicContextMiddleware(MiddlewareBase):
    def __init__(self, context_fn):
        self._context_fn = context_fn

    async def on_system_prompt(self, agent, current_prompt):
        context = self._context_fn()
        return f"{current_prompt}\n\n## Current Context\n{context}"

agent = Agent(
    # ... (remaining Agent arguments)
    middlewares=[
        DynamicContextMiddleware(
            lambda: f"Time: {datetime.now().isoformat()}"
        ),
    ],
)

Model fallback middleware

The middleware below switches to a fallback model when the primary one fails:
from agentscope.middleware import MiddlewareBase

class ModelFallbackMiddleware(MiddlewareBase):
    def __init__(self, fallback_model):
        self._fallback = fallback_model

    async def on_model_call(self, agent, input_kwargs, next_handler):
        try:
            return await next_handler(**input_kwargs)
        except Exception as e:
            print(f"Primary model failed: {e}, switching to fallback")
            # Retry with the same arguments, overriding only the model.
            return await next_handler(
                **{**input_kwargs, "current_model": self._fallback},
            )
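Because hooks are plain async methods, a middleware like this can be exercised in isolation with a stub in place of next_handler. The sketch below re-implements the fallback logic without the AgentScope base class so that it runs on its own; `ModelFallbackSketch`, `stub_next_handler`, and the `SimpleNamespace` models are all hypothetical stand-ins, and the stub assumes the handler receives the full set of keyword arguments.

```python
import asyncio
from types import SimpleNamespace


class ModelFallbackSketch:
    """Same fallback logic as above, minus the AgentScope base class."""

    def __init__(self, fallback_model):
        self._fallback = fallback_model

    async def on_model_call(self, agent, input_kwargs, next_handler):
        try:
            return await next_handler(**input_kwargs)
        except Exception:
            # Retry with identical arguments except for the model.
            retry_kwargs = {**input_kwargs, "current_model": self._fallback}
            return await next_handler(**retry_kwargs)


async def stub_next_handler(**kwargs):
    """Stub for the wrapped layer: fails for the primary model only."""
    model = kwargs["current_model"]
    if model.model == "primary":
        raise RuntimeError("primary model unavailable")
    return f"answered by {model.model}"


primary = SimpleNamespace(model="primary")
fallback = SimpleNamespace(model="fallback")
middleware = ModelFallbackSketch(fallback)

result = asyncio.run(
    middleware.on_model_call(
        None,
        {"messages": [], "current_model": primary},
        stub_next_handler,
    )
)
print(result)
```

Note that a try/except around the awaited call only catches errors raised before a result is returned; failures that occur while a streamed response is being consumed would need separate handling.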