Why Single Agents Have Limits
A single AI agent is remarkably capable. It can reason, use tools, write code, call APIs, and produce structured output. But it has hard limits that no amount of prompting can overcome.
Context window — every model has a maximum context length. On long tasks — processing a 500-page document, managing a software project with dozens of files, running a research pipeline — you will eventually hit that ceiling. You cannot fit everything into one prompt.
Single thread of execution — a single agent processes one thing at a time. If a task has ten independent subtasks, you are waiting for all ten to complete serially. Parallelism is not possible with one agent.
Specialisation vs generalisation — a general-purpose system prompt produces a general-purpose agent. An agent specialised in security review produces better security reviews than a general agent. If your task needs multiple specialisations, a single agent cannot be all of them simultaneously.
Reliability — if a single agent makes a mistake, there is no second opinion. Multi-agent systems can add checks where one agent reviews another's work, which can catch errors on high-stakes tasks that a lone agent would let through.
Multi-agent systems solve these problems — but they introduce their own complexity. The rest of this guide is about building systems that are worth the added complexity.
Pattern 1: Orchestrator / Worker
This is the most fundamental multi-agent pattern and the right starting point for most production systems.
             ┌─────────────────────┐
             │    ORCHESTRATOR     │
             │(Planner / Delegator)│
             └──────────┬──────────┘
                        │ decomposes task
       ┌────────────────┼────────────────┐
       ▼                ▼                ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│   Worker A   │ │   Worker B   │ │   Worker C   │
│ (Researcher) │ │   (Coder)    │ │  (Reviewer)  │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
       │                │                │
       └────────────────┼────────────────┘
                        ▼
             ┌─────────────────────┐
             │    ORCHESTRATOR     │
             │ (Aggregates results)│
             └─────────────────────┘
The orchestrator receives the top-level task and is responsible for two things: decomposing the task into subtasks, and aggregating the results when subtasks complete. It does not do the actual work — workers do.
Workers are narrowly focused. Each has a tight system prompt, specific tools, and a single responsibility. This specialisation is what makes them better than a generalist agent at their particular job.
When to use it
Use the orchestrator/worker pattern when your task has clearly separable subtasks where the decomposition logic is known in advance. "Research this topic, draft a report, and check it for factual errors" decomposes cleanly into three subtasks.
Code structure
import json

import anthropic

client = anthropic.Anthropic()


def orchestrate(task: str) -> str:
    """Orchestrator: decompose task and aggregate results."""
    # Step 1: Ask the orchestrator to plan
    plan_response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        system="""You are a task orchestrator. Given a task, output a JSON array
        of subtasks. Each subtask has: 'id', 'worker_type', 'instruction'.
        Worker types: researcher, writer, reviewer.""",
        messages=[{"role": "user", "content": f"Plan this task: {task}"}],
    )
    # Assumes the model replies with bare JSON; validate before relying on it.
    subtasks = json.loads(plan_response.content[0].text)

    # Step 2: Execute each subtask via worker agents
    results = {}
    for subtask in subtasks:
        results[subtask["id"]] = run_worker(
            worker_type=subtask["worker_type"],
            instruction=subtask["instruction"],
            context=results,  # pass prior results as context
        )

    # Step 3: Synthesise
    synthesis = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        system="You synthesise multiple agent outputs into a final result.",
        messages=[{"role": "user", "content": f"Task: {task}\n\nResults: {json.dumps(results)}"}],
    )
    return synthesis.content[0].text


def run_worker(worker_type: str, instruction: str, context: dict) -> str:
    """Generic worker runner; the system prompt varies by worker type."""
    system_prompts = {
        "researcher": "You are a research specialist. Find facts, cite sources, be precise.",
        "writer": "You are a technical writer. Write clearly and concisely.",
        "reviewer": "You are a critical reviewer. Find errors, omissions, and improvements.",
    }
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=1024,
        # Unknown worker types fall back to a generic assistant prompt.
        system=system_prompts.get(worker_type, "You are a helpful assistant."),
        messages=[{
            "role": "user",
            "content": f"Context from previous steps:\n{context}\n\nYour task: {instruction}",
        }],
    )
    return response.content[0].text
Pattern 2: Pipeline / Sequential
In the pipeline pattern, each agent hands off its output to the next agent as input. No agent sees what comes after it — it just does its job and passes the baton.
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Agent 1 │────▶│ Agent 2 │────▶│ Agent 3 │────▶│ Agent 4 │
│ Research │ │ Draft │ │ Review │ │ Publish │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Think of it as Unix pipes, but with LLMs. cat article.txt | summarise | translate | format | publish.
When to use it
Pipelines work best for transformation chains where each step has a clear input and output format. Content production workflows, data enrichment, and multi-stage validation all fit this pattern.
The key constraint: each agent in the pipeline must produce output that is well-defined enough for the next agent to consume reliably. If your agents are producing free-form text that the next stage needs to parse, add structured output (JSON schema) to the intermediate agents.
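As a minimal sketch of that constraint, the helper below checks a stage's JSON output before it is handed to the next stage. The names `parse_stage_output` and `StageOutputError`, and the `title` and `sections` keys, are illustrative assumptions rather than part of any library; the sample output is hard-coded in place of a model call.

```python
import json


class StageOutputError(ValueError):
    """Raised when a stage's output cannot be consumed by the next stage."""


def parse_stage_output(raw: str, required_keys: set[str]) -> dict:
    """Parse a stage's raw text as JSON and check that the expected keys exist."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise StageOutputError(f"Stage did not return valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise StageOutputError("Stage output must be a JSON object")
    missing = required_keys - set(data)
    if missing:
        raise StageOutputError(f"Stage output is missing keys: {sorted(missing)}")
    return data


# Hypothetical outline-stage output, hard-coded here instead of a model call.
outline = parse_stage_output(
    '{"title": "AI agents", "sections": ["Intro", "Patterns"]}',
    required_keys={"title", "sections"},
)
```

Failing fast at the stage boundary tends to be easier to debug than letting a malformed intermediate result flow into later stages.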
Code structure
import anthropic

client = anthropic.Anthropic()


def run_pipeline(initial_input: str, stages: list[dict]) -> str:
    """
    Run each stage in order, feeding one stage's output to the next.

    stages: list of {"name": str, "system": str, "model": str}
    """
    current = initial_input
    for stage in stages:
        print(f"Running stage: {stage['name']}")
        response = client.messages.create(
            model=stage.get("model", "claude-haiku-4-5-20251001"),
            max_tokens=stage.get("max_tokens", 1024),
            system=stage["system"],
            messages=[{"role": "user", "content": current}],
        )
        # The output of this stage becomes the input of the next one.
        current = response.content[0].text
    return current


# Example: content production pipeline
result = run_pipeline(
    initial_input="Write about the impact of AI agents on software development in 2026.",
    stages=[
        {
            "name": "Research",
            "system": "Expand this topic into a detailed outline with 5 sections and key points for each.",
            "model": "claude-sonnet-4-5",
        },
        {
            "name": "Draft",
            "system": "Write an 800-word article from this outline. Use concrete examples.",
            "model": "claude-sonnet-4-5",
        },
        {
            "name": "Edit",
            "system": "Edit this article for clarity, cut unnecessary words, fix any factual issues.",
            "model": "claude-haiku-4-5-20251001",
        },
    ],
)
Pattern 3: Parallel Fan-Out / Fan-In
Fan-out sends the same task to multiple agents in parallel. Fan-in aggregates their outputs — by voting, merging, or selecting the best.
             ┌─────────────────────┐
             │    ORCHESTRATOR     │
             └──────────┬──────────┘
                        │ same task, N copies
       ┌────────────────┼────────────────┐
       ▼                ▼                ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│   Agent 1    │ │   Agent 2    │ │   Agent 3    │
│ (same task)  │ │ (same task)  │ │ (same task)  │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
       │                │                │
       └────────────────┼────────────────┘
                        ▼
             ┌─────────────────────┐
             │     AGGREGATOR      │
             │(vote / merge / best)│
             └─────────────────────┘
When to use it
Fan-out shines when independent repetition improves quality. Use it for:
- Best-of-N generation — generate N options and pick the best one.
- Ensemble reasoning — vote across N agents to reduce hallucination rates.
- Parallel research — give each agent a different angle on the same question and merge findings.
The tradeoff is direct: N agents means N times the token cost. Run the math before you scale.
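To make that math concrete, here is a rough sketch of the arithmetic. The function name `fan_out_cost` is hypothetical and the per-token prices are placeholders chosen for illustration, not real pricing; substitute the figures from your provider's price list.

```python
def fan_out_cost(
    n_agents: int,
    input_tokens: int,
    output_tokens: int,
    usd_per_million_input: float,
    usd_per_million_output: float,
) -> float:
    """Estimate the cost in USD of sending one prompt to n_agents workers."""
    per_agent = (
        input_tokens * usd_per_million_input
        + output_tokens * usd_per_million_output
    ) / 1_000_000
    return n_agents * per_agent


# Placeholder prices for illustration only.
single = fan_out_cost(1, 2_000, 500, usd_per_million_input=1.0, usd_per_million_output=5.0)
five = fan_out_cost(5, 2_000, 500, usd_per_million_input=1.0, usd_per_million_output=5.0)
print(f"1 agent: ${single:.4f}, 5 agents: ${five:.4f}")
```

This estimate leaves out the aggregation call, which reads all N outputs and so adds further input tokens on top.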
Code structure
import asyncio

import anthropic
import numpy as np

async_client = anthropic.AsyncAnthropic()


async def fan_out(task: str, n_workers: int = 3, temperature_range=(0.3, 0.9)) -> list[str]:
    """Run the same task across N agents in parallel."""
    # Spread sampling temperatures across the range so the candidates differ.
    temperatures = np.linspace(*temperature_range, n_workers).tolist()
    tasks = [
        async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            temperature=temperature,
            messages=[{"role": "user", "content": task}],
        )
        for temperature in temperatures
    ]
    # gather() runs the requests concurrently and preserves their order.
    responses = await asyncio.gather(*tasks)
    return [r.content[0].text for r in responses]


async def fan_in_best(task: str, candidates: list[str]) -> str:
    """Use a judge agent to pick the best output."""
    formatted = "\n\n".join(
        f"Option {i+1}:\n{text}" for i, text in enumerate(candidates)
    )
    response = await async_client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        system="You are a quality judge. Evaluate options and return the best one verbatim. Explain your choice in one sentence first.",
        messages=[{"role": "user", "content": f"Task: {task}\n\nCandidates:\n{formatted}"}],
    )
    return response.content[0].text


async def best_of_n(task: str, n: int = 3) -> str:
    candidates = await fan_out(task, n_workers=n)
    return await fan_in_best(task, candidates)
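The judge-based fan-in above covers best-of-N. For ensemble voting on short, comparable answers (labels, yes/no, a number), a simple majority vote may be enough. This is a minimal sketch: `majority_vote` is a hypothetical helper, and the three answers are hard-coded stand-ins for agent outputs.

```python
from collections import Counter


def majority_vote(answers: list[str]) -> tuple[str, float]:
    """Return the most common answer and the share of agents that gave it."""
    if not answers:
        raise ValueError("need at least one answer to vote on")
    # Normalise lightly so "Yes" and " yes " count as the same vote.
    normalised = [a.strip().lower() for a in answers]
    winner, count = Counter(normalised).most_common(1)[0]
    return winner, count / len(normalised)


# Hypothetical outputs from three agents answering a yes/no question.
answer, agreement = majority_vote(["Yes", "yes ", "No"])
```

A low agreement share is a useful signal in its own right: it can be a cue to escalate to a stronger model or a human reviewer. On a tie, `Counter.most_common` returns the answer seen first.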
Pattern 4: Peer-to-Peer / Debate
In the debate pattern, agents critique each other's work. Agent A produces output, Agent B critiques it, Agent A revises, and so on. This adversarial loop catches errors that a single agent would miss.
┌──────────────┐         ┌──────────────┐
│   Agent A    │────────▶│   Agent B    │
│  (Proposer)  │◀────────│   (Critic)   │
└──────────────┘         └──────────────┘
                    │ N rounds of debate
                    ▼
             ┌──────────────┐
             │    Final     │
             │    Output    │
             └──────────────┘
When to use it
Debate works well for tasks where correctness is critical and a wrong answer is worse than a slower answer. Code security review, contract analysis, medical triage, and financial modelling are good candidates.
The number of rounds matters. One round of critique-and-revision is usually enough for quality improvement. More rounds yield diminishing returns and increasing cost.
# Uses the `client` created in the orchestrator example.
def run_debate(task: str, n_rounds: int = 2) -> str:
    """Proposer produces, critic reviews, proposer revises."""
    # Initial proposal
    proposal = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        system="You are an expert analyst. Provide a thorough, well-reasoned answer.",
        messages=[{"role": "user", "content": task}],
    ).content[0].text

    for round_num in range(n_rounds):
        # Critic reviews the proposal
        critique = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=512,
            system="""You are a rigorous critic. Find flaws, gaps, and errors in the
            proposal. Be specific. Do not be polite about mistakes.""",
            messages=[{
                "role": "user",
                "content": f"Original task: {task}\n\nProposal to critique:\n{proposal}",
            }],
        ).content[0].text

        # Proposer revises
        proposal = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=1024,
            system="You are an expert analyst. Revise your answer based on the critique. Address every point raised.",
            messages=[{
                "role": "user",
                "content": f"Task: {task}\n\nYour previous answer:\n{proposal}\n\nCritique:\n{critique}\n\nRevised answer:",
            }],
        ).content[0].text

    return proposal
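Because extra rounds cost tokens and tend to add little, one option is to stop as soon as the critic has nothing left to flag. The sketch below assumes the critic is prompted to reply with a fixed marker when it approves. `debate_until_approved`, the `approval_marker` convention, and the three callables are all hypothetical; the lambdas stand in for model calls.

```python
from typing import Callable


def debate_until_approved(
    task: str,
    propose: Callable[[str], str],
    critique: Callable[[str, str], str],
    revise: Callable[[str, str, str], str],
    max_rounds: int = 3,
    approval_marker: str = "NO ISSUES",
) -> tuple[str, int]:
    """Run critique/revise rounds, stopping early once the critic approves.

    Returns the final proposal and the number of revisions performed.
    """
    proposal = propose(task)
    for round_num in range(max_rounds):
        feedback = critique(task, proposal)
        # Case-insensitive check for the agreed approval marker.
        if approval_marker.upper() in feedback.upper():
            return proposal, round_num
        proposal = revise(task, proposal, feedback)
    return proposal, max_rounds


# Stub agents standing in for model calls.
final, rounds = debate_until_approved(
    "Add 2 and 2",
    propose=lambda task: "5",
    critique=lambda task, p: "No issues" if p == "4" else "The sum is wrong.",
    revise=lambda task, p, fb: "4",
)
```

Passing the agents in as callables keeps the loop testable without network access; in a real system each callable would wrap a `client.messages.create` call.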
Pattern 5: Specialist Routing
A router agent classifies the incoming task and dispatches it to the right specialist. The specialists never see tasks outside their domain.
                 ┌─────────────────────┐
 incoming task──▶│    ROUTER AGENT     │
                 │(Classify & Dispatch)│
                 └──────────┬──────────┘
                            │
           ┌────────────────┼────────────────┐
           ▼                ▼                ▼
    ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
    │     Code     │ │   Billing    │ │   General    │
    │    Expert    │ │  Specialist  │ │   Support    │
    └──────────────┘ └──────────────┘ └──────────────┘
When to use it
Routing shines when you have a general-purpose entry point that needs to handle diverse query types, but different types require radically different handling. Think customer support bots, developer assistants, or enterprise search.
The router itself should be a small, fast model — it is just doing classification. Specialists can use larger, more expensive models when the task demands it.
import json
from typing import Literal

# Uses the `client` created in the orchestrator example.
SpecialistType = Literal["code", "data_analysis", "writing", "research", "general"]

SPECIALIST_CONFIGS: dict[SpecialistType, dict] = {
    "code": {
        "system": "You are an expert software engineer. Focus on correctness, efficiency, and best practices.",
        "model": "claude-sonnet-4-5",
    },
    "data_analysis": {
        "system": "You are a data analyst. Work with numbers, statistics, and structured data.",
        "model": "claude-sonnet-4-5",
    },
    "writing": {
        "system": "You are a professional writer and editor. Focus on clarity, tone, and persuasion.",
        "model": "claude-haiku-4-5-20251001",
    },
    "research": {
        "system": "You are a research assistant. Provide accurate, cited, well-organised information.",
        "model": "claude-sonnet-4-5",
    },
    "general": {
        "system": "You are a helpful assistant.",
        "model": "claude-haiku-4-5-20251001",
    },
}


def route_and_respond(user_query: str) -> str:
    # Step 1: Classify with a small, fast model
    classification = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=64,
        system="""Classify the user query into exactly one category:
        code, data_analysis, writing, research, or general.
        Respond with JSON: {"category": "<category>"}""",
        messages=[{"role": "user", "content": user_query}],
    ).content[0].text

    try:
        category = json.loads(classification)["category"]
    except (json.JSONDecodeError, KeyError, TypeError):
        category = "general"  # fall back if the classifier output is malformed
    # Unknown categories also fall back to the general specialist.
    config = SPECIALIST_CONFIGS.get(category, SPECIALIST_CONFIGS["general"])

    # Step 2: Route to specialist
    response = client.messages.create(
        model=config["model"],
        max_tokens=1024,
        system=config["system"],
        messages=[{"role": "user", "content": user_query}],
    )
    return response.content[0].text
Communication Patterns Between Agents
How agents share information is as important as how you orchestrate them.
| Pattern | Mechanism | Pros | Cons | Use when |
|---|---|---|---|---|
| Shared file/DB | Database or filesystem | Durable, auditable | Requires locking, slower | Long-running pipelines |
| Message queue | Redis, RabbitMQ, SQS | Decoupled, scalable | Infrastructure overhead | High-throughput systems |
| Direct call | Function return value | Simple, fast | Tightly coupled | Synchronous orchestration |
| In-memory dict | Python dict / JS object | Zero overhead | Lost on crash | Short, single-process tasks |
For most development and moderate-production use cases, direct function calls with in-memory state are the right choice. Graduate to a message queue when you need:
- Horizontal scaling (multiple orchestrator processes)
- Durability across restarts
- Backpressure and flow control
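To illustrate what backpressure means here, the sketch below uses Python's standard-library `queue.Queue` as an in-process stand-in for a real broker such as Redis, RabbitMQ, or SQS. The `run_workers` function and the placeholder "agent call" are assumptions for illustration; it shows flow control only, not durability or horizontal scaling.

```python
import queue
import threading


def run_workers(tasks: list[str], n_workers: int = 2, max_pending: int = 4) -> list[str]:
    """Process tasks through a bounded queue with a small pool of worker threads."""
    pending: queue.Queue = queue.Queue(maxsize=max_pending)  # put() blocks when full
    results: list[str] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            task = pending.get()
            if task is None:  # sentinel: no more work
                pending.task_done()
                return
            outcome = f"done: {task}"  # placeholder for a real agent call
            with lock:
                results.append(outcome)
            pending.task_done()

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for task in tasks:
        pending.put(task)  # blocks here if workers fall behind (backpressure)
    for _ in threads:
        pending.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return results


results = run_workers([f"task-{i}" for i in range(10)])
```

The bounded `maxsize` is what provides flow control: the producer cannot enqueue more than `max_pending` items ahead of the workers.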
State Management
Every multi-agent system has state. The question is where it lives and who can access it.
Per-agent state is the conversation history each agent maintains. Keep it minimal — only the messages relevant to that agent's subtask.
Shared state is the work product that agents pass between each other: the research findings, the draft document, the error list. Design this as an explicit data structure, not a blob of text.
Global state tracks which tasks are complete, which are pending, and which failed. The orchestrator owns this.
A clean pattern is the "blackboard" — a shared dictionary that every agent can read from and write to, with the orchestrator controlling write access:
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Blackboard:
    task: str = ""
    artifacts: dict[str, Any] = field(default_factory=dict)
    status: dict[str, str] = field(default_factory=dict)  # agent_id -> "pending"|"done"|"failed"
    errors: dict[str, str] = field(default_factory=dict)

    def write(self, agent_id: str, key: str, value: Any):
        # Store the artifact and mark the writing agent as finished.
        self.artifacts[key] = value
        self.status[agent_id] = "done"

    def fail(self, agent_id: str, error: str):
        self.errors[agent_id] = error
        self.status[agent_id] = "failed"

    def is_complete(self) -> bool:
        # Complete once every registered agent has either finished or failed.
        return all(s in ("done", "failed") for s in self.status.values())
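The class above records artifacts and status but does not itself restrict who may write. One way to let the orchestrator control write access is to have it grant each key to a single agent. This is a sketch under that assumption; `GuardedBlackboard`, `grant`, and the agent names are hypothetical.

```python
from typing import Any


class GuardedBlackboard:
    """Shared store where only the agent granted a key may write it."""

    def __init__(self) -> None:
        self._artifacts: dict[str, Any] = {}
        self._writers: dict[str, str] = {}  # key -> agent_id allowed to write it

    def grant(self, agent_id: str, key: str) -> None:
        """Called by the orchestrator to assign a key to one agent."""
        self._writers[key] = agent_id

    def write(self, agent_id: str, key: str, value: Any) -> None:
        if self._writers.get(key) != agent_id:
            raise PermissionError(f"{agent_id} may not write '{key}'")
        self._artifacts[key] = value

    def read(self, key: str) -> Any:
        """Any agent may read; returns None if the key has not been written yet."""
        return self._artifacts.get(key)


board = GuardedBlackboard()
board.grant("researcher", "findings")
board.write("researcher", "findings", ["fact 1", "fact 2"])
```

With this arrangement, a writer agent that tries to overwrite `findings` gets a `PermissionError` instead of silently clobbering the researcher's output.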
Error Handling and Resilience
In a system with five agents, each with a 5% failure rate, you have roughly a 23% chance of at least one failure per run. Design for failure from day one.
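The 23% figure follows from treating the agents as independent: the chance that all five succeed is 0.95 to the fifth power, and the chance of at least one failure is the complement. A short sketch of the calculation (the function name is illustrative):

```python
def p_any_failure(n_agents: int, p_fail: float) -> float:
    """Probability that at least one of n independent agents fails."""
    return 1 - (1 - p_fail) ** n_agents


print(f"{p_any_failure(5, 0.05):.1%}")  # 22.6%
```

Independence is a simplifying assumption; correlated failures, such as a provider outage, would hit several agents at once.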
Retry with backoff
import time
from functools import wraps


def with_retry(max_attempts: int = 3, base_delay: float = 1.0):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    # Out of attempts: re-raise the last error to the caller.
                    if attempt == max_attempts - 1:
                        raise
                    print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
                    delay *= 2  # exponential backoff
        return wrapper
    return decorator
Graceful degradation
When an optional agent fails, continue with partial results rather than aborting:
def run_with_fallback(agent_fn, task, fallback="[Agent output unavailable]"):
    try:
        return agent_fn(task)
    except Exception as e:
        print(f"Agent failed: {e}. Using fallback.")
        return fallback
Circuit breakers
If an agent fails repeatedly, stop calling it and use the fallback immediately. This prevents cascading failures where one slow or broken agent holds up the entire system.
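A minimal circuit breaker might look like the sketch below. The `CircuitBreaker` class, its thresholds, and the `broken_agent` stub are assumptions for illustration; production implementations usually add per-agent metrics and a more careful trial ("half-open") phase.

```python
import time
from typing import Optional


class CircuitBreaker:
    """Stop calling a failing agent for a cool-down period after repeated errors."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after  # seconds to wait before trying again
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, agent_fn, task, fallback="[Agent output unavailable]"):
        # While the circuit is open, skip the agent and return the fallback.
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                return fallback
            # Cool-down elapsed: allow one trial call; one more failure reopens.
            self.opened_at = None
            self.failures = self.max_failures - 1
        try:
            result = agent_fn(task)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            return fallback
        self.failures = 0  # a success closes the circuit fully
        return result


calls = []


def broken_agent(task):
    """Stub agent that always fails, standing in for a broken service."""
    calls.append(task)
    raise RuntimeError("agent is down")


breaker = CircuitBreaker(max_failures=2, reset_after=60.0)
outputs = [breaker.call(broken_agent, "summarise") for _ in range(3)]
```

In this run the agent is invoked twice; the third request is answered from the fallback without touching the agent at all.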
Cost Management
Multi-agent systems can surprise you with costs. The key levers are:
Model selection per agent — use the cheapest model that produces acceptable quality for each role. Routers and simple classifiers work well on Haiku. Complex reasoning tasks need Sonnet or above.
Context pruning — do not pass the entire conversation history to every agent. Each agent should receive only the context it needs to do its specific job.
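As a rough sketch of context pruning, the helper below passes an agent only the upstream results it names, and caps the length of each. `prune_context` and the result keys are hypothetical, and character truncation is only a crude proxy for a token limit.

```python
def prune_context(results: dict[str, str], needed: list[str], max_chars: int = 2000) -> dict[str, str]:
    """Keep only the upstream results an agent needs, truncating long entries."""
    pruned = {}
    for key in needed:
        if key in results:
            pruned[key] = results[key][:max_chars]
    return pruned


# Hypothetical accumulated results from earlier agents.
results = {"research": "x" * 5000, "outline": "1. Intro", "debug_log": "..."}
reviewer_context = prune_context(results, needed=["research", "outline"])
```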
Budget limits — set a maximum token budget for the entire pipeline and abort if you are on track to exceed it:
class BudgetTracker:
    def __init__(self, max_tokens: int):
        self.max_tokens = max_tokens
        self.used = 0

    def record(self, input_tokens: int, output_tokens: int):
        # Call after every agent response with its reported token usage.
        self.used += input_tokens + output_tokens
        if self.used > self.max_tokens:
            raise RuntimeError(f"Budget exceeded: {self.used}/{self.max_tokens} tokens used")

    def remaining(self) -> int:
        return max(0, self.max_tokens - self.used)
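Parallel efficiency — fan-out costs roughly N times as much as a single agent but finishes in about the wall-clock time of one call, plus the aggregation step. If speed is the goal, the cost premium may be worth it. If cost is the goal, reduce N or skip the fan-out: running the same N calls sequentially uses the same tokens and only adds latency.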
Frameworks for Multi-Agent Orchestration
You do not always need to build orchestration from scratch. Choose a framework when the patterns it implements match what you need.
LangGraph
LangGraph models your agent workflow as a directed graph where nodes are agents and edges are transitions. It supports cycles (agent A can call agent B, which calls agent A again), conditional branching, and persistent state between runs.
Best for: complex workflows with conditional logic, feedback loops, and long-running jobs that need checkpointing.
CrewAI
CrewAI uses a role-based metaphor: you define a "crew" of agents with names, goals, and backstories. Agents collaborate on tasks and produce structured outputs. It handles orchestration automatically.
Best for: simulating team dynamics, role-based content production, and workflows where the "human team" metaphor helps you design the system.
AutoGen
AutoGen centres on conversation: agents send messages to each other in a group chat. It is particularly good for coding tasks where a user proxy, a coder, and a critic talk to each other until the code works.
Best for: iterative coding and debugging, multi-model conversations, human-in-the-loop workflows.
Claude Code subagents
When working inside Claude Code, you can spawn subagents using the built-in Agent tool. Subagents are lightweight — they run in the same environment, share the filesystem, and communicate through files and function calls. No extra library needed.
Best for: code-centric tasks within a single development session, one-off research and implementation tasks.
Production Example: Content Research and Writing System
Here is a real architecture for a system that takes a topic, researches it, writes an article, and produces a final edited draft.
         ┌──────────────────────────────────────────┐
         │               ORCHESTRATOR               │
         │   Receives: "Write an article about X"   │
         │      Produces: Final edited article      │
         └────────────────────┬─────────────────────┘
                              │
              ┌───────────────▼──────────────┐
              │            ROUTER            │
              │   Is this a short or long    │
              │  article? Tech or general?   │
              └──┬────────────────────────┬──┘
                 │                        │
                 ▼                        ▼
  ┌────────────────────┐        ┌────────────────────────┐
  │  RESEARCH AGENTS   │        │  OUTLINE AGENT         │
  │  (Fan-out: 3x)     │        │  (Sequential: 1x)      │
  │  - Angle 1         │        │  Produces section plan │
  │  - Angle 2         │        └───────────┬────────────┘
  │  - Angle 3         │                    │
  └─────────┬──────────┘                    │
            │                               │
            └─────────────────┬─────────────┘
                              ▼
                  ┌───────────────────────┐
                  │  MERGE AGENT          │
                  │  Combines research +  │
                  │  outline into brief   │
                  └───────────┬───────────┘
                              ▼
                  ┌───────────────────────┐
                  │  WRITER AGENT         │
                  │  Produces first draft │
                  └───────────┬───────────┘
                              │
                   ┌──────────┴──────────┐
                   ▼                     ▼
          ┌──────────────────┐  ┌──────────────────┐
          │  FACT CHECKER    │  │  EDITOR AGENT    │
          │  Agent           │  │  Agent           │
          └────────┬─────────┘  └────────┬─────────┘
                   │                     │
                   └──────────┬──────────┘
                              ▼
                  ┌───────────────────────┐
                  │  FINAL ASSEMBLY       │
                  │  Apply edits + fact   │
                  │  corrections          │
                  └───────────────────────┘
The key design decisions:
- Research is fanned out to 3 agents in parallel, each taking a different angle on the same topic, while the outline agent runs alongside them. The merge agent combines their outputs into a brief.
- Writer runs sequentially after research is complete — it needs the research context.
- Fact checker and editor run in parallel after drafting — they are independent of each other.
- Final assembly applies both outputs together.
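Counting each box below the orchestrator as one agent call, this structure makes ten calls. Because the research and outline agents run side by side, as do the fact checker and editor, the critical path is only six steps: router, research, merge, writer, review, and final assembly.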
Observability: Debugging Multi-Agent Systems
A multi-agent system where you cannot see what each agent did is a debugging nightmare. Build observability in from the start.
Structured logging — log every agent call as a JSON event with: trace ID, agent name, input tokens, output tokens, latency, and a truncated preview of input and output.
Trace IDs — generate a unique ID at the start of each top-level task and pass it through every agent call. When something goes wrong, you can filter all logs for that trace ID and see the full execution history.
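The two ideas above can be sketched together as follows. The function names and the exact field names are assumptions; the token counts and texts in the usage lines are made-up sample values rather than output from a real call.

```python
import json
import time
import uuid


def new_trace_id() -> str:
    """Generate one ID per top-level task; pass it to every agent call."""
    return uuid.uuid4().hex


def log_agent_call(trace_id, agent, prompt, output, input_tokens, output_tokens, started_at) -> str:
    """Emit one JSON log line describing a single agent call."""
    event = {
        "trace_id": trace_id,
        "agent": agent,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "latency_s": round(time.monotonic() - started_at, 3),
        "input_preview": prompt[:200],    # truncated to keep log lines small
        "output_preview": output[:200],
    }
    line = json.dumps(event)
    print(line)
    return line


trace_id = new_trace_id()
start = time.monotonic()
line = log_agent_call(trace_id, "researcher", "Find sources on X", "Source list...", 12, 40, start)
```

Because every line is self-describing JSON, filtering a log file for one `trace_id` reconstructs the full history of a single run.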
LangSmith — if you are using LangChain or LangGraph, LangSmith gives you a browser UI that shows every agent step, its inputs and outputs, and token costs per step. Worth the setup time.
Replay capability — log enough information that you can replay any agent's step with the same inputs. This lets you debug without re-running the entire expensive pipeline.
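One way to get replay is to record each step's full inputs alongside its output, then feed those inputs back to the agent function on demand. This sketch keeps the log in memory as JSON lines; `record_step`, `replay_step`, and `stub_agent` are hypothetical names, and a real system would persist the log to disk or a database.

```python
import json


def record_step(log: list[str], step: str, system: str, user_input: str, output: str) -> None:
    """Append everything needed to re-run one agent step as a JSON line."""
    log.append(json.dumps({"step": step, "system": system, "input": user_input, "output": output}))


def replay_step(log: list[str], step: str, agent_fn):
    """Re-run a single recorded step with its original inputs."""
    for line in log:
        record = json.loads(line)
        if record["step"] == step:
            return agent_fn(record["system"], record["input"])
    raise KeyError(f"no recorded step named {step!r}")


log: list[str] = []
record_step(log, "draft", "You are a technical writer.", "Outline: intro, patterns", "First draft text")


def stub_agent(system: str, user_input: str) -> str:
    """Stub standing in for a model call; a real one would call the API."""
    return f"[{system}] {user_input}"


replayed = replay_step(log, "draft", stub_agent)
```

Storing the original output as well makes it possible to compare a replayed result against what the agent produced the first time.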
When NOT to Use Multiple Agents
Multi-agent systems are not always the answer. Avoid them when:
- The task fits in one context window. If everything fits, a single well-prompted agent is simpler, cheaper, and easier to debug.
- Latency matters more than quality. Each agent hop adds latency. For interactive applications where response time is critical, fewer agents means faster responses.
- Your workflow is simple and linear. One pipeline with two steps does not need an orchestration framework. Two function calls in sequence is fine.
- You are still figuring out the task. Build a single-agent version first. You will learn what the task actually requires before committing to a complex architecture.
The most expensive multi-agent system is one that does not need to be multi-agent.
Summary
| Pattern | Structure | Best for |
|---|---|---|
| Orchestrator/Worker | 1 planner, N doers | Tasks with clear decomposable subtasks |
| Pipeline | A → B → C | Sequential transformation chains |
| Fan-Out/Fan-In | 1 → N → 1 | Best-of-N, ensemble reasoning |
| Debate | A ↔ B, N rounds | High-stakes correctness |
| Specialist Routing | Router → 1 of N | Diverse query types with specialist handling |
Start with the simplest pattern that addresses your problem. Add complexity only when you have evidence that simpler systems cannot meet your requirements.