Skip to main content

Command Palette

Search for a command to run...

Your Multi-Agent System Works Until Two Agents Touch the Same File

6 coordination patterns that prevent agents from overwriting each other, losing context, and retrying forever

Published
11 min read
K
Curious to learn every day, I like Agentic automations, curious about space real estate.

Your multi-agent system works perfectly in demos. One agent writes, another reviews, a third publishes. Clean handoffs. Beautiful logs.

Then you deploy it. Two agents modify the same state simultaneously. A handoff drops context because the receiving agent never got the full payload. A failed retry enters an infinite loop between Agent A and Agent B, each passing the task back and forth until you burn through your API budget.

37% of multi-agent failures are coordination issues -- not model quality, not prompt engineering, not tool failures. The agents are individually competent. They just cannot work together.

This article covers the 6 coordination patterns that fix this. Not theory. Not academic multi-agent research. Production patterns with Python code you can deploy today.

Pattern 1: Agent Handoff Protocols

The most common multi-agent failure: Agent A finishes its work and "hands off" to Agent B. But Agent B receives a partial state, missing the reasoning behind Agent A's decisions, the constraints it encountered, or the intermediate artifacts it produced.

A handoff is not "call the next function." A handoff is a structured transfer of ownership that includes the work product, the context that produced it, and the constraints that apply going forward.

Here is a minimal handoff protocol:

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class HandoffPayload:
    """Structured context transfer between agents."""
    source_agent: str
    target_agent: str
    task_id: str
    work_product: dict[str, Any]
    context: dict[str, Any]
    constraints: list[str] = field(default_factory=list)
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    attempt: int = 1
    max_attempts: int = 3

    def validate(self) -> bool:
        """Verify the handoff payload is complete before transfer."""
        required_keys = {"result", "status"}
        return (
            bool(self.task_id)
            and required_keys.issubset(self.work_product.keys())
            and self.attempt <= self.max_attempts
        )

The key insight: every handoff carries its attempt counter and max. This prevents the infinite retry loop where two agents pass a failing task back and forth. If attempt > max_attempts, the handoff is rejected and the failure escalates to a supervisor or dead-letter queue.

In LangGraph, the Command object achieves something similar by combining state updates with routing in a single return value:

from langgraph.types import Command
from typing import Literal

def writer_agent(state: dict) -> Command[Literal["reviewer"]]:
    draft = generate_draft(state["topic"])
    return Command(
        update={
            "draft": draft,
            "writer_notes": "Used technical angle, 1800 words",
            "attempt": state.get("attempt", 0) + 1,
        },
        goto="reviewer",
    )

The Command type from langgraph.types lets a node simultaneously update the graph state and specify which node executes next. This eliminates the split between "update state" and "route to next agent" that causes lost-context bugs in handoff systems.

Pattern 2: State Isolation vs. Shared State

Every multi-agent system faces the same design decision: should agents share state or maintain their own?

Shared state means all agents read and write to one central data structure. It is simple but dangerous. Two agents writing to the same key at the same time produces a race condition. One agent's update overwrites the other's.

Isolated state means each agent maintains its own working memory. Agents communicate only through explicit handoff payloads. This is safer but slower -- agents cannot see each other's intermediate work.

The practical answer is neither extreme. Use a hybrid: shared read, isolated write.

from typing import Annotated, TypedDict
import operator

class CoordinationState(TypedDict):
    # Shared read: all agents see the task and global config
    task: str
    config: dict

    # Isolated write with merge: each agent appends, never overwrites
    messages: Annotated[list[str], operator.add]
    artifacts: Annotated[list[dict], operator.add]

    # Owned keys: only one agent writes to each
    draft: str          # writer owns this
    review: str         # reviewer owns this
    final: str          # publisher owns this

The Annotated[list, operator.add] pattern is a LangGraph convention where the reducer function (operator.add) defines how concurrent updates merge. Instead of overwriting, new values are appended. This eliminates the "last write wins" race condition for list-typed state.

The ownership convention is equally important: draft is written only by the writer agent. review is written only by the reviewer. If you enforce this in code -- by having each agent's return type only include its owned keys -- you get compile-time guarantees against cross-agent state corruption.

When to share state:

  • Read-only configuration (model names, temperature, system prompts)
  • Append-only logs (messages, audit trails)
  • Status flags that follow a monotonic progression (pending -> in_progress -> done)

When to isolate state:

  • Work products (drafts, code, analysis results)
  • Agent-specific reasoning traces
  • Intermediate computations that other agents should not see until complete

Pattern 3: Ownership Boundaries

In a well-coordinated system, every piece of state has exactly one owner at any point in time. This is the single-writer principle borrowed from database design.

from dataclasses import dataclass
from enum import Enum

class AgentRole(Enum):
    WRITER = "writer"
    REVIEWER = "reviewer"
    PUBLISHER = "publisher"

