Core Logic: Agent Loop & LLM Integration
kimi-code
Core Logic: Agent Loop & LLM Integration
KimiSoul - The Heart of the Agent
KimiSoul at src/kimi_cli/soul/kimisoul.py:89 is the main orchestrator that runs the agent loop.
class KimiSoul:
def __init__(
self,
agent: Agent,
context: Context,
*,
wire: Wire | None = None,
compaction: Compaction | None = None,
):
self._agent = agent
self._context = context
self._wire = wire or Wire()
self._compaction = compaction or SimpleCompaction()

Main Entry: run()
The run() method at kimisoul.py:182 is the entry point for user input:
async def run(self, user_input: str) -> AsyncIterator[WireMessage]:
"""Process user input and yield wire messages."""
# Check for slash commands first
if user_input.startswith("/"):
# Handle skill or flow invocation
async for message in self._handle_slash_command(user_input):
    yield message
return
# Regular input goes to _turn()
async for message in self._turn(Message(role="user", content=user_input)):
    yield message

Agent Loop Flow
Single Turn: _turn()
# kimisoul.py:210
async def _turn(self, user_message: Message) -> AsyncIterator[WireMessage]:
"""Execute one turn of conversation."""
# Validate LLM is configured
if self._agent.runtime.llm is None:
raise LLMNotConfigured()
# Create checkpoint for this turn
self._checkpoint()
# Add user message to context
self._context.append_message(user_message)
# Run agent loop until completion
async for message in self._agent_loop():
    yield message

Agent Loop: _agent_loop()
The core loop at kimisoul.py:302:
async def _agent_loop(self) -> AsyncIterator[WireMessage]:
"""Run steps until no more tool calls."""
step_no = 0
max_steps = self._agent.runtime.config.loop_control.max_steps_per_turn
while step_no < max_steps:
step_no += 1
# Check if context needs compaction
if self._needs_compaction():
async for message in self._compact_context():
    yield message
# Create checkpoint for this step
self._checkpoint()
try:
# Execute one step
outcome = await self._step()
if outcome is not None:
# Step completed without pending tool calls
yield WireMessage.TurnEnd(outcome)
return
except BackToTheFuture as btf:
# D-Mail triggered - revert to checkpoint
self._context.revert_to(btf.checkpoint_id)
self._checkpoint()
self._context.append_messages(btf.messages)
# Continue loop from reverted state

Single Step: _step()
Each step at kimisoul.py:382 involves one LLM call and tool execution:
async def _step(self) -> StepOutcome | None:
"""Execute one LLM call and handle tool responses."""
# 1. Call LLM via kosong
result: StepResult = await kosong.step(
llm=self._agent.runtime.llm,
system_prompt=self._agent.system_prompt,
messages=self._context.messages,
tools=self._agent.toolset.tools,
)
# 2. Send wire messages for response parts
for part in result.parts:
yield WireMessage.MessagePart(part)
# 3. Execute tool calls in parallel
tool_results = await result.tool_results(
handler=self._agent.toolset.handle
)
# 4. Append messages to context
self._context.append_message(result.assistant_message)
for tool_result in tool_results:
self._context.append_message(tool_result.message)
# 5. Check for tool rejection
if any(r.rejected for r in tool_results):
return StepOutcome("tool_rejected")
# 6. Check for pending D-Mail
dmail = self._agent.runtime.denwa_renji.fetch_pending_dmail()
if dmail:
raise BackToTheFuture(dmail.checkpoint_id, dmail.messages)
# 7. Return outcome if no more tool calls
if not result.has_tool_calls:
return StepOutcome("complete", result.final_message)
return None  # Continue loop

