CodeDocs Vault

Core Logic: Agent Loop & LLM Integration

KimiSoul - The Heart of the Agent

KimiSoul at src/kimi_cli/soul/kimisoul.py:89 is the main orchestrator that runs the agent loop.

class KimiSoul:
    def __init__(
        self,
        agent: Agent,
        context: Context,
        *,
        wire: Wire | None = None,
        compaction: Compaction | None = None,
    ):
        self._agent = agent
        self._context = context
        self._wire = wire or Wire()
        self._compaction = compaction or SimpleCompaction()

Main Entry: run()

The run() method at kimisoul.py:182 is the entry point for user input:

async def run(self, user_input: str) -> AsyncIterator[WireMessage]:
    """Process user input and yield wire messages."""
    # Check for slash commands first
    if user_input.startswith("/"):
        # Handle skill or flow invocation
        yield from self._handle_slash_command(user_input)
        return
 
    # Regular input goes to _turn()
    yield from self._turn(Message(role="user", content=user_input))

Agent Loop Flow

Single Turn: _turn()

# kimisoul.py:210
async def _turn(self, user_message: Message) -> AsyncIterator[WireMessage]:
    """Execute one turn of conversation."""
    # Validate LLM is configured
    if self._agent.runtime.llm is None:
        raise LLMNotConfigured()
 
    # Create checkpoint for this turn
    self._checkpoint()
 
    # Add user message to context
    self._context.append_message(user_message)
 
    # Run agent loop until completion
    yield from self._agent_loop()

Agent Loop: _agent_loop()

The core loop at kimisoul.py:302:

async def _agent_loop(self) -> AsyncIterator[WireMessage]:
    """Run steps until no more tool calls."""
    step_no = 0
    max_steps = self._agent.runtime.config.loop_control.max_steps_per_turn
 
    while step_no < max_steps:
        step_no += 1
 
        # Check if context needs compaction
        if self._needs_compaction():
            yield from self._compact_context()
 
        # Create checkpoint for this step
        self._checkpoint()
 
        try:
            # Execute one step
            outcome = await self._step()
 
            if outcome is not None:
                # Step completed without pending tool calls
                yield WireMessage.TurnEnd(outcome)
                return
 
        except BackToTheFuture as btf:
            # D-Mail triggered - revert to checkpoint
            self._context.revert_to(btf.checkpoint_id)
            self._checkpoint()
            self._context.append_message(btf.messages)
            # Continue loop from reverted state

Single Step: _step()

Each step at kimisoul.py:382 involves one LLM call and tool execution:

async def _step(self) -> StepOutcome | None:
    """Execute one LLM call and handle tool responses."""
 
    # 1. Call LLM via kosong
    result: StepResult = await kosong.step(
        llm=self._agent.runtime.llm,
        system_prompt=self._agent.system_prompt,
        messages=self._context.messages,
        tools=self._agent.toolset.tools,
    )
 
    # 2. Send wire messages for response parts
    for part in result.parts:
        yield WireMessage.MessagePart(part)
 
    # 3. Execute tool calls in parallel
    tool_results = await result.tool_results(
        handler=self._agent.toolset.handle
    )
 
    # 4. Append messages to context
    self._context.append_message(result.assistant_message)
    for tool_result in tool_results:
        self._context.append_message(tool_result.message)
 
    # 5. Check for tool rejection
    if any(r.rejected for r in tool_results):
        return StepOutcome("tool_rejected")
 
    # 6. Check for pending D-Mail
    dmail = self._agent.runtime.denwa_renji.fetch_pending_dmail()
    if dmail:
        raise BackToTheFuture(dmail.checkpoint_id, dmail.messages)
 
    # 7. Return outcome if no more tool calls
    if not result.has_tool_calls:
        return StepOutcome("complete", result.final_message)
 
    return None  # Continue loop

Flow Diagram

User Input
    │
    ▼
┌─────────────────────────────────────────────────────────────┐
│                    KimiSoul.run()                            │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Is slash command?                                     │   │
│  │   /skill:name → SkillRunner                          │   │
│  │   /flow:name  → FlowRunner                           │   │
│  │   /builtin    → Built-in handler                     │   │
│  └──────────────────────────────────────────────────────┘   │
│                         │ No                                 │
│                         ▼                                    │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                    _turn()                            │   │
│  │  1. Validate LLM configured                          │   │
│  │  2. Create checkpoint                                 │   │
│  │  3. Append user message                              │   │
│  │  4. Enter _agent_loop()                              │   │
│  └──────────────────────────────────────────────────────┘   │
│                         │                                    │
│                         ▼                                    │
│  ┌──────────────────────────────────────────────────────┐   │
│  │               _agent_loop()                           │   │
│  │                                                       │   │
│  │  for step in range(max_steps):                       │   │
│  │    ┌────────────────────────────────────────────┐    │   │
│  │    │ Check context size → compact if needed      │    │   │
│  │    └────────────────────────────────────────────┘    │   │
│  │    ┌────────────────────────────────────────────┐    │   │
│  │    │ Create checkpoint                           │    │   │
│  │    └────────────────────────────────────────────┘    │   │
│  │    ┌────────────────────────────────────────────┐    │   │
│  │    │ _step()                                     │    │   │
│  │    │   ├── kosong.step() → LLM call             │    │   │
│  │    │   ├── tool_results() → parallel execution  │    │   │
│  │    │   ├── append messages to context           │    │   │
│  │    │   ├── check for rejections                 │    │   │
│  │    │   └── check for D-Mail                     │    │   │
│  │    └────────────────────────────────────────────┘    │   │
│  │    ┌────────────────────────────────────────────┐    │   │
│  │    │ StepOutcome?                                │    │   │
│  │    │   Yes → Return TurnOutcome                  │    │   │
│  │    │   No  → Continue loop                       │    │   │
│  │    └────────────────────────────────────────────┘    │   │
│  │    ┌────────────────────────────────────────────┐    │   │
│  │    │ BackToTheFuture caught?                     │    │   │
│  │    │   → Revert context to checkpoint            │    │   │
│  │    │   → Append D-Mail message                   │    │   │
│  │    │   → Continue loop                           │    │   │
│  │    └────────────────────────────────────────────┘    │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
    │
    ▼
