CodeDocs Vault

Middleware Pipeline

The middleware system provides extensible pre/post turn hooks for controlling conversation flow without coupling concerns to the core agent logic.

Overview

File: vibe/core/middleware.py

Middleware intercepts the conversation at two points:

  1. Before turn: Before each LLM API call
  2. After turn: After each LLM turn completes (including tool execution)

Core Types

MiddlewareAction

Enum: at middleware.py:14

# middleware.py:14-18
class MiddlewareAction(StrEnum):
    CONTINUE = auto()       # Proceed normally
    STOP = auto()           # Stop conversation
    COMPACT = auto()        # Trigger context compaction
    INJECT_MESSAGE = auto() # Inject message into conversation

ConversationContext

Dataclass: at middleware.py:26

# middleware.py:26-30
@dataclass
class ConversationContext:
    messages: list[LLMMessage]  # Full message history
    stats: AgentStats           # Current statistics
    config: VibeConfig          # Configuration

MiddlewareResult

Dataclass: at middleware.py:33

# middleware.py:33-38
@dataclass
class MiddlewareResult:
    action: MiddlewareAction = MiddlewareAction.CONTINUE
    message: str | None = None           # For INJECT_MESSAGE
    reason: str | None = None            # For STOP (displayed to user)
    metadata: dict[str, Any] = field(default_factory=dict)

ConversationMiddleware Protocol

Protocol: at middleware.py:41

# middleware.py:41-46
class ConversationMiddleware(Protocol):
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult: ...
    async def after_turn(self, context: ConversationContext) -> MiddlewareResult: ...
    def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None: ...

Built-in Middleware

1. TurnLimitMiddleware

Class: at middleware.py:49

Stops conversation after N LLM turns.

# middleware.py:49-65
class TurnLimitMiddleware:
    def __init__(self, max_turns: int) -> None:
        self.max_turns = max_turns
 
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        if context.stats.steps - 1 >= self.max_turns:
            return MiddlewareResult(
                action=MiddlewareAction.STOP,
                reason=f"Turn limit of {self.max_turns} reached",
            )
        return MiddlewareResult()

Used when: --max-turns CLI argument is provided.

2. PriceLimitMiddleware

Class: at middleware.py:68

Stops conversation when cost exceeds threshold.

# middleware.py:68-84
class PriceLimitMiddleware:
    def __init__(self, max_price: float) -> None:
        self.max_price = max_price
 
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        if context.stats.session_cost > self.max_price:
            return MiddlewareResult(
                action=MiddlewareAction.STOP,
                reason=f"Price limit exceeded: ${context.stats.session_cost:.4f} > ${self.max_price:.2f}",
            )
        return MiddlewareResult()

Used when: --max-price CLI argument is provided.

3. AutoCompactMiddleware

Class: at middleware.py:87

Triggers context compaction when token count exceeds threshold.

# middleware.py:87-106
class AutoCompactMiddleware:
    def __init__(self, threshold: int) -> None:
        self.threshold = threshold
 
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        if context.stats.context_tokens >= self.threshold:
            return MiddlewareResult(
                action=MiddlewareAction.COMPACT,
                metadata={
                    "old_tokens": context.stats.context_tokens,
                    "threshold": self.threshold,
                },
            )
        return MiddlewareResult()

Used when: config.auto_compact_threshold > 0 (default: 200,000 tokens).

4. ContextWarningMiddleware

Class: at middleware.py:109

Warns user when approaching context limit.

# middleware.py:109-141
class ContextWarningMiddleware:
    def __init__(
        self, threshold_percent: float = 0.5, max_context: int | None = None
    ) -> None:
        self.threshold_percent = threshold_percent
        self.max_context = max_context
        self.has_warned = False
 
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        if self.has_warned:
            return MiddlewareResult()
 
        max_context = self.max_context
        if max_context is None:
            return MiddlewareResult()
 
        if context.stats.context_tokens >= max_context * self.threshold_percent:
            self.has_warned = True
            warning_msg = f"<{VIBE_WARNING_TAG}>You have used {percentage_used:.0f}% of your total context...</{VIBE_WARNING_TAG}>"
            return MiddlewareResult(
                action=MiddlewareAction.INJECT_MESSAGE,
                message=warning_msg
            )
 
        return MiddlewareResult()

