Hermes Agent - Gateway & Messaging Architecture

Overview

The gateway is a single process that connects to 20+ messaging platforms simultaneously, routing messages to the agent core and streaming responses back. It manages sessions, handles commands, executes cron jobs, and provides graceful lifecycle management.

Architecture

    ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
    │ Telegram │ │ Discord  │ │  Slack   │ │ WhatsApp │ │  Signal  │ ...
    │  Bot API │ │WebSocket │ │  Bolt    │ │ Baileys  │ │ HTTP API │
    └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘
         │            │            │            │            │
         ▼            ▼            ▼            ▼            ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │                    Platform Adapters                             │
    │              (inherit BasePlatformAdapter)                       │
    │                                                                  │
    │   Each adapter:                                                  │
    │   - Connects to platform API                                    │
    │   - Parses platform-specific message format                     │
    │   - Normalizes to MessageEvent                                  │
    │   - Calls self._message_handler(event)                          │
    │   - Formats and sends responses back                            │
    └─────────────────────────────┬───────────────────────────────────┘
                                  │
                                  ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │                      GatewayRunner                               │
    │                    (gateway/run.py)                               │
    │                                                                  │
    │   _handle_message(event):                                       │
    │   ├── Authorization check (user allowlist)                      │
    │   ├── Command parsing (/new, /model, /stop, etc.)              │
    │   ├── Session lookup or creation                                │
    │   ├── Context injection (platform, chat name, etc.)            │
    │   ├── Agent invocation (AIAgent.run_conversation)               │
    │   └── Response delivery (via adapter.send or stream_consumer)  │
    │                                                                  │
    │   Background tasks:                                             │
    │   ├── Session expiry watcher (every 300s)                      │
    │   ├── Platform reconnect watcher (exp backoff)                 │
    │   └── Cron ticker (every 60s)                                  │
    └─────────────────────────────────────────────────────────────────┘
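
The "normalizes to MessageEvent" step in the diagram can be sketched as follows. This is a minimal illustration, not the gateway's code: the `MessageEvent` fields and the `normalize_telegram_update` helper are assumptions, and the input dict only loosely follows the shape of a Telegram Bot API update.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MessageEvent:
    """Hypothetical normalized event; the real class may carry more fields."""
    platform: str
    chat_id: str
    user_id: str
    text: str
    thread_id: Optional[str] = None


def normalize_telegram_update(update: dict) -> Optional[MessageEvent]:
    """Map a Telegram-style update dict onto the platform-neutral event."""
    message = update.get("message")
    if not message or "text" not in message:
        return None  # this sketch ignores non-text updates
    thread = message.get("message_thread_id")
    return MessageEvent(
        platform="telegram",
        chat_id=str(message["chat"]["id"]),
        user_id=str(message["from"]["id"]),
        text=message["text"],
        thread_id=str(thread) if thread is not None else None,
    )
```

Converting every ID to a string keeps the downstream code independent of whether a platform uses integers or opaque tokens.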

Entry Points

| Path | Invocation |
|---|---|
| cli.py --gateway | asyncio.run(start_gateway()) |
| hermes gateway run | Same as above |
| systemd service | hermes-gateway.service |

# cli.py:9870-9875
if gateway:
    import asyncio
    from gateway.run import start_gateway
    print("Starting Hermes Gateway (messaging platforms)...")
    asyncio.run(start_gateway())

GatewayRunner (gateway/run.py, 9,800 lines)

Startup Sequence (async def start(), line 1727)

1. Load configuration
   ├── .env from ~/.hermes/.env
   ├── Bridge config.yaml values to environment variables
   └── SSL certificate auto-detection (NixOS compat)

2. Initialize platform adapters
   ├── Loop through enabled platforms
   ├── Create adapters via _create_adapter() (line 2409)
   ├── Call adapter.connect()
   └── Set message handler: adapter.set_message_handler(self._handle_message)

3. Spawn background tasks
   ├── Session expiry watcher (every 300s)
   │   └── Proactively flush expired session memories
   ├── Platform reconnect watcher
   │   └── Retries failed platforms with exponential backoff (max 300s, 20 retries)
   └── Cron ticker (every 60s)
       └── Executes due cron jobs, delivers results via DeliveryRouter

4. Hook system
   ├── Discover hooks from ~/.hermes/hooks/
   └── Emit gateway:startup event
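
The reconnect watcher's retry policy (exponential backoff, capped at 300s, at most 20 retries) might look like the sketch below. The 5-second base delay, the function names, and the use of `ConnectionError` as the failure signal are assumptions made for illustration.

```python
import asyncio

MAX_DELAY = 300.0   # seconds, the cap stated above
MAX_RETRIES = 20    # the retry limit stated above


def backoff_delay(attempt: int, base: float = 5.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2**attempt, capped."""
    return min(base * (2 ** attempt), MAX_DELAY)


async def reconnect_with_backoff(connect, sleep=asyncio.sleep) -> bool:
    """Call `connect()` until it succeeds or the retry budget is spent."""
    for attempt in range(MAX_RETRIES):
        try:
            await connect()
            return True
        except ConnectionError:
            await sleep(backoff_delay(attempt))
    return False
```

With a 5-second base the delay reaches the 300-second cap on the seventh attempt and stays there for the remaining retries.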

Message Processing Pipeline (_handle_message, line 2681)

Incoming MessageEvent
    │
    ├── Step 1: Authorization
    │   ├── Check user against allowlist (per-platform)
    │   ├── If GATEWAY_ALLOW_ALL_USERS=false (default): reject unknown users
    │   └── Pairing code flow for new users
    │
    ├── Step 2: Command Detection
    │   ├── event.get_command() extracts slash command
    │   └── Route to dedicated handler (e.g., _handle_reset, _handle_model)
    │
    ├── Step 3: Session Management
    │   ├── Derive session_key from platform + chat_id + user_id
    │   ├── Lookup or create session in cache
    │   ├── Apply reset policy (both/idle/daily/none)
    │   └── Load conversation history from SessionDB
    │
    ├── Step 4: Context Injection
    │   ├── Platform info (telegram/discord/slack/etc.)
    │   ├── Chat name and type (group/DM/channel)
    │   ├── Thread context
    │   └── Voice memo transcription
    │
    ├── Step 5: Agent Invocation
    │   ├── Create or reuse AIAgent instance
    │   ├── Call run_conversation(user_message)
    │   └── Attach GatewayStreamConsumer for progressive editing
    │
    └── Step 6: Response Delivery
        ├── Stream tokens via progressive message editing
        ├── Handle media attachments (images, voice, documents)
        └── Platform-specific formatting (Markdown escaping, etc.)
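
Step 3 (session lookup or creation) can be illustrated with a small cache. The colon-separated key format and the `SessionCache` class are assumptions; only the three inputs to the key come from the pipeline above.

```python
def derive_session_key(platform: str, chat_id: str, user_id: str) -> str:
    """Join the three identifying fields into one cache key."""
    return f"{platform}:{chat_id}:{user_id}"


class SessionCache:
    """Lookup-or-create cache keyed by session key (hypothetical)."""

    def __init__(self):
        self._sessions = {}  # session_key -> session dict

    def get_or_create(self, key: str) -> dict:
        # setdefault returns the existing session or stores the new one
        return self._sessions.setdefault(key, {"key": key, "history": []})
```

A second message from the same user in the same chat therefore resolves to the same session object and sees the accumulated history.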

Slash Commands (line 2750+)

| Command | Handler Location | Purpose |
|---|---|---|
| /status | line 4411 | Show session status, running agents |
| /reset, /new | line 4281 | Reset session, clear history |
| /stop | line 4447 | Interrupt running agent |
| /restart | line 4479 | Gracefully restart gateway |
| /help | line 4518 | List available commands |
| /model <name> | line 4593 | Switch inference model |
| /provider <name> | line 4920 | Change provider |
| /personality <name> | line 4977 | Set system prompt personality |
| /retry | line 5052 | Rerun last message |
| /undo | line 5088 | Remove last exchange |
| /set-home <target> | line 5113 | Set default delivery destination |
| /voice <mode> | line 5156 | Enable/disable voice |
| /compress | - | Compress context |
| /usage | - | Show token usage |
| /insights [days] | - | Usage analytics |
| /skills | - | Browse available skills |
| /platforms | - | Platform status |
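
Command detection and routing can be sketched as a parser plus a dispatch table. The `parse_command` and `CommandRouter` names are hypothetical, and the handlers return placeholder strings rather than doing the gateway's real work.

```python
from typing import Optional, Tuple


def parse_command(text: str) -> Optional[Tuple[str, str]]:
    """Return (command, argument string) for '/cmd args', else None."""
    if not text.startswith("/"):
        return None
    name, _, args = text[1:].partition(" ")
    if not name:
        return None
    return name.lower(), args.strip()


class CommandRouter:
    def __init__(self):
        # Aliases such as /reset and /new share one handler.
        self._handlers = {
            "reset": self._handle_reset,
            "new": self._handle_reset,
            "model": self._handle_model,
        }

    async def dispatch(self, text: str) -> Optional[str]:
        parsed = parse_command(text)
        if parsed is None or parsed[0] not in self._handlers:
            return None  # not a known command: fall through to the agent
        name, args = parsed
        return await self._handlers[name](args)

    async def _handle_reset(self, args: str) -> str:
        return "session reset"

    async def _handle_model(self, args: str) -> str:
        return f"model set to {args}"
```

Returning `None` for anything that is not a known command lets the caller pass the text on to the agent unchanged.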

Platform Adapters (gateway/platforms/)

Base Class (gateway/platforms/base.py, 2,133 lines)

All adapters inherit BasePlatformAdapter:

class BasePlatformAdapter:
    # Interface outline; method bodies and extra parameters are elided.
    async def connect(self): ...                              # Establish platform connection
    async def disconnect(self): ...                           # Clean shutdown
    async def send(self, chat_id, content): ...               # Send message
    async def handle_message(self, event): ...                # Entry point (line 1521)
    async def send_image(self, chat_id, *args, **kwargs): ... # Platform-specific media
    async def send_voice(self, chat_id, *args, **kwargs): ...
    async def send_document(self, chat_id, *args, **kwargs): ...

    def set_message_handler(self, handler): ...               # Gateway sets this
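
To show how an adapter and the gateway's handler fit together, here is a self-contained sketch. The `BasePlatformAdapter` below is a reduced stand-in, not the real base class, and `InMemoryAdapter` is a hypothetical adapter that records messages instead of talking to a network.

```python
import asyncio


class BasePlatformAdapter:
    """Stand-in for the real base class, reduced to what this sketch needs."""

    def __init__(self):
        self._message_handler = None

    def set_message_handler(self, handler):
        self._message_handler = handler

    async def handle_message(self, event):
        if self._message_handler is not None:
            await self._message_handler(event)


class InMemoryAdapter(BasePlatformAdapter):
    """Hypothetical adapter that records sent messages."""

    def __init__(self):
        super().__init__()
        self.connected = False
        self.sent = []

    async def connect(self):
        self.connected = True

    async def disconnect(self):
        self.connected = False

    async def send(self, chat_id, content):
        self.sent.append((chat_id, content))


async def demo():
    adapter = InMemoryAdapter()

    async def echo_handler(event):
        # The gateway's _handle_message would run the agent here.
        await adapter.send(event["chat_id"], f"echo: {event['text']}")

    adapter.set_message_handler(echo_handler)
    await adapter.connect()
    await adapter.handle_message({"chat_id": "42", "text": "ping"})
    await adapter.disconnect()
    return adapter.sent
```

Running `asyncio.run(demo())` exercises the full round trip: the adapter receives an event, hands it to the registered handler, and the handler replies through the same adapter.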

Supported Platforms

| Platform | File | Connection Method | Notes |
|---|---|---|---|
| Telegram | telegram.py (1,800 lines) | Bot API (long-poll/webhook) | Voice memos, forum topics |
| Discord | discord.py (2,000 lines) | WebSocket | Voice channels, per-guild config |
| Slack | slack.py | Bolt framework + Socket Mode | Thread support |
| WhatsApp | whatsapp.py | Baileys (Node.js bridge) | E2E encrypted |
| Signal | signal.py | HTTP API | Direct messaging |
| Matrix | matrix.py | nio library | E2E encryption, room management |
| Mattermost | mattermost.py | WebSocket | Self-hosted Slack alternative |
| Email | email.py | IMAP/SMTP | Full email conversations |
| SMS | sms.py | Twilio | Text messaging |
| Home Assistant | homeassistant.py | WebSocket | IoT integration |
| BlueBubbles | bluebubbles.py | HTTP | iMessage bridge |
| WeChat Work | wecom.py | HTTP callback | Enterprise WeChat |
| WeChat | weixin.py | HTTP | WeChat + Moments |
| DingTalk | dingtalk.py | HTTP | Alibaba messaging |
| Feishu | feishu.py | HTTP | ByteDance platform |
| QQBot | qqbot.py | WebSocket/HTTP | QQ platform |
| API Server | api_server.py | REST HTTP | Generic webhook |
| Webhook | webhook.py | HTTP POST | Generic receiver |

Message Queueing

Photo Burst Handling:
├── Photos received during active agent run are queued
├── Stored in _pending_messages dict
└── Processed after current agent finishes

Concurrent Messages:
├── Each message spawns _process_message_background()
├── Active sessions tracked in _active_sessions dict
├── New messages to same chat: interrupt current agent
└── Interruption via asyncio.Event flag
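
The interrupt mechanism for concurrent messages can be sketched with one `asyncio.Event` per running session. The `ActiveSessions` class and the `run_agent` loop are illustrative assumptions; the real gateway keeps its own bookkeeping in `_active_sessions`.

```python
import asyncio


class ActiveSessions:
    """Tracks one interrupt flag per running session (simplified)."""

    def __init__(self):
        self._active_sessions = {}  # session_key -> asyncio.Event

    def start(self, session_key: str) -> asyncio.Event:
        existing = self._active_sessions.get(session_key)
        if existing is not None:
            existing.set()  # ask the agent already running for this chat to stop
        flag = asyncio.Event()
        self._active_sessions[session_key] = flag
        return flag

    def finish(self, session_key: str, flag: asyncio.Event) -> None:
        # Only remove the entry if a newer run has not replaced it.
        if self._active_sessions.get(session_key) is flag:
            del self._active_sessions[session_key]


async def run_agent(flag: asyncio.Event, steps: int) -> str:
    """Pretend agent loop that checks the interrupt flag between steps."""
    for _ in range(steps):
        if flag.is_set():
            return "interrupted"
        await asyncio.sleep(0)
    return "done"
```

Checking the flag between steps keeps interruption cooperative: the running agent stops at a safe point instead of being cancelled mid-operation.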

Streaming (gateway/stream_consumer.py, 300+ lines)

GatewayStreamConsumer (line 48)

Progressive message editing for real-time response delivery:

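class GatewayStreamConsumer:
    # Simplified outline; the should_edit() timing check is elided.
    def __init__(self, adapter, chat_id, config):
        self.adapter = adapter
        self.chat_id = chat_id
        self.message_id = None     # ID of the message being edited in place
        self.buffer = ""
        self.edit_interval = 0.3   # seconds between edits
        self.buffer_threshold = 40  # chars before first edit

    async def on_delta(self, text_chunk):
        """Buffer streamed tokens, rate-limit edits to avoid flooding."""
        self.buffer += text_chunk
        if should_edit():          # threshold reached and edit interval elapsed
            await self.adapter.edit_message(self.message_id, self.buffer)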

Fallback: if editing a message fails, the consumer falls back to send-once, delivering the full message after the response completes.
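
A runnable sketch of progressive editing with the send-once fallback follows. It is not the gateway's implementation: `StreamSketch`, `FakeAdapter`, the injectable `clock`, and the assumption that `send` returns a message ID are all introduced here for illustration.

```python
import time


class FakeAdapter:
    """Hypothetical adapter that records calls; edit_message can be made to fail."""

    def __init__(self, edits_work=True):
        self.edits_work = edits_work
        self.calls = []

    async def send(self, chat_id, content):
        self.calls.append(("send", content))
        return "msg-1"  # assumed: send returns an ID usable for later edits

    async def edit_message(self, message_id, content):
        if not self.edits_work:
            raise RuntimeError("edit not supported")
        self.calls.append(("edit", content))


class StreamSketch:
    def __init__(self, adapter, chat_id, edit_interval=0.3, buffer_threshold=40,
                 clock=time.monotonic):
        self.adapter, self.chat_id = adapter, chat_id
        self.edit_interval, self.buffer_threshold = edit_interval, buffer_threshold
        self.clock = clock
        self.buffer = ""
        self.message_id = None
        self.last_flush = float("-inf")
        self.edit_failed = False

    async def on_delta(self, text_chunk):
        self.buffer += text_chunk
        if self.edit_failed:
            return  # fallback mode: keep buffering until finish()
        if self.message_id is None:
            # First flush: wait until enough text has accumulated.
            if len(self.buffer) >= self.buffer_threshold:
                self.message_id = await self.adapter.send(self.chat_id, self.buffer)
                self.last_flush = self.clock()
        elif self.clock() - self.last_flush >= self.edit_interval:
            await self._edit()

    async def _edit(self):
        try:
            await self.adapter.edit_message(self.message_id, self.buffer)
            self.last_flush = self.clock()
        except RuntimeError:
            self.edit_failed = True

    async def finish(self):
        if self.message_id is not None and not self.edit_failed:
            await self._edit()  # final edit with the complete text
        if self.message_id is None or self.edit_failed:
            await self.adapter.send(self.chat_id, self.buffer)  # send-once fallback
```

Injecting the clock makes the rate limit testable without real delays; production code would simply use the default `time.monotonic`.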

Session Management (gateway/session.py, 2,000+ lines)

SessionSource (line 65)

from dataclasses import dataclass

@dataclass
class SessionSource:
    platform: str       # "telegram", "discord", etc.
    chat_id: str        # Platform-specific chat identifier
    user_id: str        # Platform-specific user identifier
    thread_id: str      # Thread/topic identifier (if applicable)

Session Reset Policies (gateway/config.py:100)

| Policy | Behavior |
|---|---|
| both | Reset on idle timeout AND daily at configured hour |
| idle | Reset only after idle timeout (default: 1440 min = 24h) |
| daily | Reset daily at configured hour (default: 4 AM) |
| none | Never auto-reset |
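
The four policies can be expressed as one decision function. This sketch assumes naive local datetimes and reads "both" as "reset when either trigger fires"; the function name and parameters are hypothetical, with defaults taken from the table.

```python
from datetime import datetime, timedelta


def should_reset(policy: str, last_activity: datetime, now: datetime,
                 idle_minutes: int = 1440, daily_hour: int = 4) -> bool:
    """Decide whether a session should be reset under the given policy."""
    idle_expired = now - last_activity >= timedelta(minutes=idle_minutes)

    # Most recent occurrence of the daily reset hour at or before `now`.
    boundary = now.replace(hour=daily_hour, minute=0, second=0, microsecond=0)
    if boundary > now:
        boundary -= timedelta(days=1)
    crossed_daily = last_activity < boundary

    if policy == "idle":
        return idle_expired
    if policy == "daily":
        return crossed_daily
    if policy == "both":
        return idle_expired or crossed_daily
    if policy == "none":
        return False
    raise ValueError(f"unknown reset policy: {policy}")
```

For example, a session last used at 23:00 and revisited at 05:00 the next morning is reset under "daily" and "both" (the 4 AM boundary was crossed) but not under "idle" (only six hours have passed).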

PII Redaction

Session context stores hashed user and chat IDs for logging, so raw identifiers (PII) do not appear in log files.
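
One common way to produce such log-safe identifiers is a truncated SHA-256 digest. The document does not say which hash the gateway uses, so the algorithm, the optional salt, and the 12-character length below are assumptions.

```python
import hashlib


def redact_id(raw_id: str, salt: str = "", length: int = 12) -> str:
    """Return a short, stable SHA-256 digest that stands in for a raw ID in logs."""
    digest = hashlib.sha256((salt + raw_id).encode("utf-8")).hexdigest()
    return digest[:length]
```

Because the digest is deterministic, log lines for the same user can still be correlated without exposing the underlying ID.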

Session Expiry Watcher (gateway/run.py:2022)

Runs every 300 seconds:

  1. Scans for sessions past their idle timeout
  2. Proactively flushes their memories to disk
  3. As a result, the next incoming message does not block on that flush
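
A periodic watcher of this kind can be sketched as an asyncio loop. The `find_expired` helper, the `flush` callback, and the `stop` event are hypothetical; only the 300-second interval comes from the text above.

```python
import asyncio
import time
from typing import Optional


def find_expired(last_activity: dict, idle_timeout: float, now: float) -> list:
    """Return keys of sessions idle for at least `idle_timeout` seconds."""
    return [key for key, ts in last_activity.items() if now - ts >= idle_timeout]


async def expiry_watcher(last_activity: dict, flush, idle_timeout: float,
                         interval: float = 300.0,
                         stop: Optional[asyncio.Event] = None):
    """Every `interval` seconds, flush and drop sessions past the idle timeout."""
    while stop is None or not stop.is_set():
        for key in find_expired(last_activity, idle_timeout, time.monotonic()):
            await flush(key)              # hypothetical: persist the session's memories
            last_activity.pop(key, None)
        await asyncio.sleep(interval)
```

Doing the flush in the background means the cost is paid while the session is idle rather than when the user's next message arrives.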

Cron System (cron/)

Architecture

Cron Ticker (every 60s, background thread)
    │
    ├── cron/scheduler.py:tick()
    │   ├── Load jobs from ~/.hermes/cron/jobs.json
    │   ├── Filter for due jobs (croniter)
    │   └── Execute each due job
    │
    ├── Job Execution
    │   ├── Spawn subprocess with job's prompt
    │   ├── Capture output
    │   └── Save to ~/.hermes/cron/output/{job_id}/{timestamp}.md
    │
    └── Delivery (via DeliveryRouter)
        ├── "local"              → Save to files only
        ├── "origin"             → Back to originating platform
        ├── "telegram"           → Use home channel
        ├── "telegram:123456"    → Specific chat ID
        └── "discord:789"        → Specific channel
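
The delivery target strings listed above follow a small grammar that can be parsed as shown below. `ParsedTarget` and `parse_delivery_target` are hypothetical names, and treating a bare platform name as "home channel" follows the "telegram" example in the diagram.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParsedTarget:
    kind: str                       # "local", "origin", or "platform"
    platform: Optional[str] = None
    chat_id: Optional[str] = None   # None means "use the platform's home channel"


def parse_delivery_target(spec: str) -> ParsedTarget:
    """Parse strings such as 'local', 'origin', 'telegram', or 'telegram:123456'."""
    spec = spec.strip()
    if not spec:
        raise ValueError("empty delivery target")
    if spec in ("local", "origin"):
        return ParsedTarget(kind=spec)
    platform, _, chat_id = spec.partition(":")
    return ParsedTarget(kind="platform", platform=platform, chat_id=chat_id or None)
```

Splitting on the first colon only keeps chat IDs that themselves contain colons intact.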

DeliveryRouter (gateway/delivery.py)

from dataclasses import dataclass

@dataclass
class DeliveryTarget:
    platform: str      # "telegram", "discord", etc.
    chat_id: str       # Target chat identifier

class DeliveryRouter:
    def __init__(self, adapters):
        self.adapters = adapters   # Simplified: platform name -> connected adapter

    async def deliver(self, target: DeliveryTarget, content: str):
        adapter = self.adapters[target.platform]
        await adapter.send(target.chat_id, content)

Authorization & Pairing (gateway/pairing.py)

User Authorization Flow

New message from unknown user
    │
    ├── GATEWAY_ALLOW_ALL_USERS=true?
    │   └── Yes → Allow
    │
    ├── User in {PLATFORM}_ALLOWED_USERS?
    │   └── Yes → Allow
    │
    └── No → Generate pairing code
        ├── Display code to admin
        ├── User sends code back
        └── On match → Add to allowlist
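
The pairing branch of the flow can be sketched as a small state holder. The six-digit code format and the `PairingSketch` class are assumptions; the document only states that a code is generated, shown to the admin, and matched against what the user sends.

```python
import hmac
import secrets


class PairingSketch:
    """Issues one-time codes and adds users to the allowlist on a match."""

    def __init__(self):
        self.allowlist = set()
        self._pending = {}  # user_id -> code shown to the admin

    def request_code(self, user_id: str) -> str:
        code = f"{secrets.randbelow(10**6):06d}"  # six digits, assumed format
        self._pending[user_id] = code
        return code

    def submit_code(self, user_id: str, code: str) -> bool:
        expected = self._pending.get(user_id)
        if expected is None:
            return False
        # compare_digest avoids leaking the code through timing differences
        if not hmac.compare_digest(expected.encode(), code.encode()):
            return False
        del self._pending[user_id]
        self.allowlist.add(user_id)
        return True
```

Codes are bound to the requesting user and consumed on success, so a code cannot be reused or redeemed by a different account.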

Per-Platform Allowlists

TELEGRAM_ALLOWED_USERS="123456,789012"
DISCORD_ALLOWED_USERS="user#1234,user#5678"
SLACK_ALLOWED_USERS="U12345,U67890"
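
Reading these variables and applying the allow-all override might look like this. The helper names are hypothetical, and treating only the literal value "true" (case-insensitive) as enabling `GATEWAY_ALLOW_ALL_USERS` is an assumption.

```python
import os


def load_allowlist(platform: str, env=os.environ) -> set:
    """Read {PLATFORM}_ALLOWED_USERS and split it on commas."""
    raw = env.get(f"{platform.upper()}_ALLOWED_USERS", "")
    return {item.strip() for item in raw.split(",") if item.strip()}


def is_authorized(platform: str, user_id: str, env=os.environ) -> bool:
    """Allow everyone if GATEWAY_ALLOW_ALL_USERS is true, else check the allowlist."""
    if env.get("GATEWAY_ALLOW_ALL_USERS", "false").strip().lower() == "true":
        return True
    return user_id in load_allowlist(platform, env)
```

Passing `env` explicitly makes the functions easy to test with a plain dict instead of the process environment.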

MCP Server Interface (mcp_serve.py, 300+ lines)

Exposes gateway conversations to external MCP clients (Claude Code, Cursor, etc.):

| Tool | Purpose |
|---|---|
| conversations_list() | List all active sessions |
| conversation_get(id) | Get session history |
| messages_read() | Read message transcripts |
| messages_send() | Send messages to chats |
| events_poll() | Poll for new events |
| permissions_list_open() | List pending approvals |
| permissions_respond() | Approve/deny permissions |

Hook System (gateway/hooks.py)

Event Lifecycle

| Event | When |
|---|---|
| gateway:startup | Gateway process starts |
| session:start | New session created |
| session:end | Session expires/resets |
| agent:start | Agent begins processing |
| agent:end | Agent finishes response |
| command:* | Slash command executed |

Hook Discovery

~/.hermes/hooks/
├── gateway-startup.sh
├── session-start.py
└── agent-end.sh

Hooks are discovered from ~/.hermes/hooks/ and fired via HookRegistry.
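
Discovery could work by mapping file names to event names, as in the sketch below. The `<scope>-<name>.<ext>` naming rule is inferred from the example files and event names above, not confirmed by the source, and `discover_hooks` is a hypothetical helper rather than part of HookRegistry.

```python
from pathlib import Path


def discover_hooks(hooks_dir: Path) -> dict:
    """Map event names to hook scripts, assuming '<scope>-<name>.<ext>' file names."""
    hooks = {}
    if not hooks_dir.is_dir():
        return hooks
    for path in sorted(hooks_dir.iterdir()):
        if not path.is_file() or "-" not in path.stem:
            continue  # skip directories and files that do not match the pattern
        scope, _, name = path.stem.partition("-")
        hooks.setdefault(f"{scope}:{name}", []).append(path)
    return hooks
```

Under that assumption, `gateway-startup.sh` would register for `gateway:startup` and `agent-end.sh` for `agent:end`.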

CLI/TUI Interface (cli.py, 10,000 lines)

HermesCLI Class (line 1577)

HermesCLI is the interactive terminal interface.

Key Features

| Feature | Implementation |
|---|---|
| Multiline editing | prompt_toolkit with Enter to send, Alt+Enter for newline |
| Autocomplete | Slash commands, model names, skill names |
| Streaming display | Line-buffered rendering with partial updates |
| Interrupt handling | Ctrl+C interrupts current agent run |
| History | SQLite-backed conversation history |
| Tool progress | Live spinner with tool name and status |