Hermes Agent - Gateway & Messaging Architecture
Overview
The gateway is a single process that connects to 20+ messaging platforms simultaneously, routing messages to the agent core and streaming responses back. It manages sessions, handles commands, executes cron jobs, and provides graceful lifecycle management.
Architecture
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Telegram │ │ Discord  │ │  Slack   │ │ WhatsApp │ │  Signal  │ ...
│ Bot API  │ │WebSocket │ │   Bolt   │ │ Baileys  │ │ HTTP API │
└────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘
     │            │            │            │            │
     ▼            ▼            ▼            ▼            ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Platform Adapters                        │
│                  (inherit BasePlatformAdapter)                  │
│                                                                 │
│  Each adapter:                                                  │
│  - Connects to platform API                                     │
│  - Parses platform-specific message format                      │
│  - Normalizes to MessageEvent                                   │
│  - Calls self._message_handler(event)                           │
│  - Formats and sends responses back                             │
└─────────────────────────────┬───────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                          GatewayRunner                          │
│                        (gateway/run.py)                         │
│                                                                 │
│  _handle_message(event):                                        │
│  ├── Authorization check (user allowlist)                       │
│  ├── Command parsing (/new, /model, /stop, etc.)                │
│  ├── Session lookup or creation                                 │
│  ├── Context injection (platform, chat name, etc.)              │
│  ├── Agent invocation (AIAgent.run_conversation)                │
│  └── Response delivery (via adapter.send or stream_consumer)    │
│                                                                 │
│  Background tasks:                                              │
│  ├── Session expiry watcher (every 300s)                        │
│  ├── Platform reconnect watcher (exp backoff)                   │
│  └── Cron ticker (every 60s)                                    │
└─────────────────────────────────────────────────────────────────┘
Entry Points
| Path | Invocation |
|---|---|
| cli.py --gateway | asyncio.run(start_gateway()) |
| hermes gateway run | Same as above |
| systemd service | hermes-gateway.service |
# cli.py:9870-9875
if gateway:
    import asyncio
    from gateway.run import start_gateway
    print("Starting Hermes Gateway (messaging platforms)...")
    asyncio.run(start_gateway())
GatewayRunner (gateway/run.py, 9,800 lines)
Startup Sequence (async def start(), line 1727)
1. Load configuration
├── .env from ~/.hermes/.env
├── Bridge config.yaml values to environment variables
└── SSL certificate auto-detection (NixOS compat)
2. Initialize platform adapters
├── Loop through enabled platforms
├── Create adapters via _create_adapter() (line 2409)
├── Call adapter.connect()
└── Set message handler: adapter.set_message_handler(self._handle_message)
3. Spawn background tasks
├── Session expiry watcher (every 300s)
│ └── Proactively flush expired session memories
├── Platform reconnect watcher
│ └── Retries failed platforms with exponential backoff (max 300s, 20 retries)
└── Cron ticker (every 60s)
  └── Executes due cron jobs, delivers results via DeliveryRouter
4. Hook system
├── Discover hooks from ~/.hermes/hooks/
└── Emit gateway:startup event
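The reconnect watcher's retry schedule can be illustrated with a small delay function. This is a minimal sketch, not the gateway's code: the 300-second cap and the 20-retry limit come from the sequence above, while `BASE_DELAY_S`, the doubling factor, and the `reconnect_delay` name are assumptions made for illustration.

```python
import random
from typing import Optional

MAX_DELAY_S = 300   # cap stated in the startup sequence
MAX_RETRIES = 20    # retry limit stated in the startup sequence
BASE_DELAY_S = 5    # assumption: the real base delay is not documented here


def reconnect_delay(attempt: int, jitter: float = 0.0) -> Optional[float]:
    """Return the wait before retry `attempt` (1-based), or None once retries are exhausted."""
    if attempt > MAX_RETRIES:
        return None
    # Double the delay on each attempt, but never exceed the cap.
    delay = min(BASE_DELAY_S * 2 ** (attempt - 1), MAX_DELAY_S)
    # Optional jitter spreads out simultaneous reconnects; 0.0 keeps the schedule deterministic.
    return delay + random.uniform(0, jitter * delay)
```

With these assumed values the delay reaches the 300-second cap on the seventh attempt and stays there until the retry budget runs out.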
Message Processing Pipeline (_handle_message, line 2681)
Incoming MessageEvent
│
├── Step 1: Authorization
│ ├── Check user against allowlist (per-platform)
│ ├── If GATEWAY_ALLOW_ALL_USERS=false (default): reject unknown users
│ └── Pairing code flow for new users
│
├── Step 2: Command Detection
│ ├── event.get_command() extracts slash command
│ └── Route to dedicated handler (e.g., _handle_reset, _handle_model)
│
├── Step 3: Session Management
│ ├── Derive session_key from platform + chat_id + user_id
│ ├── Lookup or create session in cache
│ ├── Apply reset policy (both/idle/daily/none)
│ └── Load conversation history from SessionDB
│
├── Step 4: Context Injection
│ ├── Platform info (telegram/discord/slack/etc.)
│ ├── Chat name and type (group/DM/channel)
│ ├── Thread context
│ └── Voice memo transcription
│
├── Step 5: Agent Invocation
│ ├── Create or reuse AIAgent instance
│ ├── Call run_conversation(user_message)
│ └── Attach GatewayStreamConsumer for progressive editing
│
└── Step 6: Response Delivery
  ├── Stream tokens via progressive message editing
  ├── Handle media attachments (images, voice, documents)
  └── Platform-specific formatting (Markdown escaping, etc.)
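The first three steps of the pipeline can be sketched as a routing function. The `MessageEvent` fields, the `route` helper, and the colon-separated key format are hypothetical; only `get_command()` and the platform + chat_id + user_id key composition are taken from the description above.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class MessageEvent:
    # Hypothetical minimal shape; the real MessageEvent carries more fields.
    platform: str
    chat_id: str
    user_id: str
    text: str

    def get_command(self) -> Optional[str]:
        """Return the slash command name without the '/', or None for plain text."""
        if not self.text.startswith("/"):
            return None
        parts = self.text[1:].split(maxsplit=1)
        return parts[0].lower() if parts else None


def session_key(event: MessageEvent) -> str:
    # Step 3: the key combines platform, chat, and user. The separator is an assumption.
    return f"{event.platform}:{event.chat_id}:{event.user_id}"


def route(event: MessageEvent, allowed_users: Set[str]) -> str:
    """Return a label for the pipeline branch that would handle the event."""
    if event.user_id not in allowed_users:      # Step 1: authorization
        return "rejected"
    command = event.get_command()               # Step 2: command detection
    if command is not None:
        return f"command:{command}"
    return f"agent:{session_key(event)}"        # Steps 3-5: session lookup, then agent
```

Checking authorization before command parsing means unknown users cannot trigger handlers such as /restart.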
Slash Commands (line 2750+)
| Command | Handler Location | Purpose |
|---|---|---|
| /status | line 4411 | Show session status, running agents |
| /reset, /new | line 4281 | Reset session, clear history |
| /stop | line 4447 | Interrupt running agent |
| /restart | line 4479 | Gracefully restart gateway |
| /help | line 4518 | List available commands |
| /model <name> | line 4593 | Switch inference model |
| /provider <name> | line 4920 | Change provider |
| /personality <name> | line 4977 | Set system prompt personality |
| /retry | line 5052 | Rerun last message |
| /undo | line 5088 | Remove last exchange |
| /set-home <target> | line 5113 | Set default delivery destination |
| /voice <mode> | line 5156 | Enable/disable voice |
| /compress | - | Compress context |
| /usage | - | Show token usage |
| /insights [days] | - | Usage analytics |
| /skills | - | Browse available skills |
| /platforms | - | Platform status |
Platform Adapters (gateway/platforms/)
Base Class (gateway/platforms/base.py, 2,133 lines)
All adapters inherit BasePlatformAdapter:
class BasePlatformAdapter:
    async def connect(self)                      # Establish platform connection
    async def disconnect(self)                   # Clean shutdown
    async def send(self, chat_id, content)       # Send message
    async def handle_message(self, event)        # Entry point (line 1521)
    async def send_image(self, chat_id, ...)     # Platform-specific media
    async def send_voice(self, chat_id, ...)
    async def send_document(self, chat_id, ...)
    def set_message_handler(self, handler)       # Gateway sets this
Supported Platforms
| Platform | File | Connection Method | Notes |
|---|---|---|---|
| Telegram | telegram.py (1,800 lines) | Bot API (long-poll/webhook) | Voice memos, forum topics |
| Discord | discord.py (2,000 lines) | WebSocket | Voice channels, per-guild config |
| Slack | slack.py | Bolt framework + Socket Mode | Thread support |
| WhatsApp | whatsapp.py | Baileys (Node.js bridge) | E2E encrypted |
| Signal | signal.py | HTTP API | Direct messaging |
| Matrix | matrix.py | nio library | E2E encryption, room management |
| Mattermost | mattermost.py | WebSocket | Self-hosted Slack alternative |
| Email | email.py | IMAP/SMTP | Full email conversations |
| SMS | sms.py | Twilio | Text messaging |
| Home Assistant | homeassistant.py | WebSocket | IoT integration |
| BlueBubbles | bluebubbles.py | HTTP | iMessage bridge |
| WeChat Work | wecom.py | HTTP callback | Enterprise WeChat |
| WeChat | weixin.py | HTTP | WeChat + Moments |
| DingTalk | dingtalk.py | HTTP | Alibaba messaging |
| Feishu | feishu.py | HTTP | ByteDance platform |
| QQBot | qqbot.py | WebSocket/HTTP | QQ platform |
| API Server | api_server.py | REST HTTP | Generic webhook |
| Webhook | webhook.py | HTTP POST | Generic receiver |
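To make the adapter contract concrete, here is a toy adapter that normalizes a raw payload and hands it to the gateway's handler. It is a standalone sketch under stated assumptions: `EchoAdapter`, `on_raw`, the payload keys, and the `MessageEvent` fields are invented for illustration, and the class does not inherit the real BasePlatformAdapter.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class MessageEvent:
    # Hypothetical minimal shape of the normalized event.
    platform: str
    chat_id: str
    user_id: str
    text: str


class EchoAdapter:
    """Toy adapter: converts raw payload dicts into MessageEvents."""

    def __init__(self):
        self._message_handler = None
        self.sent = []   # records outgoing messages instead of calling a platform API

    def set_message_handler(self, handler):
        self._message_handler = handler

    async def send(self, chat_id, content):
        self.sent.append((chat_id, content))

    async def on_raw(self, payload: dict):
        # Normalize the platform-specific payload, then pass it to the gateway.
        event = MessageEvent("echo", str(payload["chat"]), str(payload["from"]), payload["text"])
        await self._message_handler(event)


async def demo():
    adapter = EchoAdapter()

    async def handler(event):
        # Stands in for GatewayRunner._handle_message.
        await adapter.send(event.chat_id, event.text.upper())

    adapter.set_message_handler(handler)
    await adapter.on_raw({"chat": 1, "from": 2, "text": "hi"})
    return adapter.sent
```

Running `asyncio.run(demo())` shows the round trip: raw payload in, normalized event through the handler, reply out through `send`.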
Message Queueing
Photo Burst Handling:
├── Photos received during active agent run are queued
├── Stored in _pending_messages dict
└── Processed after current agent finishes
Concurrent Messages:
├── Each message spawns _process_message_background()
├── Active sessions tracked in _active_sessions dict
├── New messages to same chat: interrupt current agent
└── Interruption via asyncio.Event flag
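The queue-or-interrupt behavior can be sketched with one asyncio.Event per active session. The `SessionGate` class and its method names are hypothetical, and appending an interrupting text message to the pending list (so that it runs next) is an assumption rather than documented behavior.

```python
import asyncio
from collections import defaultdict


class SessionGate:
    """Tracks one running agent per session and queues or interrupts on new input."""

    def __init__(self):
        self._active_sessions = {}                    # session key -> interrupt flag
        self._pending_messages = defaultdict(list)    # session key -> waiting messages

    def on_message(self, key: str, text: str, is_photo: bool = False) -> str:
        interrupt = self._active_sessions.get(key)
        if interrupt is None:
            # No agent running for this session: start one.
            self._active_sessions[key] = asyncio.Event()
            return "start"
        if is_photo:
            # Photo bursts wait until the current run finishes.
            self._pending_messages[key].append(text)
            return "queued"
        # Other messages signal the running agent to stop.
        interrupt.set()
        self._pending_messages[key].append(text)      # assumption: processed next
        return "interrupted"

    def finish(self, key: str) -> list:
        """Mark the run as done and return whatever arrived in the meantime."""
        self._active_sessions.pop(key, None)
        return self._pending_messages.pop(key, [])
```

The running agent would poll or await the event between steps; that part is omitted here.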
Streaming (gateway/stream_consumer.py, 300+ lines)
GatewayStreamConsumer (line 48)
Progressive message editing for real-time response delivery:
class GatewayStreamConsumer:
    def __init__(self, adapter, chat_id, config):
        self.buffer = ""
        self.edit_interval = 0.3     # seconds between edits
        self.buffer_threshold = 40   # chars before first edit
    async def on_delta(self, text_chunk):
        """Buffer streamed tokens, rate-limit edits to avoid flooding."""
        self.buffer += text_chunk
        if should_edit():
            await self.adapter.edit_message(self.message_id, self.buffer)
Fallback: If the edit transport fails, the consumer falls back to send-once (the full message is sent after completion).
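The rate-limiting rule can be made concrete with a runnable sketch. `StreamBuffer`, `FakeAdapter`, the injectable `clock`, and the exact `should_edit` rule (threshold for the first edit, interval for later ones) are assumptions; only the 0.3-second interval and 40-character threshold come from the excerpt above.

```python
import asyncio
import time


class FakeAdapter:
    """Stand-in for a platform adapter; records edits instead of calling an API."""

    def __init__(self):
        self.edits = []

    async def edit_message(self, message_id, text):
        self.edits.append(text)


class StreamBuffer:
    def __init__(self, adapter, message_id, edit_interval=0.3,
                 buffer_threshold=40, clock=time.monotonic):
        self.adapter = adapter
        self.message_id = message_id
        self.edit_interval = edit_interval
        self.buffer_threshold = buffer_threshold
        self.clock = clock           # injectable so the timing can be tested
        self.buffer = ""
        self._last_edit = None       # time of the previous edit, None before the first

    def should_edit(self) -> bool:
        if self._last_edit is None:
            # First edit waits until enough text has accumulated.
            return len(self.buffer) >= self.buffer_threshold
        # Later edits are spaced at least edit_interval apart.
        return self.clock() - self._last_edit >= self.edit_interval

    async def on_delta(self, text_chunk: str) -> None:
        self.buffer += text_chunk
        if self.should_edit():
            await self.adapter.edit_message(self.message_id, self.buffer)
            self._last_edit = self.clock()

    async def finish(self) -> None:
        # Final flush so trailing tokens are not lost between edits.
        await self.adapter.edit_message(self.message_id, self.buffer)
```

A final flush is needed because the last few tokens usually arrive inside an interval in which no edit is allowed.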
Session Management (gateway/session.py, 2,000+ lines)
SessionSource (line 65)
@dataclass
class SessionSource:
    platform: str    # "telegram", "discord", etc.
    chat_id: str     # Platform-specific chat identifier
    user_id: str     # Platform-specific user identifier
    thread_id: str   # Thread/topic identifier (if applicable)
Session Reset Policies (gateway/config.py:100)
| Policy | Behavior |
|---|---|
| both | Reset on idle timeout AND daily at configured hour |
| idle | Reset only after idle timeout (default: 1440 min = 24h) |
| daily | Reset daily at configured hour (default: 4 AM) |
| none | Never auto-reset |
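One way to read the policy table is as a single predicate over the last activity time. The sketch below assumes that "both" resets when either condition holds and that the daily reset fires once the configured hour has passed since the last activity; `should_reset` and its parameters are hypothetical names.

```python
from datetime import datetime, timedelta


def should_reset(policy: str, last_activity: datetime, now: datetime,
                 idle_minutes: int = 1440, daily_hour: int = 4) -> bool:
    """Decide whether a session is due for reset under the given policy."""
    idle_expired = now - last_activity >= timedelta(minutes=idle_minutes)
    # Most recent daily boundary at or before `now`.
    boundary = now.replace(hour=daily_hour, minute=0, second=0, microsecond=0)
    if boundary > now:
        boundary -= timedelta(days=1)
    crossed_daily = last_activity < boundary
    if policy == "idle":
        return idle_expired
    if policy == "daily":
        return crossed_daily
    if policy == "both":
        return idle_expired or crossed_daily   # assumption: either trigger resets
    if policy == "none":
        return False
    raise ValueError(f"unknown reset policy: {policy}")
```

For example, a session last used at 23:00 and revisited at 05:00 the next day resets under "daily" and "both" but not under "idle", since only six hours have passed.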
PII Redaction
Session context stores hashed user and chat IDs for logging, so raw identifiers (PII) do not appear in log files.
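A minimal sketch of such hashing, assuming SHA-256 with an optional salt and a truncated hex digest; the gateway's actual hash function, salt handling, and output length are not documented here, and `redact_id` is a hypothetical name.

```python
import hashlib


def redact_id(value: str, salt: str = "", length: int = 12) -> str:
    """Return a short, stable hash that can stand in for a raw ID in log lines."""
    digest = hashlib.sha256((salt + value).encode("utf-8")).hexdigest()
    return digest[:length]
```

The same ID always maps to the same token, so log lines for one chat can still be correlated without exposing the identifier itself.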
Session Expiry Watcher (gateway/run.py:2022)
Runs every 300 seconds:
- Scans for sessions past idle timeout
- Proactively flushes memories to disk
- Prevents blocking on next message arrival
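The watcher loop might look roughly like the following. This is a sketch, not the code at gateway/run.py:2022: the `sessions` mapping of key to last-activity timestamp, the `flush` callback, and the removal of flushed sessions from the mapping are all assumptions.

```python
import asyncio
import time


async def expiry_watcher(sessions: dict, flush, idle_timeout_s: float,
                         interval_s: float = 300, clock=time.time):
    """Every interval, flush and drop sessions that have been idle past the timeout."""
    while True:
        now = clock()
        # Iterate over a snapshot because expired entries are removed in the loop.
        for key, last_seen in list(sessions.items()):
            if now - last_seen >= idle_timeout_s:
                await flush(key)       # persist memories before the session goes away
                del sessions[key]      # assumption: flushed sessions leave the cache
        await asyncio.sleep(interval_s)
```

Doing the flush in the background is what keeps the next incoming message from paying that cost.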
Cron System (cron/)
Architecture
Cron Ticker (every 60s, background thread)
│
├── cron/scheduler.py:tick()
│ ├── Load jobs from ~/.hermes/cron/jobs.json
│ ├── Filter for due jobs (croniter)
│ └── Execute each due job
│
├── Job Execution
│ ├── Spawn subprocess with job's prompt
│ ├── Capture output
│ └── Save to ~/.hermes/cron/output/{job_id}/{timestamp}.md
│
└── Delivery (via DeliveryRouter)
  ├── "local" → Save to files only
  ├── "origin" → Back to originating platform
  ├── "telegram" → Use home channel
  ├── "telegram:123456" → Specific chat ID
  └── "discord:789" → Specific channel
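The delivery strings listed above can be parsed into a target with a few lines. In this sketch `parse_target` is a hypothetical helper, and using `None` for "files only" and for "home channel" is an assumption (which is also why `chat_id` is optional here).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeliveryTarget:
    platform: str
    chat_id: Optional[str]   # assumption: None means "use the platform's home channel"


def parse_target(spec: str, origin: Optional[DeliveryTarget] = None) -> Optional[DeliveryTarget]:
    """Translate a delivery string into a target; None means save to files only."""
    if spec == "local":
        return None
    if spec == "origin":
        if origin is None:
            raise ValueError("'origin' delivery needs the originating chat")
        return origin
    # "platform" or "platform:chat_id"
    platform, sep, chat_id = spec.partition(":")
    return DeliveryTarget(platform, chat_id if sep else None)
```

Resolving a `None` chat ID to the configured home channel (see /set-home) would happen at send time and is left out.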
DeliveryRouter (gateway/delivery.py)
@dataclass
class DeliveryTarget:
    platform: str   # "telegram", "discord", etc.
    chat_id: str    # Target chat identifier
class DeliveryRouter:
    async def deliver(self, target: DeliveryTarget, content: str):
        adapter = self.adapters[target.platform]
        await adapter.send(target.chat_id, content)
Authorization & Pairing (gateway/pairing.py)
User Authorization Flow
New message from unknown user
│
├── GATEWAY_ALLOW_ALL_USERS=true?
│ └── Yes → Allow
│
├── User in {PLATFORM}_ALLOWED_USERS?
│ └── Yes → Allow
│
└── No → Generate pairing code
  ├── Display code to admin
  ├── User sends code back
  └── On match → Add to allowlist
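The checks in this flow can be sketched as follows. The function names and the six-digit pairing code format are assumptions; the environment variable names follow the GATEWAY_ALLOW_ALL_USERS and {PLATFORM}_ALLOWED_USERS pattern used in this section.

```python
import os
import secrets


def allowed_users(platform: str, env=os.environ) -> set:
    """Parse {PLATFORM}_ALLOWED_USERS (comma-separated) into a set of user IDs."""
    raw = env.get(f"{platform.upper()}_ALLOWED_USERS", "")
    return {item.strip() for item in raw.split(",") if item.strip()}


def is_authorized(platform: str, user_id: str, env=os.environ) -> bool:
    # The global override is checked first, then the per-platform allowlist.
    if env.get("GATEWAY_ALLOW_ALL_USERS", "false").lower() == "true":
        return True
    return user_id in allowed_users(platform, env)


def new_pairing_code(digits: int = 6) -> str:
    # Hypothetical format: the real pairing code shape is not documented here.
    # `secrets` is used so the code is not predictable.
    return "".join(secrets.choice("0123456789") for _ in range(digits))
```

Passing `env` explicitly keeps the functions easy to test; in normal use they read the process environment.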
Per-Platform Allowlists
TELEGRAM_ALLOWED_USERS="123456,789012"
DISCORD_ALLOWED_USERS="user#1234,user#5678"
SLACK_ALLOWED_USERS="U12345,U67890"
MCP Server Interface (mcp_serve.py, 300+ lines)
Exposes gateway conversations to external MCP clients (Claude Code, Cursor, etc.):
| Tool | Purpose |
|---|---|
| conversations_list() | List all active sessions |
| conversation_get(id) | Get session history |
| messages_read() | Read message transcripts |
| messages_send() | Send messages to chats |
| events_poll() | Poll for new events |
| permissions_list_open() | List pending approvals |
| permissions_respond() | Approve/deny permissions |
Hook System (gateway/hooks.py)
Event Lifecycle
| Event | When |
|---|---|
| gateway:startup | Gateway process starts |
| session:start | New session created |
| session:end | Session expires/resets |
| agent:start | Agent begins processing |
| agent:end | Agent finishes response |
| command:* | Slash command executed |
Hook Discovery
~/.hermes/hooks/
├── gateway-startup.sh
├── session-start.py
└── agent-end.sh
Hooks are discovered from ~/.hermes/hooks/ and fired via HookRegistry.
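Discovery could work along these lines. This is a sketch, not HookRegistry itself: `discover_hooks` is a hypothetical function, and the rule that the first hyphen in a file name separates scope from event (so gateway-startup.sh maps to gateway:startup) is inferred from the example file names.

```python
from pathlib import Path


def discover_hooks(hooks_dir: Path) -> dict:
    """Map event names to hook scripts found in hooks_dir."""
    registry: dict = {}
    if not hooks_dir.is_dir():
        return registry
    for path in sorted(hooks_dir.iterdir()):
        if path.is_file():
            # Assumption: the first hyphen in the file stem separates scope from event.
            event = path.stem.replace("-", ":", 1)
            registry.setdefault(event, []).append(path)
    return registry
```

Calling `discover_hooks(Path.home() / ".hermes" / "hooks")` would return a mapping such as gateway:startup to the list of scripts to run for that event.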
CLI/TUI Interface (cli.py, 10,000 lines)
HermesCLI Class (line 1577)
The interactive terminal interface:
- prompt_toolkit REPL with multiline editing, autocomplete, history
- Rich formatting for panels, progress bars, streaming output
- Tool progress tracking (spinner, status updates)
- Session persistence across conversations
- Slash command handling (shared with gateway)
Key Features
| Feature | Implementation |
|---|---|
| Multiline editing | prompt_toolkit with Enter to send, Alt+Enter for newline |
| Autocomplete | Slash commands, model names, skill names |
| Streaming display | Line-buffered rendering with partial updates |
| Interrupt handling | Ctrl+C interrupts current agent run |
| History | SQLite-backed conversation history |
| Tool progress | Live spinner with tool name and status |