Used when: config.context_warnings = true and auto_compact_threshold > 0.

5. PlanAgentMiddleware

Class: at middleware.py:152

Enforces read-only behavior when the plan agent is active.

# middleware.py:152-175
class PlanAgentMiddleware:
    def __init__(
        self,
        profile_getter: Callable[[], AgentProfile],
        reminder: str = PLAN_AGENT_REMINDER,
    ) -> None:
        self._profile_getter = profile_getter
        self.reminder = reminder
 
    def _is_plan_agent(self) -> bool:
        return self._profile_getter().name == BuiltinAgentName.PLAN
 
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        if not self._is_plan_agent():
            return MiddlewareResult()
        return MiddlewareResult(
            action=MiddlewareAction.INJECT_MESSAGE, message=self.reminder
        )

When the active agent is "plan", this middleware injects a PLAN_AGENT_REMINDER message before every turn. The reminder instructs the LLM to not make any edits or run non-readonly tools, and to present a plan for user confirmation instead.

Constant: PLAN_AGENT_REMINDER at middleware.py:147

Used when: Always added to the pipeline (only activates when plan agent is selected).

MiddlewarePipeline

Class: at middleware.py:178

Orchestrates multiple middleware in sequence.

# middleware.py:144-191
class MiddlewarePipeline:
    def __init__(self) -> None:
        self.middlewares: list[ConversationMiddleware] = []
 
    def add(self, middleware: ConversationMiddleware) -> MiddlewarePipeline:
        self.middlewares.append(middleware)
        return self
 
    def clear(self) -> None:
        self.middlewares.clear()
 
    def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None:
        for mw in self.middlewares:
            mw.reset(reset_reason)

Pipeline Execution

Before Turn: run_before_turn() at middleware.py:159

run_before_turn(context) [middleware.py:159-174]
    │
    messages_to_inject = []
    │
    for each middleware:
    │
    ├─► result = await mw.before_turn(context)
    │
    ├── If INJECT_MESSAGE:
    │   messages_to_inject.append(result.message)
    │
    ├── If STOP or COMPACT:
    │   return result immediately
    │
    └── If CONTINUE:
        continue to next middleware
    │
    └─► If any messages to inject:
        return MiddlewareResult(INJECT_MESSAGE, combined_message)

After Turn: run_after_turn() at middleware.py:210

Similar to run_before_turn() but raises ValueError if any middleware returns INJECT_MESSAGE:

# middleware.py:210-220
async def run_after_turn(self, context: ConversationContext) -> MiddlewareResult:
    for mw in self.middlewares:
        result = await mw.after_turn(context)
        if result.action == MiddlewareAction.INJECT_MESSAGE:
            raise ValueError(
                f"INJECT_MESSAGE not allowed in after_turn (from {type(mw).__name__})"
            )
        if result.action in {MiddlewareAction.STOP, MiddlewareAction.COMPACT}:
            return result
    return MiddlewareResult()

Middleware Setup in Agent

Method: _setup_middleware() at agent_loop.py:166

# agent_loop.py:166-182
def _setup_middleware(self, max_turns: int | None, max_price: float | None) -> None:
    self.middleware_pipeline.clear()
 
    if max_turns is not None:
        self.middleware_pipeline.add(TurnLimitMiddleware(max_turns))
 
    if max_price is not None:
        self.middleware_pipeline.add(PriceLimitMiddleware(max_price))
 
    if self.config.auto_compact_threshold > 0:
        self.middleware_pipeline.add(
            AutoCompactMiddleware(self.config.auto_compact_threshold)
        )
        if self.config.context_warnings:
            self.middleware_pipeline.add(
                ContextWarningMiddleware(0.5, self.config.auto_compact_threshold)
            )
 
    # Always add PlanAgentMiddleware (only activates when plan agent is selected)
    self.middleware_pipeline.add(
        PlanAgentMiddleware(lambda: self.agent_manager.active_profile)
    )

Middleware Result Handling

Method: _handle_middleware_result() at agent_loop.py:184