Wire Messages → UI

LLM Integration via kosong

The kosong library handles LLM interactions:

LLM Creation (llm.py:106)

def create_llm(
    provider: LLMProvider,
    model: LLMModel,
    *,
    thinking: bool | ThinkingEffort = False,
    session_id: str | None = None,
    oauth: OAuthManager | None = None,
) -> LLM | None:
    """Create LLM instance for the given provider."""
 
    match provider.type:
        case "kimi":
            from kosong.chat_provider.kimi import Kimi
            chat_provider = Kimi(
                model=model.model,
                base_url=provider.base_url,
                api_key=provider.api_key,
                custom_headers=provider.custom_headers,
            )
 
        case "openai_legacy":
            from kosong.contrib.chat_provider.openai_legacy import OpenAILegacy
            chat_provider = OpenAILegacy(...)
 
        case "anthropic":
            from kosong.contrib.chat_provider.anthropic import Anthropic
            chat_provider = Anthropic(...)
 
        # ... other providers
 
    return LLM(
        chat_provider=chat_provider,
        max_context_size=model.max_context_size,
        capabilities=model.capabilities or {},
    )

kosong.step()

The core LLM call:

result = await kosong.step(
    llm=llm,                           # LLM instance
    system_prompt=system_prompt,       # Agent's system prompt
    messages=context.messages,         # Conversation history
    tools=toolset.tools,              # Available tools
)

Returns a StepResult containing:

Tool Execution

Tools are executed via KimiToolset.handle():

# toolset.py
async def handle(self, tool_call: ToolCall) -> ToolResult:
    """Execute a tool call."""
    # Set context variable for tool access
    current_tool_call.set(tool_call)
 
    try:
        # Find and invoke tool
        tool = self.find(tool_call.name)
        result = await tool(tool_call.arguments)
        return ToolResult(tool_call.id, result)
    finally:
        current_tool_call.set(None)

Approval Flow

Tools that modify state request approval:

# In a tool like Shell
async def __call__(self, command: str) -> ToolReturnValue:
    approved = await self._approval.request(
        sender="Shell",
        action="execute",
        description=f"Run: {command}",
    )
 
    if not approved:
        return ToolRejectedError("User rejected command")
 
    # Execute command...

Context Compaction

When context approaches the token limit:

# kimisoul.py:480
async def compact_context(self) -> AsyncIterator[WireMessage]:
    """Compact context when approaching token limit."""
    yield WireMessage.CompactionBegin()
 
    # Use compaction strategy (SimpleCompaction by default)
    compacted_messages = await self._compaction.compact(
        messages=self._context.messages,
        system_prompt=self._agent.system_prompt,
    )
 
    # Replace context
    self._context.clear()
    self._checkpoint()
    self._context.append_messages(compacted_messages)
 
    yield WireMessage.CompactionEnd()

D-Mail Time-Travel

The D-Mail system enables reverting to past checkpoints:

# SendDMail tool
async def __call__(self, checkpoint_id: int, message: str):
    self._denwa_renji.send_dmail(DMail(
        checkpoint_id=checkpoint_id,
        messages=[Message(role="user", content=message)],
    ))
    return ToolOk("D-Mail sent")
 
# In _step() after tool execution
dmail = self._agent.runtime.denwa_renji.fetch_pending_dmail()
if dmail:
    raise BackToTheFuture(dmail.checkpoint_id, dmail.messages)
 
# In _agent_loop() exception handler
except BackToTheFuture as btf:
    self._context.revert_to(btf.checkpoint_id)
    self._checkpoint()
    self._context.append_message(btf.messages)
    # Loop continues from reverted state

Wire Protocol

Communication between soul and UI:

# Wire message types (wire/types.py)
class WireMessage:
    TurnBegin      # Start of turn
    StepBegin      # Start of step
    MessagePart    # Text, image, thinking content
    ToolCallRequest # Tool being called
    ToolCallResult  # Tool execution result
    ApprovalRequest # Approval needed
    ApprovalResponse # Approval decision
    CompactionBegin # Starting compaction
    CompactionEnd   # Compaction complete
    TurnEnd        # Turn finished