Middleware Pipeline
The middleware system provides extensible pre/post turn hooks for controlling conversation flow without coupling concerns to the core agent logic.
Overview
File: vibe/core/middleware.py
Middleware intercepts the conversation at two points:
- Before turn: Before each LLM API call
- After turn: After each LLM turn completes (including tool execution)
Core Types
MiddlewareAction
Enum: at middleware.py:14
# middleware.py:14-18
class MiddlewareAction(StrEnum):
CONTINUE = auto() # Proceed normally
STOP = auto() # Stop conversation
COMPACT = auto() # Trigger context compaction
INJECT_MESSAGE = auto() # Inject message into conversation
ConversationContext
Dataclass: at middleware.py:26
# middleware.py:26-30
@dataclass
class ConversationContext:
messages: list[LLMMessage] # Full message history
stats: AgentStats # Current statistics
config: VibeConfig # Configuration
MiddlewareResult
Dataclass: at middleware.py:33
# middleware.py:33-38
@dataclass
class MiddlewareResult:
action: MiddlewareAction = MiddlewareAction.CONTINUE
message: str | None = None # For INJECT_MESSAGE
reason: str | None = None # For STOP (displayed to user)
metadata: dict[str, Any] = field(default_factory=dict)
ConversationMiddleware Protocol
Protocol: at middleware.py:41
# middleware.py:41-46
class ConversationMiddleware(Protocol):
async def before_turn(self, context: ConversationContext) -> MiddlewareResult: ...
async def after_turn(self, context: ConversationContext) -> MiddlewareResult: ...
def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None: ...
Built-in Middleware
1. TurnLimitMiddleware
Class: at middleware.py:49
Stops conversation after N LLM turns.
# middleware.py:49-65
class TurnLimitMiddleware:
def __init__(self, max_turns: int) -> None:
self.max_turns = max_turns
async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
if context.stats.steps - 1 >= self.max_turns:
return MiddlewareResult(
action=MiddlewareAction.STOP,
reason=f"Turn limit of {self.max_turns} reached",
)
return MiddlewareResult()
Used when: --max-turns CLI argument is provided.
2. PriceLimitMiddleware
Class: at middleware.py:68
Stops conversation when cost exceeds threshold.
# middleware.py:68-84
class PriceLimitMiddleware:
def __init__(self, max_price: float) -> None:
self.max_price = max_price
async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
if context.stats.session_cost > self.max_price:
return MiddlewareResult(
action=MiddlewareAction.STOP,
reason=f"Price limit exceeded: ${context.stats.session_cost:.4f} > ${self.max_price:.2f}",
)
return MiddlewareResult()
Used when: --max-price CLI argument is provided.
3. AutoCompactMiddleware
Class: at middleware.py:87
Triggers context compaction when token count exceeds threshold.
# middleware.py:87-106
class AutoCompactMiddleware:
def __init__(self, threshold: int) -> None:
self.threshold = threshold
async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
if context.stats.context_tokens >= self.threshold:
return MiddlewareResult(
action=MiddlewareAction.COMPACT,
metadata={
"old_tokens": context.stats.context_tokens,
"threshold": self.threshold,
},
)
return MiddlewareResult()
Used when: config.auto_compact_threshold > 0 (default: 200,000 tokens).
4. ContextWarningMiddleware
Class: at middleware.py:109
Warns user when approaching context limit.
# middleware.py:109-141
class ContextWarningMiddleware:
def __init__(
self, threshold_percent: float = 0.5, max_context: int | None = None
) -> None:
self.threshold_percent = threshold_percent
self.max_context = max_context
self.has_warned = False
async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
if self.has_warned:
return MiddlewareResult()
max_context = self.max_context
if max_context is None:
return MiddlewareResult()
if context.stats.context_tokens >= max_context * self.threshold_percent:
self.has_warned = True
warning_msg = f"<{VIBE_WARNING_TAG}>You have used {percentage_used:.0f}% of your total context...</{VIBE_WARNING_TAG}>"
return MiddlewareResult(
action=MiddlewareAction.INJECT_MESSAGE,
message=warning_msg
)
return MiddlewareResult()
Used when: config.context_warnings = true and auto_compact_threshold > 0.
5. PlanAgentMiddleware
Class: at middleware.py:152
Enforces read-only behavior when the plan agent is active.
# middleware.py:152-175
class PlanAgentMiddleware:
def __init__(
self,
profile_getter: Callable[[], AgentProfile],
reminder: str = PLAN_AGENT_REMINDER,
) -> None:
self._profile_getter = profile_getter
self.reminder = reminder
def _is_plan_agent(self) -> bool:
return self._profile_getter().name == BuiltinAgentName.PLAN
async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
if not self._is_plan_agent():
return MiddlewareResult()
return MiddlewareResult(
action=MiddlewareAction.INJECT_MESSAGE, message=self.reminder
)
When the active agent is "plan", this middleware injects a PLAN_AGENT_REMINDER message before every turn. The reminder instructs the LLM not to make any edits or run non-read-only tools, and to present a plan for user confirmation instead.
Constant: PLAN_AGENT_REMINDER at middleware.py:147
Used when: Always added to the pipeline (only activates when plan agent is selected).
MiddlewarePipeline
Class: at middleware.py:178
Orchestrates multiple middleware in sequence.
# middleware.py:178-191
class MiddlewarePipeline:
def __init__(self) -> None:
self.middlewares: list[ConversationMiddleware] = []
def add(self, middleware: ConversationMiddleware) -> MiddlewarePipeline:
self.middlewares.append(middleware)
return self
def clear(self) -> None:
self.middlewares.clear()
def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None:
for mw in self.middlewares:
mw.reset(reset_reason)
Pipeline Execution
Before Turn: run_before_turn() at middleware.py:193
run_before_turn(context) [middleware.py:193-208]
│
messages_to_inject = []
│
for each middleware:
│
├─► result = await mw.before_turn(context)
│
├── If INJECT_MESSAGE:
│ messages_to_inject.append(result.message)
│
├── If STOP or COMPACT:
│ return result immediately
│
└── If CONTINUE:
continue to next middleware
│
└─► If any messages to inject:
return MiddlewareResult(INJECT_MESSAGE, combined_message)
After Turn: run_after_turn() at middleware.py:210
Similar to run_before_turn() but raises ValueError if any middleware returns INJECT_MESSAGE:
# middleware.py:210-220
async def run_after_turn(self, context: ConversationContext) -> MiddlewareResult:
for mw in self.middlewares:
result = await mw.after_turn(context)
if result.action == MiddlewareAction.INJECT_MESSAGE:
raise ValueError(
f"INJECT_MESSAGE not allowed in after_turn (from {type(mw).__name__})"
)
if result.action in {MiddlewareAction.STOP, MiddlewareAction.COMPACT}:
return result
return MiddlewareResult()
Middleware Setup in Agent
Method: _setup_middleware() at agent_loop.py:166
# agent_loop.py:166-182
def _setup_middleware(self, max_turns: int | None, max_price: float | None) -> None:
self.middleware_pipeline.clear()
if max_turns is not None:
self.middleware_pipeline.add(TurnLimitMiddleware(max_turns))
if max_price is not None:
self.middleware_pipeline.add(PriceLimitMiddleware(max_price))
if self.config.auto_compact_threshold > 0:
self.middleware_pipeline.add(
AutoCompactMiddleware(self.config.auto_compact_threshold)
)
if self.config.context_warnings:
self.middleware_pipeline.add(
ContextWarningMiddleware(0.5, self.config.auto_compact_threshold)
)
# Always add PlanAgentMiddleware (only activates when plan agent is selected)
self.middleware_pipeline.add(
PlanAgentMiddleware(lambda: self.agent_manager.active_profile)
)
Middleware Result Handling
Method: _handle_middleware_result() at agent_loop.py:184
_handle_middleware_result(result) [agent_loop.py:184-231]
│
match result.action:
│
├── STOP [agent_loop.py:188-200]:
│ - Yield AssistantEvent with stop message
│ - Save interaction
│
├── INJECT_MESSAGE [agent_loop.py:202-208]:
│ - Append message to last message's content
│
├── COMPACT [agent_loop.py:210-228]:
│ - Yield CompactStartEvent
│ - await agent.compact()
│ - Yield CompactEndEvent
│
└── CONTINUE [agent_loop.py:230-231]:
- Do nothing, proceed normally
Integration in Conversation Loop
_conversation_loop() [agent_loop.py:238]
│
while not should_break:
│
├─1─► BEFORE TURN [agent_loop.py:245-254]
│ result = await middleware_pipeline.run_before_turn(context)
│ async for event in _handle_middleware_result(result):
│ yield event
│ if result.action == STOP:
│ return
│
├─2─► Perform LLM turn
│
└─3─► AFTER TURN [agent_loop.py:282-291]
result = await middleware_pipeline.run_after_turn(context)
async for event in _handle_middleware_result(result):
yield event
if result.action == STOP:
return
Execution Flow Diagram
User Message
│
▼
┌─────────────────────────────────────────────────────────┐
│ BEFORE TURN │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TurnLimit │─►│ PriceLimit │─►│ AutoCompact │──┐ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │
│ ┌─────────────────────────────────────────────────┐│ │
│ │ ContextWarning ││ │
│ └─────────────────────────────────────────────────┘│ │
│ ┌─────────────────────────────────────────────────┐│ │
│ │ PlanAgent (inject reminder if plan mode) ││ │
│ └─────────────────────────────────────────────────┘│ │
└──────────────────────────────────────────────────────┼──┘
│ │
│ CONTINUE STOP/COMPACT │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ LLM Turn │ │ Stop or │
│ + Tools │ │ Compact │
└────────┬────────┘ └─────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ AFTER TURN │
│ Same middleware sequence │
└─────────────────────────────────────────────────────────┘
│
▼
Next iteration or exit loop
Reset Behavior
Enum: ResetReason at middleware.py:21
# middleware.py:21-23
class ResetReason(StrEnum):
STOP = auto() # Conversation ended
COMPACT = auto() # Context was compacted
Middleware can behave differently based on reset reason:
- ContextWarningMiddleware resets its has_warned flag on any reset
- Other middleware currently ignore the reset reason
Creating Custom Middleware
from vibe.core.middleware import (
ConversationMiddleware,
ConversationContext,
MiddlewareResult,
MiddlewareAction,
ResetReason,
)
class CustomMiddleware:
def __init__(self, some_threshold: int) -> None:
self.threshold = some_threshold
self.state = {}
async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
# Check condition
if some_condition(context):
return MiddlewareResult(
action=MiddlewareAction.STOP,
reason="Custom stop reason"
)
return MiddlewareResult() # CONTINUE
async def after_turn(self, context: ConversationContext) -> MiddlewareResult:
# Can also check after turn completes
return MiddlewareResult()
def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None:
self.state = {}
Source File References
| File | Key Lines | Description |
|---|---|---|
| middleware.py:14-18 | MiddlewareAction | Action enum |
| middleware.py:21-23 | ResetReason | Reset reason enum |
| middleware.py:26-30 | ConversationContext | Context dataclass |
| middleware.py:33-38 | MiddlewareResult | Result dataclass |
| middleware.py:41-46 | ConversationMiddleware | Protocol definition |
| middleware.py:49-65 | TurnLimitMiddleware | Turn limit implementation |
| middleware.py:68-84 | PriceLimitMiddleware | Price limit implementation |
| middleware.py:87-106 | AutoCompactMiddleware | Auto-compact implementation |
| middleware.py:109-141 | ContextWarningMiddleware | Warning implementation |
| middleware.py:147-149 | PLAN_AGENT_REMINDER | Plan mode enforcement message |
| middleware.py:152-175 | PlanAgentMiddleware | Plan agent enforcement |
| middleware.py:178-220 | MiddlewarePipeline | Pipeline orchestration |
| agent_loop.py | _setup_middleware() | Middleware setup |
| agent_loop.py | _handle_middleware_result() | Result handling |