Flow Diagram
User Input
│
▼
┌─────────────────────────────────────────────────────────────┐
│ KimiSoul.run() │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Is slash command? │ │
│ │ /skill:name → SkillRunner │ │
│ │ /flow:name → FlowRunner │ │
│ │ /builtin → Built-in handler │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ No │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ _turn() │ │
│ │ 1. Validate LLM configured │ │
│ │ 2. Create checkpoint │ │
│ │ 3. Append user message │ │
│ │ 4. Enter _agent_loop() │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ _agent_loop() │ │
│ │ │ │
│ │ for step in range(max_steps): │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Check context size → compact if needed │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Create checkpoint │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ _step() │ │ │
│ │ │ ├── kosong.step() → LLM call │ │ │
│ │ │ ├── tool_results() → parallel execution │ │ │
│ │ │ ├── append messages to context │ │ │
│ │ │ ├── check for rejections │ │ │
│ │ │ └── check for D-Mail │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ StepOutcome? │ │ │
│ │ │ Yes → Return TurnOutcome │ │ │
│ │ │ No → Continue loop │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ BackToTheFuture caught? │ │ │
│ │ │ → Revert context to checkpoint │ │ │
│ │ │ → Append D-Mail message │ │ │
│ │ │ → Continue loop │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
Wire Messages → UI
LLM Integration via kosong
The kosong library handles LLM interactions:
LLM Creation (llm.py:106)
def create_llm(
provider: LLMProvider,
model: LLMModel,
*,
thinking: bool | ThinkingEffort = False,
session_id: str | None = None,
oauth: OAuthManager | None = None,
) -> LLM | None:
"""Create LLM instance for the given provider."""
match provider.type:
case "kimi":
from kosong.chat_provider.kimi import Kimi
chat_provider = Kimi(
model=model.model,
base_url=provider.base_url,
api_key=provider.api_key,
custom_headers=provider.custom_headers,
)
case "openai_legacy":
from kosong.contrib.chat_provider.openai_legacy import OpenAILegacy
chat_provider = OpenAILegacy(...)
case "anthropic":
from kosong.contrib.chat_provider.anthropic import Anthropic
chat_provider = Anthropic(...)
# ... other providers
return LLM(
chat_provider=chat_provider,
max_context_size=model.max_context_size,
capabilities=model.capabilities or {},
)

kosong.step()
The core LLM call:
result = await kosong.step(
llm=llm, # LLM instance
system_prompt=system_prompt, # Agent's system prompt
messages=context.messages, # Conversation history
tools=toolset.tools, # Available tools
)

Returns a StepResult containing:
- `parts` — Response parts (text, thinking, etc.)
- `assistant_message` — The full assistant message
- `tool_calls` — List of tool calls to execute
- `has_tool_calls` — Whether there are pending calls
- `tool_results()` — Method to execute tools
Tool Execution
Tools are executed via KimiToolset.handle():
# toolset.py
async def handle(self, tool_call: ToolCall) -> ToolResult:
"""Execute a tool call."""
# Set context variable for tool access
current_tool_call.set(tool_call)
try:
# Find and invoke tool
tool = self.find(tool_call.name)
result = await tool(tool_call.arguments)
return ToolResult(tool_call.id, result)
finally:
current_tool_call.set(None)

Approval Flow
Tools that modify state request approval:
# In a tool like Shell
async def __call__(self, command: str) -> ToolReturnValue:
approved = await self._approval.request(
sender="Shell",
action="execute",
description=f"Run: {command}",
)
if not approved:
return ToolRejectedError("User rejected command")
# Execute command...Context Compaction
When context approaches the token limit:
# kimisoul.py:480
async def _compact_context(self) -> AsyncIterator[WireMessage]:
"""Compact context when approaching token limit."""
yield WireMessage.CompactionBegin()
# Use compaction strategy (SimpleCompaction by default)
compacted_messages = await self._compaction.compact(
messages=self._context.messages,
system_prompt=self._agent.system_prompt,
)
# Replace context
self._context.clear()
self._checkpoint()
self._context.append_messages(compacted_messages)
yield WireMessage.CompactionEnd()

D-Mail Time-Travel
The D-Mail system enables reverting to past checkpoints:
# SendDMail tool
async def __call__(self, checkpoint_id: int, message: str):
self._denwa_renji.send_dmail(DMail(
checkpoint_id=checkpoint_id,
messages=[Message(role="user", content=message)],
))
return ToolOk("D-Mail sent")
# In _step() after tool execution
dmail = self._agent.runtime.denwa_renji.fetch_pending_dmail()
if dmail:
raise BackToTheFuture(dmail.checkpoint_id, dmail.messages)
# In _agent_loop() exception handler
except BackToTheFuture as btf:
self._context.revert_to(btf.checkpoint_id)
self._checkpoint()
self._context.append_messages(btf.messages)
# Loop continues from reverted state

Wire Protocol
Communication between soul and UI:
# Wire message types (wire/types.py)
class WireMessage:
TurnBegin # Start of turn
StepBegin # Start of step
MessagePart # Text, image, thinking content
ToolCallRequest # Tool being called
ToolCallResult # Tool execution result
ApprovalRequest # Approval needed
ApprovalResponse # Approval decision
CompactionBegin # Starting compaction
CompactionEnd # Compaction complete
TurnEnd # Turn finished

Related Documentation
- Overview - High-level architecture
- Key Abstractions - Classes and patterns
- Configuration - Config options