_handle_middleware_result(result) [agent_loop.py:184-231]
    │
    match result.action:
    │
    ├── STOP [agent_loop.py:188-200]:
    │   - Yield AssistantEvent with stop message
    │   - Save interaction
    │
    ├── INJECT_MESSAGE [agent_loop.py:202-208]:
    │   - Append message to last message's content
    │
    ├── COMPACT [agent_loop.py:210-228]:
    │   - Yield CompactStartEvent
    │   - await agent.compact()
    │   - Yield CompactEndEvent
    │
    └── CONTINUE [agent_loop.py:230-231]:
        - Do nothing, proceed normally

Integration in Conversation Loop

_conversation_loop() [agent_loop.py:238]
    │
    while not should_break:
    │
    ├─1─► BEFORE TURN [agent_loop.py:245-254]
    │     result = await middleware_pipeline.run_before_turn(context)
    │     async for event in _handle_middleware_result(result):
    │         yield event
    │     if result.action == STOP:
    │         return
    │
    ├─2─► Perform LLM turn
    │
    └─3─► AFTER TURN [agent_loop.py:282-291]
          result = await middleware_pipeline.run_after_turn(context)
          async for event in _handle_middleware_result(result):
              yield event
          if result.action == STOP:
              return

Execution Flow Diagram

User Message
    │
    ▼
┌─────────────────────────────────────────────────────────┐
│                    BEFORE TURN                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ TurnLimit   │─►│ PriceLimit  │─►│ AutoCompact │──┐  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │  │
│                                                      │  │
│  ┌─────────────────────────────────────────────────┐│  │
│  │ ContextWarning                                  ││  │
│  └─────────────────────────────────────────────────┘│  │
│  ┌─────────────────────────────────────────────────┐│  │
│  │ PlanAgent (inject reminder if plan mode)        ││  │
│  └─────────────────────────────────────────────────┘│  │
└──────────────────────────────────────────────────────┼──┘
    │                                                  │
    │ CONTINUE                        STOP/COMPACT     │
    ▼                                                  ▼
┌─────────────────┐                     ┌─────────────────┐
│   LLM Turn      │                     │  Stop or        │
│   + Tools       │                     │  Compact        │
└────────┬────────┘                     └─────────────────┘
         │
         ▼
┌─────────────────────────────────────────────────────────┐
│                    AFTER TURN                           │
│  Same middleware sequence                               │
└─────────────────────────────────────────────────────────┘
         │
         ▼
    Next iteration or exit loop

Reset Behavior

Enum: ResetReason at middleware.py:21

# middleware.py:21-23
class ResetReason(StrEnum):
    STOP = auto()     # Conversation ended
    COMPACT = auto()  # Context was compacted

Middleware can behave differently based on reset reason:

Creating Custom Middleware

from vibe.core.middleware import (
    ConversationMiddleware,
    ConversationContext,
    MiddlewareResult,
    MiddlewareAction,
    ResetReason,
)
 
class CustomMiddleware:
    def __init__(self, some_threshold: int) -> None:
        self.threshold = some_threshold
        self.state = {}
 
    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        # Check condition
        if some_condition(context):
            return MiddlewareResult(
                action=MiddlewareAction.STOP,
                reason="Custom stop reason"
            )
        return MiddlewareResult()  # CONTINUE
 
    async def after_turn(self, context: ConversationContext) -> MiddlewareResult:
        # Can also check after turn completes
        return MiddlewareResult()
 
    def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None:
        self.state = {}

Source File References

File Key Lines Description
middleware.py:14-18 MiddlewareAction Action enum
middleware.py:21-23 ResetReason Reset reason enum
middleware.py:26-30 ConversationContext Context dataclass
middleware.py:33-38 MiddlewareResult Result dataclass
middleware.py:41-46 ConversationMiddleware Protocol definition
middleware.py:49-65 TurnLimitMiddleware Turn limit implementation
middleware.py:68-84 PriceLimitMiddleware Price limit implementation
middleware.py:87-106 AutoCompactMiddleware Auto-compact implementation
middleware.py:109-141 ContextWarningMiddleware Warning implementation
middleware.py:147-149 PLAN_AGENT_REMINDER Plan mode enforcement message
middleware.py:152-175 PlanAgentMiddleware Plan agent enforcement
middleware.py:178-220 MiddlewarePipeline Pipeline orchestration
agent_loop.py _setup_middleware() Middleware setup
agent_loop.py _handle_middleware_result() Result handling