@dataclass
class TaskOwnership:
    task_id: str
    current_owner: AgentRole
    locked: bool = False

    def transfer(self, new_owner: AgentRole) -> bool:
        """Transfer ownership. Fails if task is locked."""
        if self.locked:
            return False
        self.current_owner = new_owner
        return True

    def lock(self, requesting_agent: AgentRole) -> bool:
        """Lock task to prevent concurrent modification."""
        if self.current_owner != requesting_agent:
            return False
        self.locked = True
        return True

    def unlock(self, requesting_agent: AgentRole) -> bool:
        """Release lock after work is complete."""
        if self.current_owner != requesting_agent:
            return False
        self.locked = False
        return True

The ownership boundary pattern prevents the most common multi-agent bug: two agents modifying the same resource simultaneously. When the writer is drafting, it locks the task. The reviewer cannot start reviewing until the writer explicitly unlocks and transfers ownership.

This is not theoretical. In production, without ownership boundaries, you get scenarios where a reviewer starts reviewing a draft while the writer is still editing it. The reviewer's feedback references text that no longer exists. The writer overwrites the reviewer's inline edits. Both agents waste compute.

The rule is simple: one agent, one task, one lock. Transfer happens only at well-defined handoff points.

Pattern 4: Fan-Out / Fan-In

Some tasks decompose naturally into independent subtasks. Research a topic from 5 angles. Generate code for 4 modules. Review 3 documents in parallel. This is the fan-out/fan-in pattern.

Fan-out splits work across N agents. Fan-in collects their results and combines them.

Here is a raw Python implementation using asyncio.gather:

import asyncio
from dataclasses import dataclass

@dataclass
class SubtaskResult:
    agent_id: str
    section: str
    content: str
    success: bool

async def research_section(agent_id: str, section: str) -> SubtaskResult:
    """Simulate one agent researching one section."""
    # Each agent works independently on its assigned section
    content = await call_llm(f"Research: {section}")
    return SubtaskResult(
        agent_id=agent_id,
        section=section,
        content=content,
        success=True,
    )

