In the master-worker pattern, a central agent (master) coordinates and delegates tasks to subordinate agents (workers). The master agent controls the execution flow and decides which worker should handle each task.

Key characteristics:
Centralized control and decision-making
Explicit task delegation and routing
Workers typically don’t communicate directly with each other
Clear hierarchical structure
Use cases:
Task decomposition and parallel execution
Specialized agents for different domains (e.g., code generation, web search, data analysis)
Dynamic agent creation based on task requirements
Complex workflows requiring orchestration logic
Implementation approaches:
Explicit routing: Use structured output or tool calls to route tasks to different agents
Agent-as-tool: Wrap sub-agents as tool functions that the master agent can invoke
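The explicit-routing approach can be illustrated with a minimal pure-Python sketch. `RoutingDecision`, `route`, and the worker callables below are hypothetical stand-ins, not AgentScope APIs; in a real application the routing decision would come from the master model's structured output or a tool call.

```python
from dataclasses import dataclass

@dataclass
class RoutingDecision:
    """Hypothetical structured output produced by the master agent."""
    target: str  # which worker should handle the task
    task: str    # the task forwarded to that worker

def route(decision: RoutingDecision, workers: dict) -> str:
    # Explicit routing: the master inspects the structured decision and
    # dispatches to the named worker; workers never talk to each other.
    handler = workers.get(decision.target)
    if handler is None:
        raise KeyError(f"No worker registered for {decision.target!r}")
    return handler(decision.task)

# Toy workers standing in for specialized agents
workers = {
    "coder": lambda task: f"[coder] wrote code for: {task}",
    "searcher": lambda task: f"[searcher] searched: {task}",
}

print(route(RoutingDecision("coder", "sort a list"), workers))
```

The same dispatch shape applies when the targets are real sub-agents: only the handler bodies change.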
In the conversation pattern, agents communicate by broadcasting and passing Msg objects among themselves. There’s no central controller — agents interact peer-to-peer, and the execution flow emerges from their interactions.

Key characteristics:
Decentralized communication
Message-driven coordination
Agents observe and respond to each other’s messages
Flexible, dynamic interaction patterns
Use cases:
Multi-agent discussions and debates
Collaborative problem-solving
Simulations with multiple autonomous entities
Scenarios requiring peer-to-peer communication
Core tools:
MsgHub: Automatically broadcasts messages among a group of agents
A single application can use both paradigms. For example, a master agent might orchestrate multiple conversation groups, or agents within a conversation might delegate specialized tasks to worker agents.
Wrap entire agents as tool functions to enable dynamic agent creation and delegation. The master agent invokes these tools to create and execute worker agents.
```python
from agentscope.tool import Toolkit, ToolResponse, execute_python_code

async def create_worker(task_description: str) -> ToolResponse:
    """Create a worker agent to finish the given task.

    Args:
        task_description: The description of the task to be done by the
            worker, should contain all the necessary information.

    Returns:
        ToolResponse containing the worker's result.
    """
    # Create a toolkit for the worker
    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)

    # Create a worker agent with specific capabilities
    worker = ReActAgent(
        name="Worker",
        sys_prompt="You're a worker agent. Finish the given task using your tools.",
        model=DashScopeChatModel(...),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
    )

    # Execute the task
    res = await worker(Msg("user", task_description, "user"))
    return ToolResponse(content=res.get_content_blocks("text"))

# The master agent uses create_worker as a tool
toolkit = Toolkit()
toolkit.register_tool_function(create_worker)

master = ReActAgent(
    name="Master",
    sys_prompt="Decompose tasks and create workers to finish them.",
    toolkit=toolkit,
    ...
)

await master(Msg("user", "Execute hello world in Python", "user"))
```
Key benefits:
Dynamic worker creation based on task requirements
```
Alice: Hello, I'm Alice, a 50-year-old teacher with a passion for education.
Bob: Hello, I'm Bob, a 35-year-old engineer who enjoys solving complex problems.
Charlie: Hi, I'm Charlie, a 28-year-old designer with a keen eye for aesthetics.
```
Dynamic participant management:
```python
async with MsgHub(participants=[alice]) as hub:
    hub.add(bob)         # Add new participant
    hub.delete(alice)    # Remove participant
    await hub.broadcast(  # Manually broadcast a message
        Msg("system", "Topic changed!", "system"),
    )
```
```python
from agentscope.pipeline import MsgHub

async with MsgHub(
    participants=[alice, bob, charlie],
    announcement=Msg("system", "Let's begin.", "system"),
    enable_auto_broadcast=True,
    name="meeting-room",
) as hub:
    await alice()  # Bob and Charlie auto-receive Alice's reply
    await bob()    # Alice and Charlie auto-receive Bob's reply
```
| Parameter | Default | Description |
| --- | --- | --- |
| `participants` | — | List of agents to include |
| `announcement` | `None` | Message(s) to broadcast on enter |
| `enable_auto_broadcast` | `True` | Auto-broadcast replies to all participants |
| `name` | random UUID | Name identifier for this hub |
Methods:
| Method | Description |
| --- | --- |
| `hub.add(agent)` | Add one or more agents as participants |
| `hub.delete(agent)` | Remove one or more agents |
| `hub.broadcast(msg)` | Manually broadcast a message to all participants |
| `hub.set_auto_broadcast(bool)` | Enable/disable auto-broadcast |
How it works: When entering the context, MsgHub registers each participant as a subscriber of all other participants. When any participant generates a reply via __call__, the reply message is automatically sent to all other participants via their observe() method. On exit, all subscriptions are cleaned up.
Newly added participants (via hub.add()) will not receive previous messages — only future ones.
When enable_auto_broadcast=False, MsgHub only broadcasts via the announcement parameter and the broadcast() method. This is useful when you want fine-grained control over message routing.
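The subscribe/observe lifecycle described above can be mimicked with a small pure-Python sketch. `MiniHub` and `MiniAgent` are toy stand-ins written for illustration, not AgentScope classes:

```python
class MiniAgent:
    """Toy stand-in for an agent: records every message it observes."""
    def __init__(self, name):
        self.name = name
        self.inbox = []
        self.subscribers = []

    def observe(self, msg):
        self.inbox.append(msg)

    def __call__(self, content):
        # A reply is automatically sent to all current subscribers.
        reply = (self.name, content)
        for sub in self.subscribers:
            sub.observe(reply)
        return reply

class MiniHub:
    """Context manager mirroring MsgHub's subscribe/unsubscribe lifecycle."""
    def __init__(self, participants):
        self.participants = participants

    def __enter__(self):
        # On enter: each participant subscribes to every *other* participant.
        for p in self.participants:
            p.subscribers = [q for q in self.participants if q is not p]
        return self

    def __exit__(self, *exc):
        # On exit: all subscriptions are cleaned up.
        for p in self.participants:
            p.subscribers = []

alice, bob, charlie = MiniAgent("Alice"), MiniAgent("Bob"), MiniAgent("Charlie")
with MiniHub([alice, bob, charlie]):
    alice("Hi!")       # Bob and Charlie each observe Alice's reply
bob("after exit")      # No one observes: subscriptions are gone
```

The real MsgHub adds async handling, announcements, and the auto-broadcast toggle on top of this basic wiring.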
Convert an agent’s internal print messages into an async generator for streaming to a web UI or other consumers.
```python
from agentscope.pipeline import stream_printing_messages

agent.set_console_output_enabled(False)  # Avoid duplicate output

async for msg, last in stream_printing_messages(
    agents=[agent],
    coroutine_task=agent(Msg("user", "Hello!", "user")),
):
    print(msg.get_text_content(), end="\r")
    if last:
        print()  # Final message
```
| Parameter | Description |
| --- | --- |
| `agents` | List of agents whose print messages to capture |
| `coroutine_task` | The coroutine to execute while capturing messages |
| `queue` | Optional custom `asyncio.Queue` (uses agents’ shared queue by default) |
| `end_signal` | Signal string marking the end of the stream (default: `"[END]"`) |
| `yield_speech` | If `True`, yields `(msg, last, speech)` tuples including audio data |
How messages are identified: Messages with the same msg.id are considered the same message being updated (streaming). The content is accumulative (not delta), so each yield contains the latest full content.
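A consumer therefore only needs to keep the latest content seen for each message id. A minimal sketch of that merge logic, using plain `(msg_id, content)` tuples as hypothetical stand-ins for the yielded messages:

```python
def collect_stream(events):
    """Merge streaming updates: events sharing an id are the same message
    being updated, and each update carries the full accumulated content
    (not a delta), so the latest one simply wins."""
    latest = {}
    order = []
    for msg_id, content in events:
        if msg_id not in latest:
            order.append(msg_id)  # remember first-seen order
        latest[msg_id] = content  # accumulative content: overwrite, don't append
    return [latest[i] for i in order]

events = [
    ("m1", "Hel"), ("m1", "Hello"), ("m1", "Hello!"),  # one message streaming
    ("m2", "Bye"),
]
print(collect_stream(events))  # ['Hello!', 'Bye']
```

This is why the example above can print with `end="\r"`: each yield overwrites the previous partial content of the same message.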
```python
from agentscope.pipeline import ChatRoom

chat_room = ChatRoom(agents=[agent1, agent2])
await chat_room.start(outgoing_queue)  # Connect all agents
await chat_room.stop()                 # Disconnect all agents
```
| Parameter / Method | Description |
| --- | --- |
| `ChatRoom(agents)` | Initialize with a list of `RealtimeAgent` instances |
| `await start(outgoing_queue)` | Connect all agents and start the internal forwarding loop |
| `await stop()` | Disconnect all agents and cancel the forwarding loop |
| `await handle_input(event)` | Forward a `ClientEvent` from the frontend to all agents |
Internal forwarding loop: The ChatRoom maintains a central asyncio.Queue. When a ServerEvent is received from any agent, it is forwarded to the outgoing_queue (for the frontend) and broadcast to all other agents (excluding the sender, identified by agent_id). When a ClientEvent is received, it is distributed to all agents via handle_input().
Example of how DashScopeMultiAgentFormatter transforms messages:
```python
# Input messages
[
    Msg("system", "You're Bob.", "system"),
    Msg("Alice", "Hi!", "user"),
    Msg("Bob", "Nice to meet you.", "assistant"),
    Msg("Charlie", "Me too!", "assistant"),
]

# Formatted output for LLM
[
    {"role": "system", "content": "You're Bob."},
    {
        "role": "user",
        "content": (
            "# Conversation History\n"
            "The content between <history></history> tags contains "
            "your conversation history\n"
            "<history>\n"
            "Alice: Hi!\n"
            "Bob: Nice to meet you.\n"
            "Charlie: Me too!\n"
            "</history>"
        ),
    },
]
```
The system message is preserved as-is. All other messages are combined into a <history> section within a single user message, with each speaker’s name prefixed to their text.
Use ChatFormatter (e.g., DashScopeChatFormatter) for user-agent conversations — it uses the role field to distinguish user and assistant. Use MultiAgentFormatter when more than two agents are involved.
When more than two agents are involved, use MultiAgentFormatter (e.g., DashScopeMultiAgentFormatter) and MsgHub:
```python
from agentscope.formatter import DashScopeMultiAgentFormatter

# MultiAgentFormatter wraps all messages into a single user message
# with name prefixes, so the LLM can distinguish different speakers
formatter = DashScopeMultiAgentFormatter()
```
The formatter converts multi-party history like this:
```
Alice: Hi!
Bob: Nice to meet you guys.
Charlie: Me too!
```
into a single user message with XML-tagged history, suitable for LLM APIs that only support user/assistant roles. Combined with MsgHub, a multi-agent discussion is simply:
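A minimal sketch of such a discussion, assuming each agent is a ReActAgent configured with the multi-agent formatter (model/API details elided with `...`, as elsewhere in this document):

```python
from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeMultiAgentFormatter
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.pipeline import MsgHub

def make_agent(name: str) -> ReActAgent:
    # Each agent shares the multi-agent formatter so it can tell
    # the different speakers apart in the wrapped history.
    return ReActAgent(
        name=name,
        sys_prompt=f"You're {name}, a participant in a group discussion.",
        model=DashScopeChatModel(...),
        formatter=DashScopeMultiAgentFormatter(),
    )

alice, bob, charlie = make_agent("Alice"), make_agent("Bob"), make_agent("Charlie")

async def discussion():
    async with MsgHub(
        participants=[alice, bob, charlie],
        announcement=Msg("system", "Introduce yourselves.", "system"),
    ):
        # Each reply is auto-broadcast to the other two participants.
        await alice()
        await bob()
        await charlie()
```

The hub handles all message passing; the body of the `async with` block only decides the speaking order.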
Multiple agents discuss a topic in rounds, with a moderator deciding when consensus is reached:
```python
from pydantic import BaseModel, Field

class JudgeModel(BaseModel):
    finished: bool = Field(description="Whether the debate is finished.")
    correct_answer: str | None = Field(default=None)

async def run_debate():
    while True:
        # Debaters discuss within MsgHub
        async with MsgHub(participants=[alice, bob, moderator]):
            await alice(Msg("user", "Present your viewpoint.", "user"))
            await bob(Msg("user", "Present your counter-argument.", "user"))

        # Moderator judges outside MsgHub (debaters don't need to see the verdict)
        msg_judge = await moderator(
            Msg("user", "Can you determine the correct answer?", "user"),
            structured_model=JudgeModel,
        )
        if msg_judge.metadata.get("finished"):
            print("Answer:", msg_judge.metadata.get("correct_answer"))
            break

asyncio.run(run_debate())
```
For realtime voice agent scenarios, ChatRoom orchestrates multiple RealtimeAgent instances sharing a session:
```python
from agentscope.pipeline import ChatRoom

chat_room = ChatRoom(agents=[agent1, agent2])
await chat_room.start(outgoing_queue)  # Connect all agents, start forwarding loop
# ... handle events ...
await chat_room.stop()  # Disconnect all agents, cancel forwarding loop
```
Unlike MsgHub (which works with text-based agents), ChatRoom handles ServerEvents and ClientEvents in the realtime voice pipeline. When one agent generates a response, ChatRoom forwards it both to the frontend and to other agents (excluding the sender).
An orchestrator decomposes tasks and dynamically creates worker agents:
```python
from agentscope.tool import execute_python_code

async def create_worker(task_description: str) -> ToolResponse:
    """Create a worker to finish the given task."""
    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)
    worker = ReActAgent(
        name="Worker",
        sys_prompt="You're a worker agent. Finish the given task.",
        model=DashScopeChatModel(...),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
    )
    res = await worker(Msg("user", task_description, "user"))
    return ToolResponse(content=res.get_content_blocks("text"))

# The orchestrator uses create_worker as a tool
toolkit = Toolkit()
toolkit.register_tool_function(create_worker)

orchestrator = ReActAgent(
    name="Orchestrator",
    sys_prompt="Decompose the task and create workers to finish them.",
    toolkit=toolkit,
    ...
)

await orchestrator(Msg("user", "Execute hello world in Python", "user"))
```