# Butler-Shell Implementation Plan

> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a persistent shell assistant on Inngest and tmux that receives commands over QQ and maintains persistent sessions.

**Architecture:** Layered monolith: QQ Gateway → Inngest Workflows → Session Manager (TmuxWrapper) → tmux sessions

**Tech Stack:** Python 3.11+, Inngest (Python SDK), tmux, NapCatQQ (WebSocket), Pydantic, SQLite, FastAPI

---

## File Structure Map

### New Files

```
src/butler/
├── __init__.py                 # Package init
├── main.py                     # FastAPI entry point + Inngest serve
├── config.py                   # Configuration management
│
├── session/
│   ├── __init__.py
│   ├── state.py                # SessionState, SessionMode, PtyState
│   ├── wrapper.py              # TmuxWrapper core class
│   └── detector.py             # Mode detector
│
├── gateway/
│   ├── __init__.py
│   ├── events.py               # NapCatEvent message definitions
│   └── napcat.py               # NapCatQQ WebSocket client
│
├── workflows/
│   ├── __init__.py
│   ├── handle_message.py       # Main message-handling flow
│   ├── guardrail.py            # Command approval gate
│   └── compaction.py           # Context compaction
│
├── skills/
│   ├── __init__.py
│   ├── base.py                 # SkillBase base class
│   ├── log_skill.py            # Logging skill
│   └── system_skill.py         # System information skill
│
└── security/
    ├── __init__.py
    ├── auth.py                 # Authentication and authorization
    ├── guardrail.py            # Dangerous-command detection (Task 2.2)
    └── sanitizer.py            # Sensitive-data filtering

tests/
├── conftest.py                 # pytest fixtures
├── test_wrapper.py             # TmuxWrapper unit tests
├── test_detector.py            # Mode detection unit tests
├── test_workflows.py           # Workflow integration tests
├── test_skills.py              # Skill unit tests
└── test_security.py            # Security module tests

config.yaml                     # Configuration file
.env.example                    # Environment variable template (Task 1.1)
docker-compose.yml              # Docker deployment configuration
requirements.txt                # Python dependencies
pyproject.toml                  # Project metadata
```

---

## Chunk 1: Project Infrastructure

### Task 1.1: Project Initialization

**Files:**
- Create: `pyproject.toml`
- Create: `requirements.txt`
- Create: `config.yaml`
- Create: `.env.example`
- Create: `src/butler/__init__.py`

- [ ] **Step 1: Create pyproject.toml**

```toml
[project]
name = "butler-shell"
version = "0.1.0"
description = "Persistent Shell Assistant with QQ Bridge"
requires-python = ">=3.11"
dependencies = [
    "inngest>=0.2.0",
    "pydantic>=2.0.0",
    "pydantic-settings>=2.7.0",
    "fastapi>=0.109.0",
    "uvicorn>=0.27.0",
    "websockets>=12.0",
    "aiosqlite>=0.19.0",
    "pyyaml>=6.0",
    "python-dotenv>=1.0.0",
    "httpx>=0.26.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.4.0",
    "pytest-asyncio>=0.23.0",
    "pytest-cov>=4.1.0",
    "ruff>=0.1.0",
    "mypy>=1.8.0",
]

[build-system]
requires = ["setuptools>=68.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools.packages.find]
where = ["src"]

[tool.ruff]
line-length = 100
target-version = "py311"

[tool.mypy]
python_version = "3.11"
strict = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
```

- [ ] **Step 2: Create requirements.txt**

```text
inngest>=0.2.0
pydantic>=2.0.0
pydantic-settings>=2.7.0
fastapi>=0.109.0
uvicorn>=0.27.0
websockets>=12.0
aiosqlite>=0.19.0
pyyaml>=6.0
python-dotenv>=1.0.0
httpx>=0.26.0
anthropic>=0.18.0
```

- [ ] **Step 3: Create config.yaml**

```yaml
butler:
  session:
    idle_timeout: 3600
    max_per_user: 1
    max_total: 100
    prefix: "butler_"

  guardrail:
    enabled: true
    approval_timeout: 300

  security:
    allowed_users: []

  logging:
    level: INFO
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

  napcat:
    ws_url: "ws://localhost:3001"
    token: ""

  inngest:
    event_key: ""
    signing_key: ""
```

- [ ] **Step 4: Create .env.example**

```bash
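# Butler-Shell environment variables
# Settings in config.py only reads variables that carry the BUTLER_ prefix.

# NapCatQQ
BUTLER_NAPCAT_WS_URL=ws://localhost:3001
BUTLER_NAPCAT_TOKEN=your_napcat_token

# Inngest (unprefixed names are read directly by the Inngest SDK;
# use BUTLER_INNGEST_EVENT_KEY / BUTLER_INNGEST_SIGNING_KEY to populate Settings)
INNGEST_EVENT_KEY=your_event_key
INNGEST_SIGNING_KEY=your_signing_key

# Security (comma-separated QQ user IDs)
BUTLER_ALLOWED_USERS=12345678,87654321

# Claude API (optional, used for context compaction; the unprefixed name is
# read directly by the Anthropic SDK, use BUTLER_ANTHROPIC_API_KEY for Settings)
ANTHROPIC_API_KEY=your_claude_api_key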
```

- [ ] **Step 5: Create src/butler/__init__.py**

```python
"""Butler-Shell: Persistent Shell Assistant with QQ Bridge."""

__version__ = "0.1.0"
```

- [ ] **Step 6: Commit the infrastructure**

```bash
git add pyproject.toml requirements.txt config.yaml .env.example src/butler/__init__.py
git commit -m "chore: initialize project structure"
```
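
The `BUTLER_ALLOWED_USERS` value in `.env.example` is a comma-separated list of QQ user IDs. As a minimal sketch of how such a value can be turned into a list of strings, the hypothetical helper `parse_allowed_users` below (not part of the plan's modules) trims whitespace and drops empty entries:

```python
def parse_allowed_users(raw: str) -> list[str]:
    """Split a comma-separated ID string into a list of non-empty IDs."""
    return [part.strip() for part in raw.split(",") if part.strip()]


# Example values mirror the placeholder IDs in .env.example.
users = parse_allowed_users("12345678, 87654321,")
empty = parse_allowed_users("")
```

An empty string yields an empty list, which the plan's whitelist check treats as "no whitelist configured".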

---

### Task 1.2: Configuration Management Module

**Files:**
- Create: `src/butler/config.py`
- Create: `tests/test_config.py`

- [ ] **Step 1: Write the configuration tests**

```python
# tests/test_config.py
import os
import pytest
from pathlib import Path
from butler.config import Settings, load_config


def test_default_settings():
    """Default settings have the expected values."""
    settings = Settings()
    assert settings.session_idle_timeout == 3600
    assert settings.session_max_per_user == 1
    assert settings.guardrail_enabled is True


def test_env_override(monkeypatch):
    """Environment variables override the defaults."""
    monkeypatch.setenv("BUTLER_ALLOWED_USERS", "123,456")
    settings = Settings()
    assert "123" in settings.allowed_users
    assert "456" in settings.allowed_users


def test_load_config_from_yaml(tmp_path):
    """Configuration can be loaded from a YAML file."""
    config_file = tmp_path / "config.yaml"
    config_file.write_text("""
butler:
  session:
    idle_timeout: 7200
""")
    config = load_config(config_file)
    assert config["butler"]["session"]["idle_timeout"] == 7200
```

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_config.py -v
```

Expected: FAIL (module does not exist)

- [ ] **Step 3: Implement the configuration module**

```python
# src/butler/config.py
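"""Configuration management module."""

from pathlib import Path
from typing import Annotated, Any

import yaml
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, NoDecode, PydanticBaseSettingsSource


class Settings(BaseSettings):
    """Application settings."""

    # Session settings
    session_idle_timeout: int = Field(default=3600, description="Session idle timeout (seconds)")
    session_max_per_user: int = Field(default=1, description="Maximum sessions per user")
    session_max_total: int = Field(default=100, description="Maximum sessions overall")
    session_prefix: str = Field(default="butler_", description="Session name prefix")

    # Guardrail settings
    guardrail_enabled: bool = Field(default=True, description="Enable command approval")
    guardrail_approval_timeout: int = Field(default=300, description="Approval timeout (seconds)")

    # Security settings
    # NoDecode (pydantic-settings >= 2.7) skips JSON decoding, so a value such as
    # BUTLER_ALLOWED_USERS=123,456 reaches the validator below instead of failing.
    allowed_users: Annotated[list[str], NoDecode] = Field(
        default_factory=list, description="Allowed user IDs"
    )

    # NapCat settings
    napcat_ws_url: str = Field(default="ws://localhost:3001", description="NapCat WebSocket URL")
    napcat_token: str = Field(default="", description="NapCat access token")

    # Inngest settings
    inngest_event_key: str = Field(default="", description="Inngest Event Key")
    inngest_signing_key: str = Field(default="", description="Inngest Signing Key")

    # Logging settings
    log_level: str = Field(default="INFO", description="Log level")

    # Claude API
    anthropic_api_key: str = Field(default="", description="Anthropic API Key")

    model_config = {"env_prefix": "BUTLER_", "env_file": ".env", "extra": "ignore"}

    @field_validator("allowed_users", mode="before")
    @classmethod
    def split_allowed_users(cls, value: Any) -> Any:
        """Accept a comma-separated string or a list, and normalize IDs to strings."""
        if isinstance(value, str):
            return [item.strip() for item in value.split(",") if item.strip()]
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        return value

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Let environment variables and .env take precedence over YAML values.

        By default, init kwargs win; create_settings() passes YAML values as init
        kwargs, so without this reordering the environment could never override them.
        """
        return env_settings, dotenv_settings, init_settings, file_secret_settings

    def is_user_allowed(self, user_id: str) -> bool:
        """Check whether the user is on the whitelist."""
        if not self.allowed_users:
            return True  # No whitelist configured: allow every user
        return user_id in self.allowed_users


def load_config(config_path: Path | None = None) -> dict[str, Any]:
    """Load configuration from a YAML file."""
    if config_path is None:
        config_path = Path("config.yaml")

    if not config_path.exists():
        return {}

    with open(config_path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def create_settings(config_path: Path | None = None) -> Settings:
    """Create a Settings instance from YAML, with environment variables taking precedence."""
    config = load_config(config_path)
    butler_config = config.get("butler", {})

    # Flatten the nested configuration into Settings field names
    flat_config: dict[str, Any] = {}

    session_cfg = butler_config.get("session", {})
    flat_config["session_idle_timeout"] = session_cfg.get("idle_timeout", 3600)
    flat_config["session_max_per_user"] = session_cfg.get("max_per_user", 1)
    flat_config["session_max_total"] = session_cfg.get("max_total", 100)
    flat_config["session_prefix"] = session_cfg.get("prefix", "butler_")

    guardrail_cfg = butler_config.get("guardrail", {})
    flat_config["guardrail_enabled"] = guardrail_cfg.get("enabled", True)
    flat_config["guardrail_approval_timeout"] = guardrail_cfg.get("approval_timeout", 300)

    security_cfg = butler_config.get("security", {})
    flat_config["allowed_users"] = security_cfg.get("allowed_users") or []

    napcat_cfg = butler_config.get("napcat", {})
    flat_config["napcat_ws_url"] = napcat_cfg.get("ws_url", "ws://localhost:3001")
    flat_config["napcat_token"] = napcat_cfg.get("token", "")

    inngest_cfg = butler_config.get("inngest", {})
    flat_config["inngest_event_key"] = inngest_cfg.get("event_key", "")
    flat_config["inngest_signing_key"] = inngest_cfg.get("signing_key", "")

    logging_cfg = butler_config.get("logging", {})
    flat_config["log_level"] = logging_cfg.get("level", "INFO")

    return Settings(**flat_config)


# Global settings instance
settings = create_settings()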
```

- [ ] **Step 4: Run the tests and confirm they pass**

```bash
pytest tests/test_config.py -v
```

Expected: PASS

- [ ] **Step 5: Commit the configuration module**

```bash
git add src/butler/config.py tests/test_config.py
git commit -m "feat: add configuration management module"
```
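
`create_settings` maps each nested YAML key to a flat `Settings` field by hand. The same mapping can be sketched generically, assuming the naming rule `<section>_<key>` plus a small rename table for the fields that do not follow it. The helper `flatten_config` and the `RENAMES` table are illustrative names, not part of the plan:

```python
from typing import Any

# Fields whose Settings name differs from "<section>_<key>" (taken from config.py).
RENAMES = {
    "security_allowed_users": "allowed_users",
    "logging_level": "log_level",
}


def flatten_config(nested: dict[str, Any], renames: dict[str, str]) -> dict[str, Any]:
    """Flatten {"session": {"idle_timeout": 1}} into {"session_idle_timeout": 1}."""
    flat: dict[str, Any] = {}
    for section, values in nested.items():
        if not isinstance(values, dict):
            continue  # Skip scalar entries; only nested sections are flattened
        for key, value in values.items():
            name = f"{section}_{key}"
            flat[renames.get(name, name)] = value
    return flat


butler_cfg = {
    "session": {"idle_timeout": 7200},
    "security": {"allowed_users": ["12345678"]},
    "logging": {"level": "DEBUG"},
}
flat = flatten_config(butler_cfg, RENAMES)
```

Unlike the hand-written version, this sketch only emits keys that are present in the YAML, so missing keys fall back to the field defaults. Keys without a matching field (for example `logging_format`) would be dropped by the `"extra": "ignore"` setting.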

---

### Task 1.3: Core Data Models

**Files:**
- Create: `src/butler/session/__init__.py`
- Create: `src/butler/session/state.py`
- Create: `tests/test_state.py`

- [ ] **Step 1: Write the data model tests**

```python
# tests/test_state.py
import pytest
from datetime import datetime
from butler.session.state import SessionMode, PtyState, SessionState, MessageContext


def test_session_mode_enum():
    """Session mode enum values."""
    assert SessionMode.NATURAL_LANGUAGE.value == "nl"
    assert SessionMode.INTERACTIVE.value == "interactive"


def test_pty_state_enum():
    """PTY state enum values."""
    assert PtyState.IDLE.value == "idle"
    assert PtyState.RUNNING.value == "running"
    assert PtyState.BLOCKED.value == "blocked"
    assert PtyState.UNKNOWN.value == "unknown"


def test_session_state_creation():
    """A session state can be created with explicit values."""
    state = SessionState(
        session_id="butler_test123",
        user_id="test123",
        pwd="/home/user",
        env={"HOME": "/home/user"},
    )
    assert state.session_id == "butler_test123"
    assert state.mode == SessionMode.NATURAL_LANGUAGE
    assert state.pwd == "/home/user"
    assert state.env["HOME"] == "/home/user"


def test_session_state_default_mode():
    """The default mode is natural-language mode."""
    state = SessionState(session_id="test", user_id="test")
    assert state.mode == SessionMode.NATURAL_LANGUAGE


def test_message_context_creation():
    """A message context can be created with defaults."""
    ctx = MessageContext(
        message_id="msg123",
        user_id="user123",
        content="ls -la",
    )
    assert ctx.message_id == "msg123"
    assert ctx.is_command is False
    assert ctx.requires_approval is False


def test_message_context_command_detection():
    """The command flag is preserved."""
    ctx = MessageContext(
        message_id="msg123",
        user_id="user123",
        content="/log recent",
        is_command=True,
    )
    assert ctx.is_command is True

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_state.py -v
```

Expected: FAIL (module does not exist)

- [ ] **Step 3: Create the session package init**

```python
# src/butler/session/__init__.py
"""Session management package."""

from butler.session.state import MessageContext, PtyState, SessionMode, SessionState

__all__ = ["SessionMode", "PtyState", "SessionState", "MessageContext"]
```

- [ ] **Step 4: Implement the data models**

```python
# src/butler/session/state.py
"""Session state data models."""

from datetime import datetime, timezone
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime.

    datetime.utcnow() is deprecated since Python 3.12 and returns a naive value.
    """
    return datetime.now(timezone.utc)


class SessionMode(str, Enum):
    """Session operating mode."""

    NATURAL_LANGUAGE = "nl"  # Natural-language mode: input is parsed by the LLM
    INTERACTIVE = "interactive"  # Interactive mode: input is passed straight to the PTY


class PtyState(str, Enum):
    """PTY process state."""

    IDLE = "idle"  # Idle, waiting for input
    RUNNING = "running"  # Executing a command
    BLOCKED = "blocked"  # Blocked (for example, waiting on I/O)
    UNKNOWN = "unknown"  # State detection failed


class SessionState(BaseModel):
    """Snapshot of a session's state."""

    session_id: str = Field(..., description="tmux session name")
    user_id: str = Field(..., description="QQ user ID")
    mode: SessionMode = Field(default=SessionMode.NATURAL_LANGUAGE, description="Current mode")

    # Shell state
    pwd: str = Field(default="", description="Current working directory")
    env: dict[str, str] = Field(default_factory=dict, description="Key environment variables")
    last_output: str = Field(default="", description="Summary of recent terminal output")

    # Interactive-mode tracking
    interactive_process: Optional[str] = Field(
        default=None, description="Current interactive process name (vim, top, etc.)"
    )

    # Metadata
    created_at: datetime = Field(default_factory=_utcnow, description="Creation time")
    updated_at: datetime = Field(default_factory=_utcnow, description="Last update time")
    last_active_at: datetime = Field(default_factory=_utcnow, description="Last activity time")

    def touch(self) -> None:
        """Refresh the update and activity timestamps."""
        now = _utcnow()
        self.updated_at = now
        self.last_active_at = now

    def idle_seconds(self) -> int:
        """Return the number of seconds since the last activity."""
        delta = _utcnow() - self.last_active_at
        return int(delta.total_seconds())


class MessageContext(BaseModel):
    """Context for processing one message."""

    message_id: str = Field(..., description="Unique message ID")
    user_id: str = Field(..., description="QQ user ID")
    content: str = Field(..., description="Message content")

    is_command: bool = Field(default=False, description="Whether the message starts with /")
    requires_approval: bool = Field(default=False, description="Dangerous-command flag")
    session: Optional[SessionState] = Field(default=None, description="Associated session state")

    model_config = {"frozen": False}


class NapCatEvent(BaseModel):
    """NapCatQQ event format."""

    time: int = Field(..., description="Event timestamp")
    self_id: int = Field(..., description="Bot QQ number")
    post_type: str = Field(..., description="Event type")
    message_type: str = Field(default="", description="Message type")
    user_id: int = Field(default=0, description="Sender ID")
    message: str = Field(default="", description="Message content")
    raw_message: str = Field(default="", description="Raw message")

    @property
    def is_private_message(self) -> bool:
        """Whether this event is a private message."""
        return self.post_type == "message" and self.message_type == "private"

- [ ] **Step 5: Run the tests and confirm they pass**

```bash
pytest tests/test_state.py -v
```

Expected: PASS

- [ ] **Step 6: Commit the data models**

```bash
git add src/butler/session/ tests/test_state.py
git commit -m "feat: add core data models (SessionState, PtyState, MessageContext)"
```
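
`SessionState.idle_seconds()` is what the `idle_timeout` setting would be compared against when deciding whether to reap a session. A minimal, dependency-free sketch of that comparison follows; the function name `is_session_expired` and the explicit `now` parameter are assumptions made for illustration, and the plan itself does not define the cleanup logic in this section:

```python
from datetime import datetime, timedelta, timezone


def is_session_expired(last_active_at: datetime, idle_timeout: int, now: datetime) -> bool:
    """Return True once the session has been idle for at least idle_timeout seconds."""
    return (now - last_active_at).total_seconds() >= idle_timeout


# Worked example with the default idle_timeout of 3600 seconds.
last_active = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
just_before = is_session_expired(last_active, 3600, last_active + timedelta(seconds=3599))
at_limit = is_session_expired(last_active, 3600, last_active + timedelta(seconds=3600))
```

Passing `now` explicitly keeps the check deterministic in tests. Both datetimes must be of the same kind (both timezone-aware or both naive), otherwise the subtraction raises `TypeError`.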

---

## Chunk 2: Security Modules

### Task 2.1: Authentication and Authorization Module

**Files:**
- Create: `src/butler/security/__init__.py`
- Create: `src/butler/security/auth.py`
- Create: `tests/test_auth.py`

- [ ] **Step 1: Write the authentication tests**

```python
# tests/test_auth.py
import pytest
from butler.security.auth import is_authorized, check_user_permission


def test_is_authorized_with_empty_whitelist():
    """An empty whitelist allows every user."""
    assert is_authorized("any_user", []) is True


def test_is_authorized_with_whitelist():
    """A non-empty whitelist restricts access."""
    whitelist = ["user123", "user456"]
    assert is_authorized("user123", whitelist) is True
    assert is_authorized("user789", whitelist) is False


def test_check_user_permission_allowed():
    """An allowed user passes the check."""
    result, msg = check_user_permission("user123", ["user123"])
    assert result is True
    assert msg is None


def test_check_user_permission_denied():
    """A user outside the whitelist is rejected."""
    result, msg = check_user_permission("user789", ["user123"])
    assert result is False
    assert "not authorized" in msg
```

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_auth.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the authentication module**

```python
# src/butler/security/__init__.py
"""Security package."""

from butler.security.auth import check_user_permission, is_authorized

__all__ = ["is_authorized", "check_user_permission"]
```

```python
# src/butler/security/auth.py
"""Authentication and authorization module."""

from typing import Optional


def is_authorized(user_id: str, allowed_users: list[str]) -> bool:
    """
    Check whether a user is authorized.

    Args:
        user_id: User ID.
        allowed_users: List of allowed user IDs.

    Returns:
        True if the whitelist is empty or the user is on it.
    """
    if not allowed_users:
        return True
    return user_id in allowed_users


def check_user_permission(user_id: str, allowed_users: list[str]) -> tuple[bool, Optional[str]]:
    """
    Check a user's permission and return the result with a message.

    Args:
        user_id: User ID.
        allowed_users: List of allowed user IDs.

    Returns:
        (allowed, error message or None)
    """
    if not is_authorized(user_id, allowed_users):
        return False, f"User {user_id} is not authorized to use this service"
    return True, None

- [ ] **Step 4: Run the tests and confirm they pass**

```bash
pytest tests/test_auth.py -v
```

Expected: PASS

- [ ] **Step 5: Commit the authentication module**

```bash
git add src/butler/security/ tests/test_auth.py
git commit -m "feat: add authentication module"
```

---

### Task 2.2: Dangerous Command Detection

**Files:**
- Create: `src/butler/security/guardrail.py`
- Create: `tests/test_guardrail.py`

- [ ] **Step 1: Write the dangerous-command detection tests**

```python
# tests/test_guardrail.py
import pytest
from butler.security.guardrail import is_dangerous_command, DANGEROUS_PATTERNS


class TestDangerousCommandDetection:
    """Dangerous-command detection tests."""

    def test_safe_commands(self):
        """Safe commands are not flagged."""
        assert is_dangerous_command("ls -la") is False
        assert is_dangerous_command("cat file.txt") is False
        assert is_dangerous_command("echo hello") is False
        assert is_dangerous_command("pwd") is False

    def test_rm_rf_detection(self):
        """Detect rm -rf."""
        assert is_dangerous_command("rm -rf /") is True
        assert is_dangerous_command("rm -rf ./node_modules") is True
        assert is_dangerous_command("rm -fr /tmp") is True

    def test_chmod_777_detection(self):
        """Detect chmod 777."""
        assert is_dangerous_command("chmod 777 /tmp") is True
        assert is_dangerous_command("chmod -R 777 /var") is True

    def test_sudo_detection(self):
        """Detect sudo privilege escalation."""
        assert is_dangerous_command("sudo rm file") is True
        assert is_dangerous_command("sudo apt update") is True

    def test_command_injection_detection(self):
        """Detect command injection."""
        assert is_dangerous_command("sh -c 'rm -rf /'") is True
        assert is_dangerous_command("bash -c 'evil'") is True
        assert is_dangerous_command("echo 'rm -rf' | bash") is True

    def test_disk_operations_detection(self):
        """Detect disk operations."""
        assert is_dangerous_command("dd if=/dev/zero of=/dev/sda") is True
        assert is_dangerous_command("mkfs.ext4 /dev/sda1") is True
        assert is_dangerous_command("fdisk /dev/sda") is True

    def test_system_operations_detection(self):
        """Detect system operations."""
        assert is_dangerous_command("shutdown now") is True
        assert is_dangerous_command("reboot") is True
        assert is_dangerous_command("iptables -F") is True

    def test_safe_rm_single_file(self):
        """Deleting a single file without -r/-f is considered safe."""
        assert is_dangerous_command("rm file.txt") is False
        assert is_dangerous_command("rm -i file.txt") is False
```

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_guardrail.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement dangerous-command detection**

```python
# src/butler/security/guardrail.py
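"""Command safety detection module."""

import re
import shlex

# Regex patterns for dangerous commands
DANGEROUS_PATTERNS: list[str] = [
    # Basic dangerous commands
    r"\brm\s+(-[rf]+\s+|.*-rf)",  # rm -rf
    r"\bchmod\s+(-R\s+)?777",  # chmod 777
    r"\bchown\s+",  # chown
    r"\bdd\s+",  # dd
    r"\bmkfs\b",  # mkfs
    r"\b(fdisk|parted)\b",  # Partitioning tools
    r">\s*/dev/sd",  # Writing to a disk device
    r"\biptables\b",  # iptables
    r"\bshutdown\b",  # shutdown
    r"\breboot\b",  # reboot
    # Guardrail bypasses
    r"\bsudo\b",  # sudo privilege escalation
    r"\bsh\s+-c\b",  # Nested command
    r"\bbash\s+-c\b",  # Nested command
    r"\|.*\b(bash|sh)\b",  # Piping into a shell
    r"\$\([^)]+\)",  # Command substitution $()
    r"`[^`]+`",  # Backtick command substitution
]

# Dangerous command names (checked after shlex parsing)
DANGEROUS_COMMANDS: frozenset[str] = frozenset([
    "rm", "chmod", "chown", "dd", "mkfs", "fdisk", "parted",
    "iptables", "shutdown", "reboot", "init",
])

# Precompiled regular expressions
_COMPILED_PATTERNS = [re.compile(p, re.IGNORECASE) for p in DANGEROUS_PATTERNS]


def is_dangerous_command(cmd: str) -> bool:
    """
    Check whether a command is dangerous.

    Detection runs in two layers:
    1. Regex matching against known patterns.
    2. shlex parsing to inspect the first command word.

    Args:
        cmd: Command string to check.

    Returns:
        True if the command is considered dangerous.
    """
    # 1. Regex matching
    for pattern in _COMPILED_PATTERNS:
        if pattern.search(cmd):
            return True

    # 2. Parse the command structure
    try:
        parts = shlex.split(cmd)
        if parts and parts[0] in DANGEROUS_COMMANDS:
            # A plain "rm file" is safe; it only becomes dangerous with extra flags
            if parts[0] == "rm":
                for part in parts[1:]:
                    # Long options: only --recursive and --force are dangerous
                    if part in ("--recursive", "--force"):
                        return True
                    # Short option clusters such as -r, -f, -rf
                    is_short = part.startswith("-") and not part.startswith("--")
                    if is_short and ("r" in part or "f" in part):
                        return True
                return False
            return True
    except ValueError:
        # Parsing failed (for example, unbalanced quotes): be conservative
        return True

    return False


def get_danger_reason(cmd: str) -> str | None:
    """
    Describe why a command was flagged as dangerous.

    Args:
        cmd: Command string.

    Returns:
        A description of the danger, or None if the command is safe.
    """
    if not is_dangerous_command(cmd):
        return None

    if "rm" in cmd and ("-rf" in cmd or "-fr" in cmd):
        return "Recursive forced file deletion"
    if "chmod" in cmd and "777" in cmd:
        return "Opens all permissions"
    if re.search(r"\bsudo\b", cmd):
        return "Requires root privileges"
    # Word boundaries avoid matching "dd" inside words such as "add"
    if re.search(r"\bdd\b", cmd):
        return "Disk operation"
    if any(x in cmd for x in ["shutdown", "reboot"]):
        return "System operation"

    return "Potentially dangerous command"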

- [ ] **Step 4: Run the tests and confirm they pass**

```bash
pytest tests/test_guardrail.py -v
```

Expected: PASS

- [ ] **Step 5: Commit dangerous-command detection**

```bash
git add src/butler/security/guardrail.py tests/test_guardrail.py
git commit -m "feat: add dangerous command detection"
```
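
The shlex layer in `is_dangerous_command` only inspects the first word of the whole string, so a name that appears solely in `DANGEROUS_COMMANDS` (such as `init`) is missed when it follows `&&` or `;`. One possible extension, sketched here under the assumption of Python 3.8+ and not part of the plan, is to split the line at shell control operators and look at the first word of every segment. `split_command_chain` and `first_commands` are hypothetical names:

```python
import shlex


def split_command_chain(cmd: str) -> list[list[str]]:
    """Split a command line into argv-like segments at ;, &, | operator runs."""
    lexer = shlex.shlex(cmd, posix=True, punctuation_chars=True)
    lexer.whitespace_split = True  # Keep words such as "-la" or "a=b" intact
    lexer.commenters = ""  # Treat "#" as an ordinary character
    segments: list[list[str]] = [[]]
    for token in lexer:
        # A token made only of ;, & and | is treated as a control operator
        if token and set(token) <= set(";&|"):
            segments.append([])
        else:
            segments[-1].append(token)
    return [segment for segment in segments if segment]


def first_commands(cmd: str) -> list[str]:
    """Return the first word of every segment in a chained command line."""
    return [segment[0] for segment in split_command_chain(cmd)]


chained = first_commands("ls -la && init 0")
quoted = first_commands("echo 'a && b'")
piped = first_commands("cat script.sh | sh")
```

Each returned name could then be checked against `DANGEROUS_COMMANDS`. The sketch cannot tell a quoted `'&&'` argument from a real operator once quotes are stripped, which errs on the conservative side, and like `shlex.split` it raises `ValueError` on unbalanced quotes.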

---

### Task 2.3: Sensitive Information Filtering

**Files:**
- Create: `src/butler/security/sanitizer.py`
- Create: `tests/test_sanitizer.py`

- [ ] **Step 1: Write the sanitizer tests**

```python
# tests/test_sanitizer.py
import pytest
from butler.security.sanitizer import sanitize_output, SENSITIVE_PATTERNS


def test_sanitize_password():
    """Passwords are redacted."""
    output = "password=secret123"
    assert sanitize_output(output) == "password=[REDACTED]"

    output = "export DB_PASSWORD=mysecretpass"
    assert "[REDACTED]" in sanitize_output(output)


def test_sanitize_api_key():
    """API keys are redacted."""
    output = "api_key=sk-1234567890abcdef"
    assert sanitize_output(output) == "api_key=[REDACTED]"

    output = "API_KEY = my_secret_key"
    assert "[REDACTED]" in sanitize_output(output)


def test_sanitize_bearer_token():
    """Bearer tokens are redacted."""
    output = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    assert "[REDACTED]" in sanitize_output(output)


def test_sanitize_private_key():
    """Private keys are redacted."""
    output = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA..."
    assert "[REDACTED]" in sanitize_output(output)


def test_sanitize_preserves_safe_content():
    """Safe content is left untouched."""
    output = "User logged in successfully\nFiles: 10\nDone"
    assert sanitize_output(output) == output


def test_sanitize_multiple_secrets():
    """Several secrets in one string are all redacted."""
    output = "password=secret1 api_key=key1 token=tok1"
    sanitized = sanitize_output(output)
    assert "[REDACTED]" in sanitized
    assert "secret1" not in sanitized
    assert "key1" not in sanitized

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_sanitizer.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the sanitizer**

```python
# src/butler/security/sanitizer.py
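"""Sensitive information filtering module."""

import re
from typing import Final

# Regex patterns for sensitive data. The named group "keep" captures the part
# that stays visible (for example "password="); the rest of the match is
# replaced, so "password=secret" becomes "password=[REDACTED]".
# Note: with IGNORECASE, the "pwd" alternative also matches the shell's PWD variable.
SENSITIVE_PATTERNS: Final[list[str]] = [
    r"(?P<keep>(?:password|passwd|pwd)\s*=\s*)\S+",
    r"(?P<keep>(?:api[_-]?key|apikey)\s*[=:]\s*)\S+",
    r"(?P<keep>(?:token|access_token|auth_token)\s*[=:]\s*)\S+",
    r"(?P<keep>(?:secret|secret_key)\s*[=:]\s*)\S+",
    r"(?P<keep>Bearer\s+)\S+",
    # Redact the whole PEM block, up to the END marker or the end of the text
    r"(?P<keep>)-----BEGIN\s+.*PRIVATE\s+KEY-----[\s\S]*?(?:-----END\s+.*PRIVATE\s+KEY-----|\Z)",
    r"(?P<keep>)-----BEGIN\s+.*CERTIFICATE-----",
]

# Precompiled (pattern, replacement) pairs
_COMPILED_SANITIZERS = [
    (re.compile(pattern, re.IGNORECASE), r"\g<keep>[REDACTED]")
    for pattern in SENSITIVE_PATTERNS
]


def sanitize_output(output: str) -> str:
    """
    Filter sensitive information out of the output.

    Passwords, API keys, tokens and similar values are replaced with [REDACTED],
    while the key name in front of the value is preserved.

    Args:
        output: Raw output string.

    Returns:
        The sanitized output.
    """
    sanitized = output
    for pattern, replacement in _COMPILED_SANITIZERS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized


def contains_secrets(text: str) -> bool:
    """
    Check whether the text contains sensitive information.

    Args:
        text: Text to check.

    Returns:
        True if any sensitive pattern matches.
    """
    for pattern, _ in _COMPILED_SANITIZERS:
        if pattern.search(text):
            return True
    return False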

- [ ] **Step 4: Run the tests and confirm they pass**

```bash
pytest tests/test_sanitizer.py -v
```

Expected: PASS

- [ ] **Step 5: Commit the sanitizer**

```bash
git add src/butler/security/sanitizer.py tests/test_sanitizer.py
git commit -m "feat: add sensitive information sanitizer"
```

---

## Chunk 3: TmuxWrapper Core Class

### Task 3.1: TmuxWrapper Basic Functionality

**Files:**
- Create: `src/butler/session/wrapper.py`
- Create: `tests/test_wrapper.py`
- Create: `tests/conftest.py`

- [ ] **Step 1: Create the pytest fixtures**

```python
# tests/conftest.py
"""Global pytest fixtures."""

import pytest
from unittest.mock import patch
from butler.session.wrapper import TmuxWrapper


@pytest.fixture
def mock_subprocess():
    """Mock subprocess.run for TmuxWrapper tests."""
    with patch("subprocess.run") as mock_run:
        yield mock_run


@pytest.fixture
def tmux_wrapper():
    """Create a TmuxWrapper instance."""
    return TmuxWrapper(session_prefix="test_")


@pytest.fixture
def sample_session_state():
    """Create a sample session state."""
    from butler.session.state import SessionState
    return SessionState(
        session_id="test_user123",
        user_id="user123",
        pwd="/home/user",
        env={"HOME": "/home/user"},
    )

- [ ] **Step 2: Write the basic TmuxWrapper tests**

```python
# tests/test_wrapper.py
"""TmuxWrapper unit tests."""

import pytest
from unittest.mock import Mock, patch
from butler.session.wrapper import TmuxWrapper, PtyState, SessionMode


class TestTmuxWrapperBasic:
    """Basic TmuxWrapper functionality tests."""

    def test_create_session(self, mock_subprocess, tmux_wrapper):
        """Creating a tmux session."""
        mock_subprocess.return_value = Mock(returncode=0)

        session_name = tmux_wrapper.create_session("user123")

        assert session_name == "test_user123"
        mock_subprocess.assert_called_once()
        args = mock_subprocess.call_args[0][0]
        assert "tmux" in args
        assert "new-session" in args
        assert "test_user123" in args

    def test_session_exists_true(self, mock_subprocess, tmux_wrapper):
        """Session existence check: the session exists."""
        mock_subprocess.return_value = Mock(returncode=0)

        result = tmux_wrapper.session_exists("test_user123")

        assert result is True

    def test_session_exists_false(self, mock_subprocess, tmux_wrapper):
        """Session existence check: the session does not exist."""
        mock_subprocess.return_value = Mock(returncode=1)

        result = tmux_wrapper.session_exists("test_user123")

        assert result is False

    def test_kill_session(self, mock_subprocess, tmux_wrapper):
        """Destroying a session."""
        tmux_wrapper.kill_session("test_user123")

        args = mock_subprocess.call_args[0][0]
        assert "kill-session" in args
        assert "test_user123" in args

    def test_send_keys_normal(self, mock_subprocess, tmux_wrapper):
        """Sending an ordinary command."""
        tmux_wrapper.send_keys("test_session", "ls -la", enter=True)

        # Expect two calls: send-keys -l, then Enter
        assert mock_subprocess.call_count == 2

    def test_send_keys_no_enter(self, mock_subprocess, tmux_wrapper):
        """Sending a command without Enter."""
        tmux_wrapper.send_keys("test_session", "ls -la", enter=False)

        # Expect a single call
        assert mock_subprocess.call_count == 1

    def test_send_keys_special_key(self, mock_subprocess, tmux_wrapper):
        """Sending a special key."""
        tmux_wrapper.send_keys("test_session", "C-c")

        args = mock_subprocess.call_args[0][0]
        assert "C-c" in args

    def test_send_raw(self, mock_subprocess, tmux_wrapper):
        """Sending raw data."""
        tmux_wrapper.send_raw("test_session", "raw data")

        args = mock_subprocess.call_args[0][0]
        assert "-l" in args
        assert "raw data" in args

    def test_capture_pane(self, mock_subprocess, tmux_wrapper):
        """Capturing terminal content."""
        mock_subprocess.return_value = Mock(stdout="line1\nline2\nline3", returncode=0)

        result = tmux_wrapper.capture_pane("test_session")

        assert "line1" in result
        args = mock_subprocess.call_args[0][0]
        assert "capture-pane" in args
```
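
The tests above rely on `patch("subprocess.run")` and on reading the argv list back through `call_args[0][0]`. The standalone sketch below shows that mechanism in isolation; `has_session` is a hypothetical stand-in for `TmuxWrapper.session_exists`, so the snippet runs without tmux or the `butler` package:

```python
import subprocess
from unittest.mock import Mock, patch


def has_session(name: str) -> bool:
    """Hypothetical stand-in for TmuxWrapper.session_exists."""
    result = subprocess.run(["tmux", "has-session", "-t", name], capture_output=True)
    return result.returncode == 0


with patch("subprocess.run") as mock_run:
    # The mock replaces subprocess.run, so no tmux process is started.
    mock_run.return_value = Mock(returncode=0)
    exists = has_session("test_user123")

    # call_args is an (args, kwargs) pair: [0][0] is the first positional
    # argument (the argv list), [1] is the keyword-argument dict.
    argv = mock_run.call_args[0][0]
    kwargs = mock_run.call_args[1]
```

Because the patch targets the `run` attribute of the `subprocess` module itself, it only takes effect for code that calls `subprocess.run(...)` through the module, as `wrapper.py` does; code that uses `from subprocess import run` would need the patch applied to its own module namespace.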

- [ ] **Step 3: Run the tests and confirm they fail**

```bash
pytest tests/test_wrapper.py -v
```

Expected: FAIL (module does not exist)

- [ ] **Step 4: Implement the TmuxWrapper base class**

```python
# src/butler/session/wrapper.py
"""tmux session wrapper."""

import subprocess
from pathlib import Path
from typing import Final, Optional

from butler.session.state import PtyState, SessionMode


class TmuxWrapper:
    """tmux session wrapper that handles low-level PTY interaction."""

    # Processes that switch the session into interactive mode
    INTERACTIVE_PROCESSES: Final[frozenset[str]] = frozenset([
        "vim", "nvim", "vi",
        "top", "htop", "btop",
        "less", "more", "man",
        "claude", "aider",
        "python", "node", "irb", "ipython",
    ])

    # Shortcut-key mapping (accepted name -> tmux key name)
    SPECIAL_KEYS: Final[dict[str, str]] = {
        "C-c": "C-c", "C-d": "C-d", "C-z": "C-z",
        "C-a": "C-a", "C-e": "C-e", "C-u": "C-u",
        "C-k": "C-k", "C-w": "C-w", "C-r": "C-r",
        "Esc": "Escape", "Tab": "Tab",
        "Up": "Up", "Down": "Down", "Left": "Left", "Right": "Right",
        "Enter": "Enter", "Space": "Space",
    }

    def __init__(self, session_prefix: str = "butler_"):
        """
        Initialize the TmuxWrapper.

        Args:
            session_prefix: Prefix for tmux session names.
        """
        self.session_prefix = session_prefix

    # ========== Session lifecycle ==========

    def create_session(self, user_id: str) -> str:
        """
        Create a dedicated tmux session for a user.

        Args:
            user_id: User ID.

        Returns:
            Name of the created session.
        """
        session_name = f"{self.session_prefix}{user_id}"
        subprocess.run(
            ["tmux", "new-session", "-d", "-s", session_name],
            check=True,
            capture_output=True,
        )
        return session_name

    def session_exists(self, session_name: str) -> bool:
        """
        Check whether a session exists.

        Args:
            session_name: tmux session name.

        Returns:
            True if the session exists.
        """
        result = subprocess.run(
            ["tmux", "has-session", "-t", session_name],
            capture_output=True,
        )
        return result.returncode == 0

    def kill_session(self, session_name: str) -> None:
        """
        Destroy a session.

        Args:
            session_name: tmux session name.
        """
        subprocess.run(
            ["tmux", "kill-session", "-t", session_name],
            capture_output=True,
        )

    def get_or_create_session(self, user_id: str) -> str:
        """
        Get the user's session, creating it if necessary.

        Args:
            user_id: User ID.

        Returns:
            Session name.
        """
        session_name = f"{self.session_prefix}{user_id}"
        if not self.session_exists(session_name):
            return self.create_session(user_id)
        return session_name

    # ========== PTY interaction ==========

    def send_keys(
        self,
        session_name: str,
        keys: str,
        enter: bool = True,
    ) -> None:
        """
        Send keys to a session.

        Args:
            session_name: tmux session name.
            keys: Content to send (shortcut names such as "C-c" are supported).
            enter: Whether to append Enter afterwards (ignored for shortcut keys).
        """
        # Check whether this is a special key
        if keys in self.SPECIAL_KEYS:
            tmux_key = self.SPECIAL_KEYS[keys]
            subprocess.run(
                ["tmux", "send-keys", "-t", session_name, tmux_key],
                capture_output=True,
            )
        else:
            # Send literally (-l) so tmux does not interpret key names;
            # "--" stops option parsing so text starting with "-" is not read as a flag
            subprocess.run(
                ["tmux", "send-keys", "-l", "-t", session_name, "--", keys],
                capture_output=True,
            )
            if enter:
                subprocess.run(
                    ["tmux", "send-keys", "-t", session_name, "Enter"],
                    capture_output=True,
                )

    def send_raw(self, session_name: str, data: str) -> None:
        """
        Send raw data (without Enter) for precise control in interactive mode.

        Args:
            session_name: tmux session name.
            data: Raw data.
        """
        subprocess.run(
            ["tmux", "send-keys", "-l", "-t", session_name, "--", data],
            capture_output=True,
        )

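    def capture_pane(
        self,
        session_name: str,
        as_ansi: bool = False,
        start_line: int = -100,
    ) -> str:
        """
        Capture the contents of the terminal pane.

        Args:
            session_name: tmux session name
            as_ansi: whether to keep ANSI escape sequences
            start_line: first line to capture; 0 is the top of the visible
                pane and negative values reach back into the scrollback history

        Returns:
            The captured pane text
        """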
        cmd = [
            "tmux", "capture-pane",
            "-t", session_name,
            "-p",
            "-S", str(start_line),
        ]
        if as_ansi:
            cmd.append("-e")
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.stdout

    # ========== 模式检测 ==========

    def get_active_process(self, session_name: str) -> Optional[str]:
        """
        获取 tmux 窗口中的活跃进程名。

        Args:
            session_name: tmux session 名称

        Returns:
            进程名或 None
        """
        # 获取 pane 的 PID
        result = subprocess.run(
            ["tmux", "list-panes", "-t", session_name, "-F", "#{pane_pid}"],
            capture_output=True,
            text=True,
        )
        pane_pids = result.stdout.strip().split("\n")
        if not pane_pids or not pane_pids[0]:
            return None

        pane_pid = pane_pids[0]

        # 获取该 PID 的子进程
        try:
            ps_result = subprocess.run(
                ["ps", "--ppid", pane_pid, "-o", "comm=", "--no-headers"],
                capture_output=True,
                text=True,
            )
            processes = ps_result.stdout.strip().split("\n")

            # 返回最上层的非 shell 进程
            for proc in processes:
                proc = proc.strip()
                if proc and proc not in ("bash", "zsh", "sh", "fish"):
                    return proc
        except Exception:
            pass

        return None

    def get_pty_state(self, session_name: str) -> PtyState:
        """
        检测 PTY 是否在等待输入。

        通过 /proc/<pid>/stat 的状态字段判断。

        Args:
            session_name: tmux session 名称

        Returns:
            PTY 状态
        """
        # 获取 pane PID
        result = subprocess.run(
            ["tmux", "list-panes", "-t", session_name, "-F", "#{pane_pid}"],
            capture_output=True,
            text=True,
        )
        pane_pids = result.stdout.strip().split("\n")
        if not pane_pids or not pane_pids[0]:
            return PtyState.UNKNOWN

        pane_pid = pane_pids[0]

        # Read process state from /proc (Linux only).
        def read_state(pid: str) -> str:
            # The comm field may contain spaces or parentheses, so take the
            # first field after the last ")" rather than a plain split().
            stat_text = Path(f"/proc/{pid}/stat").read_text()
            return stat_text.rsplit(")", 1)[1].split()[0]

        try:
            state = read_state(pane_pid)

            # A direct child in the running state means a command is executing.
            children_path = Path(f"/proc/{pane_pid}/task/{pane_pid}/children")
            if children_path.exists():
                for child_pid in children_path.read_text().split():
                    try:
                        if read_state(child_pid) == "R":
                            return PtyState.RUNNING
                    except OSError:
                        continue  # child exited between listing and reading

            if state in ("S", "D"):
                return PtyState.IDLE
            elif state == "T":
                return PtyState.BLOCKED
            else:
                return PtyState.RUNNING

        except (OSError, IndexError):
            # The process exited, /proc is unavailable, or stat was malformed.
            return PtyState.UNKNOWN

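    def detect_mode(self, session_name: str) -> SessionMode:
        """
        Detect the session mode from the active process name.

        The session is treated as interactive when the foreground process is
        listed in INTERACTIVE_PROCESSES. The PTY state from get_pty_state is
        not consulted here yet.

        Args:
            session_name: tmux session name

        Returns:
            The detected session mode
        """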
        active_proc = self.get_active_process(session_name)

        # 进程名单匹配
        if active_proc and active_proc in self.INTERACTIVE_PROCESSES:
            return SessionMode.INTERACTIVE

        return SessionMode.NATURAL_LANGUAGE
```
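
`get_pty_state` takes the process state from `/proc/<pid>/stat`. The following is a minimal sketch of that parsing step only. The helper name `parse_proc_state` is hypothetical and not part of the plan. The state letter is read as the first field after the last closing parenthesis, which keeps the parse correct when the command name itself contains spaces or parentheses.

```python
def parse_proc_state(stat_text: str) -> str:
    """Return the one-letter process state from the text of /proc/<pid>/stat."""
    # Layout: "<pid> (<comm>) <state> <ppid> ...". comm may contain spaces
    # and parentheses, so split on the LAST ")" instead of on whitespace.
    _, sep, rest = stat_text.rpartition(")")
    fields = rest.split()
    if not sep or not fields:
        raise ValueError("malformed stat line")
    return fields[0]


if __name__ == "__main__":
    # A command name with a space would shift the fields of a plain split().
    print(parse_proc_state("4242 (tmux: server) S 1 4242 4242 0 -1"))
```

A plain `split()` on the same line would return `server)` as the third field, which is why the sketch anchors on the parenthesis.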

- [ ] **Step 5: Run the tests and confirm they pass**

```bash
pytest tests/test_wrapper.py -v
```

Expected: PASS

- [ ] **Step 6: Commit the TmuxWrapper base class**

```bash
git add src/butler/session/wrapper.py tests/test_wrapper.py tests/conftest.py
git commit -m "feat: add TmuxWrapper core class"
```

---

### Task 3.2: TmuxWrapper mode detection tests

**Files:**
- Modify: `tests/test_wrapper.py`

- [ ] **Step 1: Add mode detection tests**

```python
# Append to tests/test_wrapper.py

class TestTmuxWrapperModeDetection:
    """TmuxWrapper 模式检测测试。"""

    def test_get_active_process_vim(self, mock_subprocess, tmux_wrapper):
        """测试检测 vim 进程。"""
        # Mock list-panes 返回 PID
        mock_subprocess.side_effect = [
            Mock(stdout="12345\n", returncode=0),  # list-panes
            Mock(stdout="vim\nbash\n", returncode=0),  # ps
        ]
        
        proc = tmux_wrapper.get_active_process("test_session")
        assert proc == "vim"

    def test_get_active_process_top(self, mock_subprocess, tmux_wrapper):
        """测试检测 top 进程。"""
        mock_subprocess.side_effect = [
            Mock(stdout="12345\n", returncode=0),
            Mock(stdout="top\nbash\n", returncode=0),
        ]
        
        proc = tmux_wrapper.get_active_process("test_session")
        assert proc == "top"

    def test_get_active_process_shell_only(self, mock_subprocess, tmux_wrapper):
        """测试只有 shell 时返回 None。"""
        mock_subprocess.side_effect = [
            Mock(stdout="12345\n", returncode=0),
            Mock(stdout="bash\n", returncode=0),
        ]
        
        proc = tmux_wrapper.get_active_process("test_session")
        assert proc is None

    def test_detect_mode_vim(self, mock_subprocess, tmux_wrapper):
        """测试 vim 触发交互模式。"""
        mock_subprocess.side_effect = [
            Mock(stdout="12345\n", returncode=0),
            Mock(stdout="vim\nbash\n", returncode=0),
        ]
        
        mode = tmux_wrapper.detect_mode("test_session")
        assert mode == SessionMode.INTERACTIVE

    def test_detect_mode_normal(self, mock_subprocess, tmux_wrapper):
        """测试普通命令保持自然语言模式。"""
        mock_subprocess.side_effect = [
            Mock(stdout="12345\n", returncode=0),
            Mock(stdout="bash\n", returncode=0),
        ]
        
        mode = tmux_wrapper.detect_mode("test_session")
        assert mode == SessionMode.NATURAL_LANGUAGE
```
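
Each test above queues exactly two mock results because `get_active_process` calls `subprocess.run` twice: once for `tmux list-panes` and once for `ps`. The sketch below is a standalone illustration of how a `side_effect` list is consumed in call order. The function name `consume_side_effects` is invented for this example, and the `mock_subprocess` fixture itself is defined elsewhere in the plan and is not reproduced here.

```python
from unittest.mock import Mock


def consume_side_effects() -> tuple[str, str, bool]:
    """Call a mock three times and report what each call produced."""
    fake_run = Mock()
    # One queued result per expected call, in order.
    fake_run.side_effect = [
        Mock(stdout="12345\n", returncode=0),      # stands in for list-panes
        Mock(stdout="vim\nbash\n", returncode=0),  # stands in for ps
    ]

    first = fake_run(["tmux", "list-panes"])
    second = fake_run(["ps", "--ppid", "12345"])

    # A third call has nothing left to return and raises StopIteration.
    try:
        fake_run(["tmux", "has-session"])
        exhausted = False
    except StopIteration:
        exhausted = True

    return first.stdout, second.stdout, exhausted
```

If a wrapper method later gains a third `subprocess.run` call, tests written this way will fail with `StopIteration` until a third queued result is added.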

- [ ] **Step 2: Run the tests and confirm they pass**

```bash
pytest tests/test_wrapper.py::TestTmuxWrapperModeDetection -v
```

Expected: PASS

- [ ] **Step 3: Commit the mode detection tests**

```bash
git add tests/test_wrapper.py
git commit -m "test: add mode detection tests for TmuxWrapper"
```

---

## Chunk 4: Skill extension system

### Task 4.1: Skill base class

**Files:**
- Create: `src/butler/skills/__init__.py`
- Create: `src/butler/skills/base.py`
- Create: `tests/test_skills.py`

- [ ] **Step 1: Write the Skill base class tests**

```python
# tests/test_skills.py
"""Skill 系统测试。"""

import pytest
from butler.skills.base import SkillBase, SkillContext, SkillResult, SkillRegistry


class MockSkill(SkillBase):
    """测试用 Skill。"""
    
    name = "mock"
    description = "Mock skill for testing"
    triggers = ["/mock", "/test"]
    
    async def execute(self, ctx: SkillContext) -> SkillResult:
        return SkillResult(success=True, output=f"Executed with args: {ctx.args}")


class TestSkillBase:
    """Skill 基类测试。"""

    def test_skill_matches_trigger(self):
        """测试触发器匹配。"""
        skill = MockSkill()
        assert skill.matches("/mock arg1") is True
        assert skill.matches("/test") is True
        assert skill.matches("/other") is False

    def test_skill_context_creation(self):
        """测试上下文创建。"""
        ctx = SkillContext(
            session_name="test_session",
            user_id="user123",
            session_state={},
            args=["arg1", "arg2"],
        )
        assert ctx.session_name == "test_session"
        assert ctx.args == ["arg1", "arg2"]

    def test_skill_result_creation(self):
        """测试结果创建。"""
        result = SkillResult(success=True, output="Done", metadata={"key": "value"})
        assert result.success is True
        assert result.output == "Done"


class TestSkillRegistry:
    """Skill 注册表测试。"""

    def test_register_skill(self):
        """测试注册 Skill。"""
        registry = SkillRegistry()
        skill = MockSkill()
        
        registry.register(skill)
        
        assert "mock" in registry._skills

    def test_find_matching_skill(self):
        """测试查找匹配的 Skill。"""
        registry = SkillRegistry()
        registry.register(MockSkill())
        
        found = registry.find_matching("/mock arg1")
        assert found is not None
        assert found.name == "mock"

    def test_find_no_matching_skill(self):
        """测试无匹配时返回 None。"""
        registry = SkillRegistry()
        registry.register(MockSkill())
        
        found = registry.find_matching("/unknown")
        assert found is None
```

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_skills.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the Skill base class**

```python
# src/butler/skills/__init__.py
"""Skill 扩展系统。"""

from butler.skills.base import SkillBase, SkillContext, SkillResult, SkillRegistry

__all__ = ["SkillBase", "SkillContext", "SkillResult", "SkillRegistry"]
```

```python
# src/butler/skills/base.py
"""Skill 基类和注册表。"""

from abc import ABC, abstractmethod
from typing import Any, Optional

from pydantic import BaseModel, Field


class SkillContext(BaseModel):
    """Skill 执行上下文。"""

    session_name: str = Field(..., description="会话名称")
    user_id: str = Field(..., description="用户 ID")
    session_state: dict[str, Any] = Field(default_factory=dict, description="会话状态")
    args: list[str] = Field(default_factory=list, description="参数列表")


class SkillResult(BaseModel):
    """Skill 执行结果。"""

    success: bool = Field(..., description="是否成功")
    output: str = Field(..., description="输出内容")
    metadata: dict[str, Any] = Field(default_factory=dict, description="额外元数据")


class SkillBase(ABC):
    """Skill 基类。"""

    name: str
    description: str
    triggers: list[str] = []

    @abstractmethod
    async def execute(self, ctx: SkillContext) -> SkillResult:
        """
        执行 Skill。

        Args:
            ctx: 执行上下文

        Returns:
            执行结果
        """
        pass

    def matches(self, content: str) -> bool:
        """
        Check whether the input triggers this Skill.

        The first whitespace-delimited token must equal a trigger exactly, so
        "/system" is not mistaken for "/sys" and "/logout" does not match "/log".

        Args:
            content: user input

        Returns:
            True if the first token is one of the triggers
        """
        tokens = content.split()
        return bool(tokens) and tokens[0] in self.triggers

    def parse_args(self, content: str) -> list[str]:
        """
        Parse the arguments that follow the trigger.

        Args:
            content: user input

        Returns:
            Argument list, empty if the input does not match a trigger
        """
        tokens = content.split()
        if tokens and tokens[0] in self.triggers:
            return tokens[1:]
        return []


class SkillRegistry:
    """Skill 注册表。"""

    def __init__(self) -> None:
        self._skills: dict[str, SkillBase] = {}

    def register(self, skill: SkillBase) -> None:
        """
        注册 Skill。

        Args:
            skill: Skill 实例
        """
        self._skills[skill.name] = skill

    def get(self, name: str) -> Optional[SkillBase]:
        """
        获取指定名称的 Skill。

        Args:
            name: Skill 名称

        Returns:
            Skill 实例或 None
        """
        return self._skills.get(name)

    def find_matching(self, content: str) -> Optional[SkillBase]:
        """
        查找匹配内容的 Skill。

        Args:
            content: 用户输入内容

        Returns:
            匹配的 Skill 或 None
        """
        for skill in self._skills.values():
            if skill.matches(content):
                return skill
        return None

    def list_skills(self) -> list[str]:
        """
        列出所有注册的 Skill 名称。

        Returns:
            Skill 名称列表
        """
        return list(self._skills.keys())
```
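
Trigger matching is sensitive to overlapping prefixes such as `/sys` and `/system`. The sketch below compares a plain prefix test with a match on the first whitespace-delimited token. The function names are illustrative only and are not part of `SkillBase`.

```python
def prefix_match(content: str, triggers: list[str]) -> bool:
    """Match when the input merely starts with a trigger string."""
    stripped = content.strip()
    return any(stripped.startswith(trigger) for trigger in triggers)


def token_match(content: str, triggers: list[str]) -> bool:
    """Match only when the first token equals a trigger exactly."""
    tokens = content.split()
    return bool(tokens) and tokens[0] in triggers


def token_args(content: str, triggers: list[str]) -> list[str]:
    """Return the tokens after the trigger, or an empty list if no match."""
    tokens = content.split()
    if tokens and tokens[0] in triggers:
        return tokens[1:]
    return []


if __name__ == "__main__":
    print(prefix_match("/logout", ["/log"]))                # prefix test accepts it
    print(token_match("/logout", ["/log"]))                 # token test rejects it
    print(token_args("/system load", ["/sys", "/system"]))  # arguments stay intact
```

With a prefix test, slicing `/system load` by the length of `/sys` would leave `tem load` as the argument string, so the token-based form is the safer choice when triggers share a prefix.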

- [ ] **Step 4: Run the tests and confirm they pass**

```bash
pytest tests/test_skills.py -v
```

Expected: PASS

- [ ] **Step 5: Commit the Skill base class**

```bash
git add src/butler/skills/ tests/test_skills.py
git commit -m "feat: add Skill extension system base"
```

---

### Task 4.2: LogSkill implementation

**Files:**
- Create: `src/butler/skills/log_skill.py`
- Create: `data/.gitkeep`
- Modify: `tests/test_skills.py`

- [ ] **Step 1: Write the LogSkill tests**

Append to `tests/test_skills.py`:

```python
import pytest
from butler.skills.log_skill import LogSkill


class TestLogSkill:
    """LogSkill 测试。"""

    def test_log_skill_triggers(self):
        """测试触发器。"""
        skill = LogSkill(db_path=":memory:")
        assert skill.matches("/log") is True
        assert skill.matches("/history") is True
        assert skill.matches("/log recent") is True

    @pytest.mark.asyncio
    async def test_log_command(self):
        """测试记录命令。"""
        skill = LogSkill(db_path=":memory:")
        skill.log_command("user123", "test_session", "ls -la", "output")
        
        ctx = SkillContext(
            session_name="test_session",
            user_id="user123",
            session_state={},
            args=["recent", "1"],
        )
        result = await skill.execute(ctx)
        assert result.success is True
        assert "ls -la" in result.output

    @pytest.mark.asyncio
    async def test_search_commands(self):
        """测试搜索命令。"""
        skill = LogSkill(db_path=":memory:")
        skill.log_command("user123", "test_session", "vim file.txt", "")
        skill.log_command("user123", "test_session", "cat other.txt", "")
        
        ctx = SkillContext(
            session_name="test_session",
            user_id="user123",
            session_state={},
            args=["search", "vim"],
        )
        result = await skill.execute(ctx)
        assert result.success is True
        assert "vim" in result.output
```

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_skills.py::TestLogSkill -v
```

Expected: FAIL

- [ ] **Step 3: Implement LogSkill**

```python
# src/butler/skills/log_skill.py
"""日志记录 Skill。"""

import sqlite3
import threading
from pathlib import Path

from butler.skills.base import SkillBase, SkillContext, SkillResult


class LogSkill(SkillBase):
    """Record and retrieve shell interaction logs."""

    name = "log"
    description = "记录和检索 Shell 交互日志"
    triggers = ["/log", "/history"]
    usage = "用法: /log [recent N|search 关键词]"

    def __init__(self, db_path: str = "data/logs.db"):
        """
        Initialize LogSkill.

        Args:
            db_path: SQLite database path, or ":memory:" for an in-memory database
        """
        self.db_path = db_path
        if db_path != ":memory:":
            Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        # Keep one connection open for the lifetime of the skill: every new
        # connection to ":memory:" gets its own empty database, so
        # reconnecting per call would lose the table.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._lock = threading.Lock()
        self._init_db()

    def _init_db(self) -> None:
        """Create the log table and its indexes if they do not exist."""
        with self._lock, self._conn:
            self._conn.execute("""
                CREATE TABLE IF NOT EXISTS shell_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id TEXT NOT NULL,
                    session_name TEXT NOT NULL,
                    command TEXT NOT NULL,
                    output TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self._conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_user ON shell_logs(user_id)"
            )
            self._conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_timestamp ON shell_logs(timestamp)"
            )

    def log_command(
        self,
        user_id: str,
        session_name: str,
        command: str,
        output: str = "",
    ) -> None:
        """
        Record an executed command.

        Args:
            user_id: user ID
            session_name: session name
            command: the command that was sent
            output: captured output
        """
        # "with self._conn" commits on success and rolls back on error.
        with self._lock, self._conn:
            self._conn.execute(
                "INSERT INTO shell_logs (user_id, session_name, command, output) "
                "VALUES (?, ?, ?, ?)",
                (user_id, session_name, command, output),
            )

    async def execute(self, ctx: SkillContext) -> SkillResult:
        """
        Run a log query.

        Args:
            ctx: execution context

        Returns:
            Query result
        """
        args = ctx.args

        if not args or args[0] == "recent":
            try:
                limit = int(args[1]) if len(args) > 1 else 10
            except ValueError:
                return SkillResult(success=False, output=self.usage)
            # SQLite treats a negative LIMIT as "no limit", so clamp it.
            return self._query_recent(ctx.user_id, max(1, limit))

        if args[0] == "search":
            return self._search_commands(ctx.user_id, " ".join(args[1:]))

        return SkillResult(success=False, output=self.usage)

    def _query(self, sql: str, params: tuple, empty_message: str) -> SkillResult:
        """Run a SELECT that returns (command, timestamp) rows and format them."""
        with self._lock:
            rows = self._conn.execute(sql, params).fetchall()
        output = "\n".join(f"[{timestamp}] {command}" for command, timestamp in rows)
        return SkillResult(success=True, output=output or empty_message)

    def _query_recent(self, user_id: str, limit: int) -> SkillResult:
        """Return the most recent commands for a user."""
        # CURRENT_TIMESTAMP has one-second resolution, so id breaks ties.
        return self._query(
            """
            SELECT command, timestamp FROM shell_logs
            WHERE user_id = ?
            ORDER BY timestamp DESC, id DESC
            LIMIT ?
            """,
            (user_id, limit),
            "无日志记录",
        )

    def _search_commands(self, user_id: str, keyword: str) -> SkillResult:
        """Search a user's commands by keyword."""
        return self._query(
            """
            SELECT command, timestamp FROM shell_logs
            WHERE user_id = ? AND command LIKE ?
            ORDER BY timestamp DESC, id DESC
            LIMIT 20
            """,
            (user_id, f"%{keyword}%"),
            "无匹配记录",
        )
```
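
The LogSkill tests use `db_path=":memory:"`, and SQLite gives every new connection to `:memory:` its own private database. A minimal sketch of that behavior, with function names invented for this example:

```python
import sqlite3


def table_visible_from_new_connection() -> bool:
    """Create a table on one in-memory connection, then look for it on another."""
    first = sqlite3.connect(":memory:")
    second = sqlite3.connect(":memory:")
    try:
        first.execute("CREATE TABLE shell_logs (command TEXT)")
        first.commit()
        try:
            second.execute("SELECT command FROM shell_logs")
            return True
        except sqlite3.OperationalError:
            # The second connection sees a different, empty database.
            return False
    finally:
        first.close()
        second.close()


def table_visible_from_same_connection() -> bool:
    """Create and query the table through one connection."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE shell_logs (command TEXT)")
        conn.execute("INSERT INTO shell_logs VALUES ('ls -la')")
        rows = conn.execute("SELECT command FROM shell_logs").fetchall()
        return rows == [("ls -la",)]
    finally:
        conn.close()
```

This is why an in-memory log store needs to hold a single connection for its whole lifetime, whereas a file-backed database tolerates opening and closing a connection per call.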

- [ ] **Step 4: Create the data directory placeholder**

```bash
mkdir -p data && touch data/.gitkeep
```

- [ ] **Step 5: Run the tests and confirm they pass**

```bash
pytest tests/test_skills.py::TestLogSkill -v
```

Expected: PASS

- [ ] **Step 6: Commit LogSkill**

```bash
git add src/butler/skills/log_skill.py data/.gitkeep tests/test_skills.py
git commit -m "feat: add LogSkill for shell interaction logging"
```

---

### Task 4.3: SystemSkill implementation

**Files:**
- Create: `src/butler/skills/system_skill.py`
- Modify: `tests/test_skills.py`

- [ ] **Step 1: Write the SystemSkill tests**

Append to `tests/test_skills.py`:

```python
from butler.skills.system_skill import SystemSkill


class TestSystemSkill:
    """SystemSkill 测试。"""

    def test_system_skill_triggers(self):
        """测试触发器。"""
        skill = SystemSkill()
        assert skill.matches("/sys") is True
        assert skill.matches("/system") is True
        assert skill.matches("/load") is True

    @pytest.mark.asyncio
    async def test_system_load(self):
        """测试获取系统负载。"""
        skill = SystemSkill()
        ctx = SkillContext(
            session_name="test",
            user_id="user123",
            session_state={},
            args=["load"],
        )
        result = await skill.execute(ctx)
        assert result.success is True
        # uptime 输出应该包含 load average
        assert "load" in result.output.lower() or "up" in result.output.lower()

    @pytest.mark.asyncio
    async def test_system_disk(self):
        """测试获取磁盘信息。"""
        skill = SystemSkill()
        ctx = SkillContext(
            session_name="test",
            user_id="user123",
            session_state={},
            args=["disk"],
        )
        result = await skill.execute(ctx)
        assert result.success is True

    @pytest.mark.asyncio
    async def test_system_mem(self):
        """测试获取内存信息。"""
        skill = SystemSkill()
        ctx = SkillContext(
            session_name="test",
            user_id="user123",
            session_state={},
            args=["mem"],
        )
        result = await skill.execute(ctx)
        assert result.success is True

    @pytest.mark.asyncio
    async def test_system_help(self):
        """测试帮助信息。"""
        skill = SystemSkill()
        ctx = SkillContext(
            session_name="test",
            user_id="user123",
            session_state={},
            args=[],
        )
        result = await skill.execute(ctx)
        assert result.success is False
        assert "用法" in result.output
```

- [ ] **Step 2: Run the tests and confirm they fail**

```bash
pytest tests/test_skills.py::TestSystemSkill -v
```

Expected: FAIL

- [ ] **Step 3: Implement SystemSkill**

```python
# src/butler/skills/system_skill.py
"""系统信息 Skill。"""

import subprocess

from butler.skills.base import SkillBase, SkillContext, SkillResult


class SystemSkill(SkillBase):
    """获取系统负载和资源信息。"""

    name = "system"
    description = "获取系统负载和资源信息"
    triggers = ["/sys", "/system", "/load"]

    async def execute(self, ctx: SkillContext) -> SkillResult:
        """
        执行系统信息查询。

        Args:
            ctx: 执行上下文

        Returns:
            查询结果
        """
        args = ctx.args

        if not args or args[0] == "help":
            return SkillResult(
                success=False,
                output="用法: /system [load|disk|mem]",
            )

        cmd = args[0]

        if cmd == "load":
            return self._get_load()
        elif cmd == "disk":
            return self._get_disk()
        elif cmd == "mem":
            return self._get_memory()

        return SkillResult(
            success=False,
            output=f"未知命令: {cmd}\n用法: /system [load|disk|mem]",
        )

    def _run(self, argv: list[str], error_prefix: str) -> SkillResult:
        """Run a read-only system command and wrap its output."""
        try:
            result = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (OSError, subprocess.TimeoutExpired) as e:
            # The binary is missing, not executable, or did not finish in time.
            return SkillResult(success=False, output=f"{error_prefix}: {e}")

        if result.returncode != 0:
            return SkillResult(
                success=False,
                output=f"{error_prefix}: {result.stderr.strip()}",
            )
        return SkillResult(success=True, output=result.stdout.strip())

    def _get_load(self) -> SkillResult:
        """Get the system load via uptime."""
        return self._run(["uptime"], "获取系统负载失败")

    def _get_disk(self) -> SkillResult:
        """Get disk usage of the root filesystem via df."""
        return self._run(["df", "-h", "/"], "获取磁盘信息失败")

    def _get_memory(self) -> SkillResult:
        """Get memory usage via free."""
        return self._run(["free", "-h"], "获取内存信息失败")
```
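
`SystemSkill.execute` is a coroutine, but `subprocess.run` blocks the event loop while the command runs. One possible alternative is sketched below, assuming the commands stay short-lived and read-only. The helper `run_command` is hypothetical and is not part of the plan. It uses `asyncio.create_subprocess_exec` so that other coroutines can make progress while the command runs.

```python
import asyncio
import sys


async def run_command(argv: list[str], timeout: float = 5.0) -> tuple[int, str]:
    """Run argv without blocking the event loop; return (exit code, output)."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # fold stderr into stdout
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Do not leave a runaway child behind when the timeout fires.
        proc.kill()
        await proc.wait()
        raise
    return proc.returncode or 0, stdout.decode(errors="replace").strip()


if __name__ == "__main__":
    # The Python interpreter stands in for uptime/df/free to stay portable.
    code, text = asyncio.run(run_command([sys.executable, "-c", "print('ok')"]))
    print(code, text)
```

For commands that finish in milliseconds the blocking version is usually acceptable, and `asyncio.to_thread(subprocess.run, ...)` is a smaller change if the existing helper should be kept.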

- [ ] **Step 4: Run the tests and confirm they pass**

```bash
pytest tests/test_skills.py::TestSystemSkill -v
```

Expected: PASS

- [ ] **Step 5: Commit SystemSkill**

```bash
git add src/butler/skills/system_skill.py tests/test_skills.py
git commit -m "feat: add SystemSkill for system information"
```

---

## Chunk 5: Inngest workflows

### Task 5.1: Workflow scaffolding

**Files:**
- Create: `src/butler/workflows/__init__.py`
- Create: `src/butler/workflows/handle_message.py`

- [ ] **Step 1: Create the workflow module initializer**

```python
# src/butler/workflows/__init__.py
"""Inngest 工作流模块。"""

import inngest

from butler.config import settings

# 创建 Inngest 客户端
inngest_client = inngest.Inngest(
    app_id="butler-shell",
    event_key=settings.inngest_event_key or "dev_key",
    signing_key=settings.inngest_signing_key or None,
)

__all__ = ["inngest_client"]
```

- [ ] **Step 2: Implement the handle_message workflow skeleton**

```python
# src/butler/workflows/handle_message.py
"""主消息处理工作流。"""

import uuid
from typing import Any

import inngest

from butler.config import settings
from butler.session import SessionMode, SessionState
from butler.session.wrapper import TmuxWrapper
from butler.security.auth import is_authorized
from butler.security.guardrail import is_dangerous_command, get_danger_reason
from butler.skills.base import SkillRegistry
from butler.skills.log_skill import LogSkill
from butler.skills.system_skill import SystemSkill
from butler.workflows import inngest_client

# 初始化组件
tmux = TmuxWrapper(session_prefix=settings.session_prefix)
skill_registry = SkillRegistry()
skill_registry.register(LogSkill())
skill_registry.register(SystemSkill())


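# Note: the two-argument handler signature below matches SDK releases that pass
# `step` separately; later releases expose it as `ctx.step` instead.
@inngest_client.create_function(
    fn_id="handle-im-message",
    name="Handle IM Message",
    trigger=inngest.TriggerEvent(event="im/message"),
)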
async def handle_im_message(ctx: inngest.Context, step: inngest.Step) -> dict[str, Any]:
    """
    主消息处理流程。

    事件参数:
        user_id: QQ 用户 ID
        content: 消息内容
    """
    user_id = ctx.event.data.get("user_id", "")
    content = ctx.event.data.get("content", "")
    message_id = ctx.event.data.get("message_id", str(uuid.uuid4()))

    # Step 1: 权限检查
    if not is_authorized(user_id, settings.allowed_users):
        return {
            "status": "unauthorized",
            "message": f"用户 {user_id} 无权限使用此服务",
        }

    # Step 2: 获取或创建会话
    session_name = await step.run(
        "get-or-create-session",
        lambda: tmux.get_or_create_session(user_id),
    )

    # Step 3: 加载会话状态（简化版，实际应使用 ctx.state）
    state = SessionState(session_id=session_name, user_id=user_id)

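    # Step 4: detect the current mode. Step results are stored as JSON, so
    # pass the enum's value through the step and rebuild the enum afterwards.
    current_mode = SessionMode(
        await step.run(
            "detect-mode",
            lambda: tmux.detect_mode(session_name).value,
        )
    )

    # Step 5: branch on the mode
    if current_mode == SessionMode.INTERACTIVE: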
        # 交互模式：直接透传
        await step.run(
            "passthrough",
            lambda: tmux.send_keys(session_name, content, enter=False),
        )
        output = await step.run(
            "capture-output",
            lambda: tmux.capture_pane(session_name),
        )
        return {
            "status": "success",
            "mode": "interactive",
            "output": output[-2000:],  # 限制输出长度
        }

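    # Step 6: check whether the message is a Skill invocation
    skill = skill_registry.find_matching(content)
    if skill:
        from butler.skills.base import SkillContext
        skill_ctx = SkillContext(
            session_name=session_name,
            user_id=user_id,
            session_state=state.model_dump(),
            args=skill.parse_args(content),
        )

        async def run_skill() -> dict[str, Any]:
            # skill.execute is a coroutine, so it must be awaited here; the
            # result is returned as a plain dict because step results are
            # stored as JSON.
            skill_result = await skill.execute(skill_ctx)
            return skill_result.model_dump()

        result = await step.run("execute-skill", run_skill)
        return {
            "status": "success",
            "mode": "skill",
            "skill_name": skill.name,
            "output": result["output"],
        }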

    # Step 7: 危险命令检查
    if settings.guardrail_enabled and is_dangerous_command(content):
        reason = get_danger_reason(content) or "潜在危险命令"
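        # Emit the approval event. The first argument is the step ID; the
        # event name and payload go into an inngest.Event.
        await step.send_event(
            "request-approval",
            inngest.Event(
                name="im/command-needs-approval",
                data={
                    "user_id": user_id,
                    "command": content,
                    "reason": reason,
                    "session_name": session_name,
                    "message_id": message_id,
                },
            ),
        )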
        return {
            "status": "pending_approval",
            "mode": "nl",
            "reason": reason,
            "message": f"⚠️ 危险命令需要确认: {reason}",
        }

    # Step 8: 正常命令执行
    await step.run(
        "execute-command",
        lambda: tmux.send_keys(session_name, content, enter=True),
    )

    # Step 9: 捕获输出
    output = await step.run(
        "capture-output",
        lambda: tmux.capture_pane(session_name),
    )

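    # Step 10: record the command. Running it as a step keeps the log write
    # from being repeated if the function is retried or replayed.
    log_skill = skill_registry.get("log")
    if log_skill:
        await step.run(
            "log-command",
            lambda: log_skill.log_command(user_id, session_name, content, output[:500]),
        )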

    return {
        "status": "success",
        "mode": "nl",
        "output": output[-2000:],
    }
```
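
The workflow wraps each side effect in `step.run` with its own step ID. The sketch below is a toy model only. It is not Inngest's implementation, and `ToyStep` and its in-memory store are invented for illustration. It shows the general idea behind durable steps: results are stored by step ID as JSON, so a re-run of the function body skips steps that already completed and sees only JSON-compatible values.

```python
import json
from typing import Any, Callable


class ToyStep:
    """Toy stand-in for a durable step runner (illustration only)."""

    def __init__(self, store: dict[str, str]) -> None:
        self._store = store  # step ID -> JSON-encoded result

    def run(self, step_id: str, handler: Callable[[], Any]) -> Any:
        # Execute the handler only if this step ID has no stored result.
        if step_id not in self._store:
            self._store[step_id] = json.dumps(handler())
        return json.loads(self._store[step_id])


calls: list[str] = []


def send_keys() -> None:
    calls.append("send_keys")


def capture() -> dict[str, Any]:
    calls.append("capture")
    return {"output": "hello", "lines": (1, 2)}


def workflow(step: ToyStep) -> Any:
    step.run("execute-command", send_keys)
    return step.run("capture-output", capture)


store: dict[str, str] = {}
first = workflow(ToyStep(store))
second = workflow(ToyStep(store))  # simulated re-invocation of the same run
```

After both calls, each handler has executed once, and the tuple returned by `capture` comes back as a list. The same round trip is why returning a Pydantic model or an enum from a step calls for an explicit conversion to plain data.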

- [ ] **Step 3: Commit the workflow skeleton**

```bash
git add src/butler/workflows/
git commit -m "feat: add handle_message workflow skeleton"
```

---

### Task 5.2: Approval workflow

**Files:**
- Create: `src/butler/workflows/guardrail.py`

- [ ] **Step 1: Implement the approval workflow**

```python
# src/butler/workflows/guardrail.py
"""命令准入审批工作流。"""

import datetime

import inngest

from butler.config import settings
from butler.session.wrapper import TmuxWrapper
from butler.workflows import inngest_client

tmux = TmuxWrapper(session_prefix=settings.session_prefix)


@inngest_client.create_function(
    fn_id="command-guardrail",
    name="Command Guardrail",
    trigger=inngest.TriggerEvent(event="im/command-needs-approval"),
)
async def command_guardrail(ctx: inngest.Context, step: inngest.Step) -> dict:
    """
    Approval flow for dangerous commands.

    Event data:
        user_id: user ID
        command: command text
        reason: why the command is considered dangerous
        session_name: session name
    """
    user_id = ctx.event.data.get("user_id", "")
    command = ctx.event.data.get("command", "")
    reason = ctx.event.data.get("reason", "未知原因")
    session_name = ctx.event.data.get("session_name", "")

    # Step 1: build the confirmation request for the QQ user.
    # Simplified here: sending it still has to go through the gateway.
    confirmation_message = f"""⚠️ 危险命令检测

命令: `{command}`
原因: {reason}

回复 "确认" 执行，回复 "取消" 放弃
超时时间: {settings.guardrail_approval_timeout} 秒"""

    # Step 2: wait for the user's reply. The timeout setting is in seconds,
    # so it is passed as a timedelta. wait_for_event returns None on timeout.
    approval = await step.wait_for_event(
        "wait-approval",
        event="im/approval-response",
        if_exp="event.data.user_id == async.data.user_id",
        timeout=datetime.timedelta(seconds=settings.guardrail_approval_timeout),
    )

    if approval is None:
        return {
            "status": "timeout",
            "executed": False,
            "message": "审批超时，命令已取消",
        }

    if not approval.data.get("approved"):
        return {
            "status": "rejected",
            "executed": False,
        }

    # Step 3: run the approved command and capture its output
    await step.run(
        "execute-approved",
        lambda: tmux.send_keys(session_name, command, enter=True),
    )
    output = await step.run(
        "capture-output",
        lambda: tmux.capture_pane(session_name),
    )
    return {
        "status": "approved",
        "executed": True,
        "output": output[-1000:],
    }
```
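
The guardrail waits for an `im/approval-response` event and reads `user_id` and `approved` from its data. The sketch below shows one way the gateway side might turn a user's reply into that payload. The function names are hypothetical, and sending the event through the Inngest client is left out.

```python
from typing import Any, Optional

# Reply keywords taken from the confirmation prompt.
APPROVE_WORDS = {"确认"}
REJECT_WORDS = {"取消"}


def parse_approval_reply(text: str) -> Optional[bool]:
    """Map a reply to True (approve), False (reject), or None (not a reply)."""
    reply = text.strip()
    if reply in APPROVE_WORDS:
        return True
    if reply in REJECT_WORDS:
        return False
    return None


def build_approval_event(user_id: str, text: str) -> Optional[dict[str, Any]]:
    """Build the event payload, or None if the text is not an approval reply."""
    approved = parse_approval_reply(text)
    if approved is None:
        return None
    return {
        "name": "im/approval-response",
        "data": {"user_id": user_id, "approved": approved},
    }
```

Returning `None` for any other text lets the gateway fall through to normal message handling, so an unrelated message sent during the approval window is not read as a rejection.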

- [ ] **Step 2: Commit the approval workflow**

```bash
git add src/butler/workflows/guardrail.py
git commit -m "feat: add command guardrail workflow"
```

---

## Chunk 6: QQ Gateway

### Task 6.1: Gateway module

**Files:**
- Create: `src/butler/gateway/__init__.py`
- Create: `src/butler/gateway/events.py`
- Create: `src/butler/gateway/napcat.py`

- [ ] **Step 1: Create the Gateway initializer**

```python
# src/butler/gateway/__init__.py
"""QQ Gateway 模块。"""

from butler.gateway.napcat import NapCatGateway

__all__ = ["NapCatGateway"]
```

```python
# src/butler/gateway/events.py
"""消息事件定义。"""

from butler.session.state import NapCatEvent

__all__ = ["NapCatEvent"]
```

- [ ] **Step 2: Implement the NapCat Gateway**

```python
# src/butler/gateway/napcat.py
"""NapCatQQ WebSocket gateway."""

import asyncio
import inspect
import json
import logging
from typing import Awaitable, Callable, Optional, Union

import websockets
from websockets.client import WebSocketClientProtocol

from butler.session.state import NapCatEvent

logger = logging.getLogger(__name__)

# A handler may be a plain function or a coroutine function.
MessageHandler = Callable[[NapCatEvent], Union[None, Awaitable[None]]]


class NapCatGateway:
    """NapCatQQ WebSocket gateway."""

    def __init__(
        self,
        ws_url: str = "ws://localhost:3001",
        access_token: str = "",
    ):
        """
        Initialize the gateway.

        Args:
            ws_url: WebSocket URL
            access_token: Access token
        """
        self.ws_url = ws_url
        self.access_token = access_token
        self.message_handler: Optional[MessageHandler] = None
        self._ws: Optional[WebSocketClientProtocol] = None
        self._running = False

    def on_message(self, handler: MessageHandler) -> None:
        """
        Set the message handler.

        Args:
            handler: Function (sync or async) called for each private message
        """
        self.message_handler = handler

    async def connect(self) -> None:
        """Open the WebSocket connection and listen for messages, reconnecting on failure."""
        headers = {}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"

        self._running = True

        while self._running:
            try:
                # `extra_headers` belongs to the legacy websockets client; the
                # asyncio client that became the default in websockets 14 names
                # it `additional_headers`. Match the version in requirements.txt.
                async with websockets.connect(
                    self.ws_url,
                    extra_headers=headers,
                    ping_interval=30,
                    ping_timeout=10,
                ) as ws:
                    self._ws = ws
                    await self._listen(ws)
            except Exception as e:
                logger.error("Gateway connection error: %s", e)
            finally:
                # Clear the handle so callers do not see a closed socket as connected.
                self._ws = None

            if self._running:
                await asyncio.sleep(5)  # Reconnect interval

    async def _listen(self, ws: WebSocketClientProtocol) -> None:
        """Listen for WebSocket messages."""
        async for raw_message in ws:
            try:
                data = json.loads(raw_message)

                # Map the NapCat event payload onto NapCatEvent
                if isinstance(data, dict):
                    event = NapCatEvent(
                        time=data.get("time", 0),
                        self_id=data.get("self_id", 0),
                        post_type=data.get("post_type", ""),
                        message_type=data.get("message_type", ""),
                        user_id=data.get("user_id", 0),
                        message=self._extract_message(data),
                        raw_message=str(data),
                    )

                    # Only handle private messages
                    if event.is_private_message and self.message_handler:
                        result = self.message_handler(event)
                        # Await coroutine handlers; otherwise they would never run.
                        if inspect.isawaitable(result):
                            await result

            except Exception as e:
                logger.error("Error processing message: %s", e)

    def _extract_message(self, data: dict) -> str:
        """Extract the message text."""
        message = data.get("message", "")
        if isinstance(message, str):
            return message
        elif isinstance(message, list):
            # Message-segment format: keep only the text segments
            texts = []
            for seg in message:
                if isinstance(seg, dict) and seg.get("type") == "text":
                    texts.append(seg.get("data", {}).get("text", ""))
            return "".join(texts)
        return str(message)

    async def send_private_message(self, user_id: int, message: str) -> None:
        """
        Send a private message.

        Args:
            user_id: Target user ID
            message: Message content
        """
        if not self._ws:
            logger.warning("Gateway not connected; dropping message to %s", user_id)
            return

        payload = {
            "action": "send_private_msg",
            "params": {
                "user_id": user_id,
                "message": message,
            },
        }
        await self._ws.send(json.dumps(payload))

    async def stop(self) -> None:
        """Stop the gateway."""
        self._running = False
        if self._ws:
            await self._ws.close()
```
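
The gateway accepts the `message` field either as a plain string or as a list of segments, and only forwards private messages. The standalone sketch below reproduces that logic on sample payloads so it can be checked without a WebSocket connection. The payloads are made up for illustration, and the private-message rule (`post_type == "message"` and `message_type == "private"`) is an assumption about how `NapCatEvent.is_private_message` is defined.

```python
from typing import Any


def extract_text(message: Any) -> str:
    """Flatten a message field (string or segment list) into plain text."""
    if isinstance(message, str):
        return message
    if isinstance(message, list):
        # Keep only text segments; faces, images, etc. are dropped.
        return "".join(
            seg.get("data", {}).get("text", "")
            for seg in message
            if isinstance(seg, dict) and seg.get("type") == "text"
        )
    return str(message)


def is_private_message(payload: dict) -> bool:
    """Assumed private-message rule; the real check lives on NapCatEvent."""
    return (
        payload.get("post_type") == "message"
        and payload.get("message_type") == "private"
    )


# Sample payloads, invented for this example.
STRING_EVENT = {
    "post_type": "message",
    "message_type": "private",
    "user_id": 10001,
    "message": "ls -la",
}
SEGMENT_EVENT = {
    "post_type": "message",
    "message_type": "private",
    "user_id": 10001,
    "message": [
        {"type": "text", "data": {"text": "tail -n 5 "}},
        {"type": "face", "data": {"id": "14"}},
        {"type": "text", "data": {"text": "app.log"}},
    ],
}
GROUP_EVENT = {"post_type": "message", "message_type": "group", "message": "hi"}
```

Non-text segments are silently discarded, so a message consisting only of an image reaches the workflow as an empty string.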

- [ ] **Step 3: Commit the Gateway module**

```bash
git add src/butler/gateway/
git commit -m "feat: add NapCatQQ WebSocket gateway"
```
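
The reconnect loop in `connect()` waits a fixed 5 seconds between attempts. If NapCat stays down for a long time, a capped exponential backoff is a common alternative. The sketch below is not part of the plan; `backoff_delays` and its default values are assumptions chosen for illustration, and it omits jitter for simplicity.

```python
import itertools
from typing import Iterator


def backoff_delays(
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 60.0,
) -> Iterator[float]:
    """Yield reconnect delays that grow by `factor` and never exceed `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# First eight delays with the default settings.
FIRST_DELAYS = list(itertools.islice(backoff_delays(), 8))
```

In a reconnect loop, a fresh generator would be created after each successful connection so that the next outage starts again from `base`.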

---

## Chunk 7: Main entry point and deployment

### Task 7.1: FastAPI main entry point

**Files:**
- Create: `src/butler/main.py`

- [ ] **Step 1: Implement the FastAPI main entry point**

```python
# src/butler/main.py
"""FastAPI main entry point."""

import asyncio
import logging
import subprocess
from contextlib import asynccontextmanager, suppress

import inngest
import inngest.fast_api
import uvicorn
from fastapi import FastAPI

from butler.config import settings
from butler.gateway.napcat import NapCatGateway
from butler.workflows import inngest_client
from butler.workflows.handle_message import handle_im_message
from butler.workflows.guardrail import command_guardrail

# Configure logging
logging.basicConfig(
    level=getattr(logging, settings.log_level),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Initialize the Gateway
gateway = NapCatGateway(
    ws_url=settings.napcat_ws_url,
    access_token=settings.napcat_token,
)


async def run_gateway():
    """Run the Gateway in the background."""
    async def handle_message(event):
        """Handle a QQ message."""
        logger.info(f"Received message from {event.user_id}: {event.message[:50]}...")

        # Forward to Inngest. `inngest.Event` is the SDK's event type; the
        # client object itself only sends events.
        await inngest_client.send(
            inngest.Event(
                name="im/message",
                data={
                    "user_id": str(event.user_id),
                    "content": event.message,
                },
            )
        )

    gateway.on_message(handle_message)
    await gateway.connect()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management."""
    # Startup
    logger.info("Starting Butler-Shell...")

    # Start the Gateway as a background task
    gateway_task = asyncio.create_task(run_gateway())

    yield

    # Shutdown
    logger.info("Shutting down Butler-Shell...")
    await gateway.stop()
    gateway_task.cancel()
    with suppress(asyncio.CancelledError):
        await gateway_task  # Wait for the task to finish unwinding


# Create the FastAPI application
app = FastAPI(
    title="Butler-Shell",
    description="Persistent Shell Assistant with QQ Bridge",
    version="0.1.0",
    lifespan=lifespan,
)


# Register the Inngest functions and mount the endpoint Inngest calls.
# `inngest.fast_api.serve` uses /api/inngest by default. This assumes
# `inngest_client` is an `inngest.Inngest` instance.
inngest.fast_api.serve(
    app,
    inngest_client,
    [handle_im_message, command_guardrail],
)


# Health check endpoints
@app.get("/health")
async def health():
    """Liveness check."""
    return {"status": "ok"}


@app.get("/ready")
async def ready():
    """Readiness check."""
    # Check that tmux is available
    try:
        subprocess.run(["tmux", "-V"], capture_output=True, check=True)
        tmux_ok = True
    except Exception:
        tmux_ok = False

    return {
        "status": "ready" if tmux_ok else "degraded",
        "tmux": tmux_ok,
        "gateway": gateway._ws is not None,
    }


# User approval endpoint (for external callbacks)
@app.post("/api/approval")
async def handle_approval(user_id: str, approved: bool):
    """Handle a user's approval response."""
    await inngest_client.send(
        inngest.Event(
            name="im/approval-response",
            data={
                "user_id": user_id,
                "approved": approved,
            },
        )
    )
    return {"status": "sent"}


def main():
    """Start the service."""
    uvicorn.run(
        "butler.main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
    )


if __name__ == "__main__":
    main()
```
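
The lifespan handler cancels the gateway task on shutdown. Calling `cancel()` only requests cancellation, so one way to make sure the task has actually stopped is to await it afterwards. The sketch below shows that pattern in isolation; `stop_background_task` and `demo` are hypothetical names, and the sleeping worker stands in for `run_gateway()`.

```python
import asyncio
import contextlib


async def stop_background_task(task: asyncio.Task) -> None:
    """Cancel a task and wait until it has finished unwinding."""
    task.cancel()
    # Awaiting a cancelled task raises CancelledError; that is the expected outcome.
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo() -> bool:
    """Start a long-running worker, stop it, and report whether it was cancelled."""
    async def worker() -> None:
        while True:
            await asyncio.sleep(3600)

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # Give the worker a chance to start
    await stop_background_task(task)
    return task.cancelled()
```

Awaiting the task also surfaces any cleanup code in its `finally` blocks before the process exits.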

- [ ] **Step 2: Commit the main entry point**

```bash
git add src/butler/main.py
git commit -m "feat: add FastAPI main entry point"
```

---

### Task 7.2: Docker deployment configuration

**Files:**
- Create: `docker-compose.yml`
- Create: `Dockerfile`

- [ ] **Step 1: Create the Dockerfile**

```dockerfile
# Dockerfile
FROM python:3.11-slim

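WORKDIR /app

# The package lives under src/, so put it on the import path for `python -m butler.main`
ENV PYTHONPATH=/app/src

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    tmux \
    procps \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the code
COPY src/ ./src/
COPY config.yaml .
COPY .env.example .env

# Create the data directory
RUN mkdir -p /app/data

# Expose the port
EXPOSE 8000

# Start command
CMD ["python", "-m", "butler.main"]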
```

- [ ] **Step 2: Create docker-compose.yml**

```yaml
# docker-compose.yml
version: '3.8'

services:
  butler:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./data:/app/data
      - ./config.yaml:/app/config.yaml:ro
    environment:
      - BUTLER_ALLOWED_USERS=${BUTLER_ALLOWED_USERS:-}
      - NAPCAT_WS_URL=${NAPCAT_WS_URL:-ws://napcat:3001}
      - NAPCAT_TOKEN=${NAPCAT_TOKEN:-}
      - INNGEST_EVENT_KEY=${INNGEST_EVENT_KEY:-dev_key}
      - INNGEST_SIGNING_KEY=${INNGEST_SIGNING_KEY:-}
    depends_on:
      - inngest
    restart: unless-stopped

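  inngest:
    image: inngest/inngest:latest
    ports:
      - "8288:8288"
    # Runs the Inngest Dev Server and points it at the butler app. The butler
    # service must run the SDK in dev mode (e.g. INNGEST_DEV=1) to talk to it.
    command: inngest dev -u http://butler:8000/api/inngest
    restart: unless-stopped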

  napcat:
    image: mlikiowa/napcat-docker:latest
    ports:
      - "3001:3001"
    volumes:
      - ./napcat-data:/app/napcat/config
    restart: unless-stopped
```

- [ ] **Step 3: Commit the deployment configuration**

```bash
git add docker-compose.yml Dockerfile
git commit -m "feat: add Docker deployment configuration"
```

---

### Task 7.3: Mock test script

**Files:**
- Create: `scripts/mock_test.py`

- [ ] **Step 1: Create the mock test script**

```python
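#!/usr/bin/env python3
"""Mock test script: simulates incoming QQ messages."""

import asyncio
import sys

import httpx

# Events are posted to the Inngest event API, not to the app's /api/inngest
# endpoint (that endpoint is what Inngest calls to run functions).
# Assumes a local Inngest Dev Server on port 8288 and the "dev_key" event key
# used as the default in docker-compose.yml.
INNGEST_EVENT_URL = "http://localhost:8288/e/dev_key"
DEFAULT_USER_ID = "test_user_001"


async def send_mock_message(user_id: str, content: str):
    """Send a simulated message event to Butler-Shell via Inngest."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            INNGEST_EVENT_URL,
            json={
                "name": "im/message",
                "data": {
                    "user_id": user_id,
                    "content": content,
                },
            },
        )
        print(f"Response: {response.status_code}")
        print(f"Body: {response.text}")


async def main():
    """Entry point."""
    if len(sys.argv) < 2:
        print("Usage: python mock_test.py <command> [args]")
        print("\nCommands:")
        print("  normal [cmd] [user_id]   - send an ordinary command (default: ls -la)")
        print("  dangerous [user_id]      - send a dangerous command to exercise the guardrail")
        print("  skill [user_id]          - send a Skill command")
        return

    command = sys.argv[1]
    args = sys.argv[2:]

    if command == "normal":
        # The shell command comes first so that `normal "ls -la"` works as documented.
        content = args[0] if args else "ls -la"
        user_id = args[1] if len(args) > 1 else DEFAULT_USER_ID
    elif command == "dangerous":
        content = "rm -rf /tmp/test"
        user_id = args[0] if args else DEFAULT_USER_ID
    elif command == "skill":
        content = "/log recent 5"
        user_id = args[0] if args else DEFAULT_USER_ID
    else:
        # Anything else is sent verbatim as the message content.
        content = command
        user_id = args[0] if args else DEFAULT_USER_ID

    await send_mock_message(user_id, content)


if __name__ == "__main__":
    asyncio.run(main())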
```
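
The script dispatches on positional arguments by hand. As a possible alternative, the sketch below expresses the same three commands with `argparse` subcommands, which gives usage text and validation for free. `build_parser`, `resolve_content`, and the `--user-id` option are hypothetical and not part of the plan.

```python
import argparse

DEFAULT_USER_ID = "test_user_001"


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with one subcommand per mock scenario."""
    parser = argparse.ArgumentParser(prog="mock_test.py")
    parser.add_argument("--user-id", default=DEFAULT_USER_ID)
    sub = parser.add_subparsers(dest="command", required=True)

    normal = sub.add_parser("normal", help="send an ordinary command")
    normal.add_argument("cmd", nargs="?", default="ls -la")

    sub.add_parser("dangerous", help="send a dangerous command")
    sub.add_parser("skill", help="send a Skill command")
    return parser


def resolve_content(args: argparse.Namespace) -> str:
    """Map parsed arguments to the message content to send."""
    if args.command == "normal":
        return args.cmd
    if args.command == "dangerous":
        return "rm -rf /tmp/test"
    return "/log recent 5"
```

With this layout, `python mock_test.py normal "ls -la"` and `python mock_test.py --user-id 10001 dangerous` both parse unambiguously.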

- [ ] **Step 2: Commit the mock test script**

```bash
mkdir -p scripts
git add scripts/mock_test.py
git commit -m "feat: add mock test script for QQ message simulation"
```

---

## Completion checklist

After implementation, verify that each of the following works:

- [ ] All tests pass: `pytest tests/ -v`
- [ ] Type checking passes: `mypy src/`
- [ ] Lint passes: `ruff check src/`
- [ ] Service starts: `python -m butler.main`
- [ ] Health check responds: `curl http://localhost:8000/health`
- [ ] Mock test runs: `python scripts/mock_test.py normal "ls -la"`
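
The static checks in this list can be run in one pass. The sketch below is one way to do that; `CHECKS`, `run_checks`, and the injectable `runner` are hypothetical and not part of the plan. The runtime checks (service start, health check, mock test) need a running service and are left out.

```python
import subprocess
from typing import Callable, Dict, List, Sequence, Tuple

# Static checks taken from the checklist.
CHECKS: List[Tuple[str, List[str]]] = [
    ("tests", ["pytest", "tests/", "-v"]),
    ("types", ["mypy", "src/"]),
    ("lint", ["ruff", "check", "src/"]),
]


def _run(cmd: Sequence[str]) -> int:
    """Run a command and return its exit code."""
    return subprocess.run(list(cmd)).returncode


def run_checks(
    checks: Sequence[Tuple[str, List[str]]] = CHECKS,
    runner: Callable[[Sequence[str]], int] = _run,
) -> Dict[str, bool]:
    """Run every check without stopping early; map each name to pass/fail."""
    return {name: runner(cmd) == 0 for name, cmd in checks}
```

Calling `run_checks()` from the repository root runs all three tools and returns a name-to-result mapping; passing a custom `runner` makes the function easy to test without invoking the real tools.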

---

**Plan complete. Ready to execute?**