async def fan_out_research(sections: list[str]) -> list[SubtaskResult]:
    """Fan-out: dispatch each section to a parallel agent."""
    tasks = [
        research_section(f"agent_{i}", section)
        for i, section in enumerate(sections)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Fan-in: collect and validate results
    validated = []
    for result in results:
        if isinstance(result, Exception):
            validated.append(SubtaskResult(
                agent_id="unknown",
                section="failed",
                content=str(result),
                success=False,
            ))
        else:
            validated.append(result)

    return validated

The return_exceptions=True parameter in asyncio.gather is critical. Without it, one failing subtask kills the entire fan-out. With it, exceptions are returned as values alongside successful results, letting you handle partial failures gracefully.

In LangGraph, the Send API provides a declarative fan-out mechanism:

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing import Annotated, TypedDict
import operator

class FanOutState(TypedDict):
    topic: str
    sections: list[str]
    results: Annotated[list[str], operator.add]

def plan_sections(state: FanOutState) -> dict:
    """Generate sections to research in parallel."""
    return {"sections": ["architecture", "performance", "security"]}

def research_one(state: dict) -> dict:
    """Each instance researches one section independently."""
    content = call_llm(f"Research {state['section']} in depth")
    return {"results": [content]}

def dispatch_to_researchers(state: FanOutState) -> list[Send]:
    """Fan-out: create one Send per section."""
    return [
        Send("research_one", {"section": s})
        for s in state["sections"]
    ]

def synthesize(state: FanOutState) -> dict:
    """Fan-in: combine all research results."""
    combined = "\n\n".join(state["results"])
    return {"final_report": combined}

graph = StateGraph(FanOutState)
graph.add_node("plan_sections", plan_sections)
graph.add_node("research_one", research_one)
graph.add_node("synthesize", synthesize)

graph.add_edge(START, "plan_sections")
graph.add_conditional_edges("plan_sections", dispatch_to_researchers, ["research_one"])
graph.add_edge("research_one", "synthesize")
graph.add_edge("synthesize", END)

app = graph.compile()

Each Send("research_one", {"section": s}) creates an independent instance of the research_one node with its own state. All instances execute concurrently. When all complete, their results merge into the parent state via the operator.add reducer, and synthesize runs once with the combined results.

The LangGraph Send object from langgraph.types handles scenarios where the number of parallel branches is not known at graph definition time. The conditional edge function returns a list of Send objects, and LangGraph spawns one node execution per Send.

Pattern 5: Conflict Detection

When two agents can potentially modify the same resource -- even with ownership boundaries in place -- you need conflict detection as a safety net.

The simplest effective approach: version stamping. Every state mutation carries a version number. Before writing, an agent checks that the current version matches what it last read. If it does not match, someone else modified the state since the agent's last read.

from dataclasses import dataclass, field
from threading import Lock

@dataclass
class VersionedState:
    """State with optimistic concurrency control."""
    data: dict = field(default_factory=dict)
    version: int = 0
    _lock: Lock = field(default_factory=Lock, repr=False)

    def read(self) -> tuple[dict, int]:
        """Read state and its current version."""
        with self._lock:
            return dict(self.data), self.version

    def write(self, updates: dict, expected_version: int) -> bool:
        """Write only if version matches. Returns False on conflict."""
        with self._lock:
            if self.version != expected_version:
                return False  # Conflict detected
            self.data.update(updates)
            self.version += 1
            return True

# Usage in an agent
state = VersionedState()

def agent_update(state: VersionedState, key: str, value: str) -> bool:
    """Agent reads, processes, then writes with version check."""
    data, version = state.read()

    # Agent does its work (potentially slow LLM call)
    processed_value = process(value)

    # Write with optimistic concurrency check
    success = state.write({key: processed_value}, expected_version=version)
    if not success:
        # Another agent modified state -- re-read and retry or escalate
        return False
    return True

This is optimistic concurrency control -- the same pattern databases use. The agent assumes it will not conflict, does its work, then checks at write time. If a conflict occurs, it re-reads the current state and retries (with a bounded retry count).

For multi-agent systems specifically, three conflict scenarios matter:

  1. Write-write conflict: Two agents update the same key. Solution: version stamps as shown above.
  2. Read-write conflict: Agent A reads state, Agent B modifies it, Agent A acts on stale data. Solution: version check before acting on read data.
  3. Semantic conflict: Two agents produce logically contradictory outputs (one says "approve," another says "reject"). Solution: a reconciliation step that detects contradictions and routes to a decision-maker.

Pattern 6: Bounded Retry Between Agents

When a handoff fails -- the reviewer rejects the draft, the publisher finds a formatting error, the code reviewer flags a bug -- the work routes back to the originating agent. Without bounds, this creates a retry loop that burns tokens and never converges.

The fix: every retry cycle carries a counter, a maximum, and a degradation strategy.

from dataclasses import dataclass
from enum import Enum

class RetryAction(Enum):
    RETRY = "retry"
    ESCALATE = "escalate"
    ABORT = "abort"

@dataclass
class BoundedRetry:
    max_attempts: int = 3
    current_attempt: int = 0
    feedback_history: list[str] = None

    def __post_init__(self):
        if self.feedback_history is None:
            self.feedback_history = []

    def should_continue(self, feedback: str) -> RetryAction:
        """Decide whether to retry, escalate, or abort."""
        self.current_attempt += 1
        self.feedback_history.append(feedback)

        if self.current_attempt >= self.max_attempts:
            return RetryAction.ESCALATE

        # Detect non-convergence: same feedback repeating
        if len(self.feedback_history) >= 2:
            if self.feedback_history[-1] == self.feedback_history[-2]:
                return RetryAction.ESCALATE

        return RetryAction.RETRY

# In a review cycle
retry = BoundedRetry(max_attempts=3)

def review_cycle(draft: str, reviewer_feedback: str) -> str:
    action = retry.should_continue(reviewer_feedback)

    if action == RetryAction.RETRY:
        return revise_draft(draft, reviewer_feedback)
    elif action == RetryAction.ESCALATE:
        return escalate_to_human(draft, retry.feedback_history)
    else:
        return abort_task(draft)

Two critical details in this pattern:

Non-convergence detection. If the reviewer gives the same feedback twice in a row, the writer is not able to address it. Retrying a third time will produce the same result. Escalate immediately instead of wasting the remaining attempt.

Feedback history accumulation. Each retry carries all previous feedback, not just the latest. This prevents the writer from fixing issue B while reintroducing issue A. The writer sees the full history and can avoid regressions.

Here is how this looks integrated into a LangGraph flow:

from langgraph.types import Command
from typing import Literal

def reviewer_agent(state: dict) -> Command[Literal["writer", "publisher", "escalation"]]:
    review = evaluate_draft(state["draft"])

    if review["approved"]:
        return Command(
            update={"review": review, "status": "approved"},
            goto="publisher",
        )

    attempt = state.get("attempt", 0) + 1
    if attempt >= 3:
        return Command(
            update={"review": review, "status": "escalated", "attempt": attempt},
            goto="escalation",
        )

    return Command(
        update={
            "review": review,
            "status": "revision_needed",
            "attempt": attempt,
            "feedback_history": state.get("feedback_history", []) + [review["feedback"]],
        },
        goto="writer",
    )

Putting It Together

These 6 patterns are not independent. They compose:

  • Handoff protocols carry the context that ownership boundaries protect.
  • State isolation prevents the conflicts that conflict detection catches.
  • Fan-out/fan-in uses bounded retry at the individual subtask level so one failing branch does not block the others.
  • Bounded retry feeds its failure data back through the handoff protocol so the receiving agent has full context.

The implementation order matters too. Start with ownership boundaries and handoff protocols. These prevent most coordination failures. Add conflict detection when you have agents that might run concurrently on overlapping resources. Implement fan-out/fan-in when you have naturally decomposable tasks. Add bounded retry last -- it is the safety net for when everything else works but the output quality is not sufficient.

The goal is not to build a distributed systems framework. The goal is to make 3-10 agents work together on a real task without losing context, overwriting each other's work, or burning your API budget on infinite retry loops.

Start with the handoff protocol. Make every transfer explicit. The rest follows.


Follow @klement_gunndu for more AI agent engineering content. We're building in public.

More from this blog

N

Netanel Systems by klement Gunndu

33 posts