docs: 更新 HTTP 错误码、用户积分、占卜运行及用户资料协议文档

This commit is contained in:
qzl
2026-04-10 16:45:45 +08:00
parent 1bc8bc6a27
commit 17ef460391
78 changed files with 18680 additions and 25 deletions
+82
View File
@@ -0,0 +1,82 @@
"""
Common utilities for Trellis workflow scripts.
This module provides shared functionality used by other Trellis scripts.
"""
import io
import sys
# =============================================================================
# Windows Encoding Fix (MUST be at top, before any other output)
# =============================================================================
# On Windows, stdout defaults to the system code page (often GBK/CP936).
# This causes UnicodeEncodeError when printing non-ASCII characters.
#
# Any script that imports from common will automatically get this fix.
# =============================================================================
def _configure_stream(stream: object) -> object:
"""Configure a stream for UTF-8 encoding on Windows."""
# Try reconfigure() first (Python 3.7+, more reliable)
if hasattr(stream, "reconfigure"):
stream.reconfigure(encoding="utf-8", errors="replace") # type: ignore[union-attr]
return stream
# Fallback: detach and rewrap with TextIOWrapper
elif hasattr(stream, "detach"):
return io.TextIOWrapper(
stream.detach(), # type: ignore[union-attr]
encoding="utf-8",
errors="replace",
)
return stream
if sys.platform == "win32":
sys.stdout = _configure_stream(sys.stdout) # type: ignore[assignment]
sys.stderr = _configure_stream(sys.stderr) # type: ignore[assignment]
sys.stdin = _configure_stream(sys.stdin) # type: ignore[assignment]
def configure_encoding() -> None:
"""
Configure stdout/stderr/stdin for UTF-8 encoding on Windows.
This is automatically called when importing from common,
but can be called manually for scripts that don't import common.
Safe to call multiple times.
"""
global sys
if sys.platform == "win32":
sys.stdout = _configure_stream(sys.stdout) # type: ignore[assignment]
sys.stderr = _configure_stream(sys.stderr) # type: ignore[assignment]
sys.stdin = _configure_stream(sys.stdin) # type: ignore[assignment]
from .paths import (
DIR_WORKFLOW,
DIR_WORKSPACE,
DIR_TASKS,
DIR_ARCHIVE,
DIR_SPEC,
DIR_SCRIPTS,
FILE_DEVELOPER,
FILE_CURRENT_TASK,
FILE_TASK_JSON,
FILE_JOURNAL_PREFIX,
get_repo_root,
get_developer,
check_developer,
get_tasks_dir,
get_workspace_dir,
get_active_journal_file,
count_lines,
get_current_task,
get_current_task_abs,
set_current_task,
clear_current_task,
has_current_task,
generate_task_date_prefix,
)
+628
View File
@@ -0,0 +1,628 @@
"""
CLI Adapter for Multi-Platform Support.
Abstracts differences between Claude Code, OpenCode, Cursor, iFlow, Codex, Kilo, Kiro Code, Gemini CLI, Antigravity, and Qoder interfaces.
Supported platforms:
- claude: Claude Code (default)
- opencode: OpenCode
- cursor: Cursor IDE
- iflow: iFlow CLI
- codex: Codex CLI (skills-based)
- kilo: Kilo CLI
- kiro: Kiro Code (skills-based)
- gemini: Gemini CLI
- antigravity: Antigravity (workflow-based)
- qoder: Qoder
Usage:
from common.cli_adapter import CLIAdapter
adapter = CLIAdapter("opencode")
cmd = adapter.build_run_command(
agent="dispatch",
session_id="abc123",
prompt="Start the pipeline"
)
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import ClassVar, Literal
Platform = Literal[
"claude",
"opencode",
"cursor",
"iflow",
"codex",
"kilo",
"kiro",
"gemini",
"antigravity",
"qoder",
]
@dataclass
class CLIAdapter:
"""Adapter for different AI coding CLI tools."""
platform: Platform
# =========================================================================
# Agent Name Mapping
# =========================================================================
# OpenCode has built-in agents that cannot be overridden
# See: https://github.com/sst/opencode/issues/4271
# Note: Class-level constant, not a dataclass field
_AGENT_NAME_MAP: ClassVar[dict[Platform, dict[str, str]]] = {
"claude": {}, # No mapping needed
"opencode": {
"plan": "trellis-plan", # 'plan' is built-in in OpenCode
},
}
def get_agent_name(self, agent: str) -> str:
"""Get platform-specific agent name.
Args:
agent: Original agent name (e.g., 'plan', 'dispatch')
Returns:
Platform-specific agent name (e.g., 'trellis-plan' for OpenCode)
"""
mapping = self._AGENT_NAME_MAP.get(self.platform, {})
return mapping.get(agent, agent)
# =========================================================================
# Agent Path
# =========================================================================
@property
def config_dir_name(self) -> str:
"""Get platform-specific config directory name.
Returns:
Directory name ('.claude', '.opencode', '.cursor', '.iflow', '.agents', '.kilocode', '.kiro', '.gemini', '.agent', or '.qoder')
"""
if self.platform == "opencode":
return ".opencode"
elif self.platform == "cursor":
return ".cursor"
elif self.platform == "iflow":
return ".iflow"
elif self.platform == "codex":
return ".agents"
elif self.platform == "kilo":
return ".kilocode"
elif self.platform == "kiro":
return ".kiro"
elif self.platform == "gemini":
return ".gemini"
elif self.platform == "antigravity":
return ".agent"
elif self.platform == "qoder":
return ".qoder"
else:
return ".claude"
def get_config_dir(self, project_root: Path) -> Path:
"""Get platform-specific config directory.
Args:
project_root: Project root directory
Returns:
Path to config directory (.claude, .opencode, .cursor, .iflow, .agents, .kilocode, .kiro, .gemini, .agent, or .qoder)
"""
return project_root / self.config_dir_name
def get_agent_path(self, agent: str, project_root: Path) -> Path:
"""Get path to agent definition file.
Args:
agent: Agent name (original, before mapping)
project_root: Project root directory
Returns:
Path to agent .md file
"""
mapped_name = self.get_agent_name(agent)
return self.get_config_dir(project_root) / "agents" / f"{mapped_name}.md"
def get_commands_path(self, project_root: Path, *parts: str) -> Path:
"""Get path to commands directory or specific command file.
Args:
project_root: Project root directory
*parts: Additional path parts (e.g., 'trellis', 'finish-work.md')
Returns:
Path to commands directory or file
Note:
Cursor uses prefix naming: .cursor/commands/trellis-<name>.md
Antigravity uses workflow directory: .agent/workflows/<name>.md
Claude/OpenCode use subdirectory: .claude/commands/trellis/<name>.md
"""
if self.platform in ("antigravity", "kilo"):
workflow_dir = self.get_config_dir(project_root) / "workflows"
if not parts:
return workflow_dir
if len(parts) >= 2 and parts[0] == "trellis":
filename = parts[-1]
return workflow_dir / filename
return workflow_dir / Path(*parts)
if not parts:
return self.get_config_dir(project_root) / "commands"
# Cursor uses prefix naming instead of subdirectory
if self.platform == "cursor" and len(parts) >= 2 and parts[0] == "trellis":
# Convert trellis/<name>.md to trellis-<name>.md
filename = parts[-1]
return (
self.get_config_dir(project_root) / "commands" / f"trellis-{filename}"
)
return self.get_config_dir(project_root) / "commands" / Path(*parts)
def get_trellis_command_path(self, name: str) -> str:
"""Get relative path to a trellis command file.
Args:
name: Command name without extension (e.g., 'finish-work', 'check-backend')
Returns:
Relative path string for use in JSONL entries
Note:
Cursor: .cursor/commands/trellis-<name>.md
Codex: .agents/skills/<name>/SKILL.md
Kiro: .kiro/skills/<name>/SKILL.md
Gemini: .gemini/commands/trellis/<name>.toml
Antigravity: .agent/workflows/<name>.md
Others: .{platform}/commands/trellis/<name>.md
"""
if self.platform == "cursor":
return f".cursor/commands/trellis-{name}.md"
elif self.platform == "codex":
return f".agents/skills/{name}/SKILL.md"
elif self.platform == "kiro":
return f".kiro/skills/{name}/SKILL.md"
elif self.platform == "gemini":
return f".gemini/commands/trellis/{name}.toml"
elif self.platform == "antigravity":
return f".agent/workflows/{name}.md"
elif self.platform == "kilo":
return f".kilocode/workflows/{name}.md"
else:
return f"{self.config_dir_name}/commands/trellis/{name}.md"
# =========================================================================
# Environment Variables
# =========================================================================
def get_non_interactive_env(self) -> dict[str, str]:
"""Get environment variables for non-interactive mode.
Returns:
Dict of environment variables to set
"""
if self.platform == "opencode":
return {"OPENCODE_NON_INTERACTIVE": "1"}
elif self.platform == "iflow":
return {"IFLOW_NON_INTERACTIVE": "1"}
elif self.platform == "codex":
return {"CODEX_NON_INTERACTIVE": "1"}
elif self.platform == "kiro":
return {"KIRO_NON_INTERACTIVE": "1"}
elif self.platform == "gemini":
return {} # Gemini CLI doesn't have a non-interactive env var
elif self.platform == "antigravity":
return {}
elif self.platform == "qoder":
return {}
else:
return {"CLAUDE_NON_INTERACTIVE": "1"}
# =========================================================================
# CLI Command Building
# =========================================================================
def build_run_command(
self,
agent: str,
prompt: str,
session_id: str | None = None,
skip_permissions: bool = True,
verbose: bool = True,
json_output: bool = True,
) -> list[str]:
"""Build CLI command for running an agent.
Args:
agent: Agent name (will be mapped if needed)
prompt: Prompt to send to the agent
session_id: Optional session ID (Claude Code only for creation)
skip_permissions: Whether to skip permission prompts
verbose: Whether to enable verbose output
json_output: Whether to use JSON output format
Returns:
List of command arguments
"""
mapped_agent = self.get_agent_name(agent)
if self.platform == "opencode":
cmd = ["opencode", "run"]
cmd.extend(["--agent", mapped_agent])
# Note: OpenCode 'run' mode is non-interactive by default
# No equivalent to Claude Code's --dangerously-skip-permissions
# See: https://github.com/anomalyco/opencode/issues/9070
if json_output:
cmd.extend(["--format", "json"])
if verbose:
cmd.extend(["--log-level", "DEBUG", "--print-logs"])
# Note: OpenCode doesn't support --session-id on creation
# Session ID must be extracted from logs after startup
cmd.append(prompt)
elif self.platform == "iflow":
cmd = ["iflow", "-p"]
cmd.extend(["-y", "--agent", mapped_agent])
# iFlow doesn't support --session-id on creation
if verbose:
cmd.append("--verbose")
cmd.append(prompt)
elif self.platform == "codex":
cmd = ["codex", "exec"]
cmd.append(prompt)
elif self.platform == "kiro":
cmd = ["kiro", "run", prompt]
elif self.platform == "gemini":
cmd = ["gemini"]
cmd.append(prompt)
elif self.platform == "antigravity":
raise ValueError(
"Antigravity workflows are UI slash commands; CLI agent run is not supported."
)
elif self.platform == "qoder":
cmd = ["qodercli", "-p", prompt]
else: # claude
cmd = ["claude", "-p"]
cmd.extend(["--agent", mapped_agent])
if session_id:
cmd.extend(["--session-id", session_id])
if skip_permissions:
cmd.append("--dangerously-skip-permissions")
if json_output:
cmd.extend(["--output-format", "stream-json"])
if verbose:
cmd.append("--verbose")
cmd.append(prompt)
return cmd
def build_resume_command(self, session_id: str) -> list[str]:
"""Build CLI command for resuming a session.
Args:
session_id: Session ID to resume (ignored for iFlow)
Returns:
List of command arguments
"""
if self.platform == "opencode":
return ["opencode", "run", "--session", session_id]
elif self.platform == "iflow":
# iFlow uses -c to continue most recent conversation
# session_id is ignored as iFlow doesn't support session IDs
return ["iflow", "-c"]
elif self.platform == "codex":
return ["codex", "resume", session_id]
elif self.platform == "kiro":
return ["kiro", "resume", session_id]
elif self.platform == "gemini":
return ["gemini", "--resume", session_id]
elif self.platform == "antigravity":
raise ValueError(
"Antigravity workflows are UI slash commands; CLI resume is not supported."
)
elif self.platform == "qoder":
return ["qodercli", "--resume", session_id]
else:
return ["claude", "--resume", session_id]
def get_resume_command_str(self, session_id: str, cwd: str | None = None) -> str:
"""Get human-readable resume command string.
Args:
session_id: Session ID to resume
cwd: Optional working directory to cd into
Returns:
Command string for display
"""
cmd = self.build_resume_command(session_id)
cmd_str = " ".join(cmd)
if cwd:
return f"cd {cwd} && {cmd_str}"
return cmd_str
# =========================================================================
# Platform Detection Helpers
# =========================================================================
@property
def is_opencode(self) -> bool:
"""Check if platform is OpenCode."""
return self.platform == "opencode"
@property
def is_claude(self) -> bool:
"""Check if platform is Claude Code."""
return self.platform == "claude"
@property
def is_cursor(self) -> bool:
"""Check if platform is Cursor."""
return self.platform == "cursor"
@property
def is_iflow(self) -> bool:
"""Check if platform is iFlow CLI."""
return self.platform == "iflow"
@property
def cli_name(self) -> str:
"""Get CLI executable name.
Note: Cursor doesn't have a CLI tool, returns None-like value.
"""
if self.is_opencode:
return "opencode"
elif self.is_cursor:
return "cursor" # Note: Cursor is IDE-only, no CLI
elif self.platform == "iflow":
return "iflow"
elif self.platform == "kiro":
return "kiro"
elif self.platform == "gemini":
return "gemini"
elif self.platform == "antigravity":
return "agy"
elif self.platform == "qoder":
return "qodercli"
else:
return "claude"
@property
def supports_cli_agents(self) -> bool:
"""Check if platform supports running agents via CLI.
Claude Code, OpenCode, and iFlow support CLI agent execution.
Cursor is IDE-only and doesn't support CLI agents.
"""
return self.platform in ("claude", "opencode", "iflow")
# =========================================================================
# Session ID Handling
# =========================================================================
@property
def supports_session_id_on_create(self) -> bool:
"""Check if platform supports specifying session ID on creation.
Claude Code: Yes (--session-id)
OpenCode: No (auto-generated, extract from logs)
iFlow: No (no session ID support)
"""
return self.platform == "claude"
def extract_session_id_from_log(self, log_content: str) -> str | None:
"""Extract session ID from log output (OpenCode only).
OpenCode generates session IDs in format: ses_xxx
Args:
log_content: Log file content
Returns:
Session ID if found, None otherwise
"""
import re
# OpenCode session ID pattern
match = re.search(r"ses_[a-zA-Z0-9]+", log_content)
if match:
return match.group(0)
return None
# =============================================================================
# Factory Function
# =============================================================================
def get_cli_adapter(platform: str = "claude") -> CLIAdapter:
"""Get CLI adapter for the specified platform.
Args:
platform: Platform name ('claude', 'opencode', 'cursor', 'iflow', 'codex', 'kilo', 'kiro', 'gemini', 'antigravity', or 'qoder')
Returns:
CLIAdapter instance
Raises:
ValueError: If platform is not supported
"""
if platform not in (
"claude",
"opencode",
"cursor",
"iflow",
"codex",
"kilo",
"kiro",
"gemini",
"antigravity",
"qoder",
):
raise ValueError(
f"Unsupported platform: {platform} (must be 'claude', 'opencode', 'cursor', 'iflow', 'codex', 'kilo', 'kiro', 'gemini', 'antigravity', or 'qoder')"
)
return CLIAdapter(platform=platform) # type: ignore
def detect_platform(project_root: Path) -> Platform:
"""Auto-detect platform based on existing config directories.
Detection order:
1. TRELLIS_PLATFORM environment variable (if set)
2. .opencode directory exists → opencode
3. .iflow directory exists → iflow
4. .cursor directory exists (without .claude) → cursor
5. .agents/skills exists and no other platform dirs → codex
6. .kilocode directory exists → kilo
7. .kiro/skills exists and no other platform dirs → kiro
8. .gemini directory exists → gemini
9. .agent/workflows exists and no other platform dirs → antigravity
10. .qoder directory exists → qoder
11. Default → claude
Args:
project_root: Project root directory
Returns:
Detected platform ('claude', 'opencode', 'cursor', 'iflow', 'codex', 'kilo', 'kiro', 'gemini', 'antigravity', or 'qoder')
"""
import os
# Check environment variable first
env_platform = os.environ.get("TRELLIS_PLATFORM", "").lower()
if env_platform in (
"claude",
"opencode",
"cursor",
"iflow",
"codex",
"kilo",
"kiro",
"gemini",
"antigravity",
"qoder",
):
return env_platform # type: ignore
# Check for .opencode directory (OpenCode-specific)
# Note: .claude might exist in both platforms during migration
if (project_root / ".opencode").is_dir():
return "opencode"
# Check for .iflow directory (iFlow-specific)
# Note: .claude might exist in both platforms during migration
if (project_root / ".iflow").is_dir():
return "iflow"
# Check for .cursor directory (Cursor-specific)
# Only detect as cursor if .claude doesn't exist (to avoid confusion)
if (project_root / ".cursor").is_dir() and not (project_root / ".claude").is_dir():
return "cursor"
# Check for .gemini directory (Gemini CLI-specific)
if (project_root / ".gemini").is_dir():
return "gemini"
# Check for Codex skills directory only when no other platform config exists
other_platform_dirs_codex = (
".claude",
".cursor",
".iflow",
".opencode",
".kilocode",
".kiro",
".gemini",
".agent",
)
has_other_platform_config = any(
(project_root / directory).is_dir() for directory in other_platform_dirs_codex
)
if (project_root / ".agents" / "skills").is_dir() and not has_other_platform_config:
return "codex"
# Check for .kilocode directory (Kilo-specific)
if (project_root / ".kilocode").is_dir():
return "kilo"
# Check for Kiro skills directory only when no other platform config exists
other_platform_dirs_kiro = (
".claude",
".cursor",
".iflow",
".opencode",
".agents",
".kilocode",
".gemini",
".agent",
)
has_other_platform_config = any(
(project_root / directory).is_dir() for directory in other_platform_dirs_kiro
)
if (project_root / ".kiro" / "skills").is_dir() and not has_other_platform_config:
return "kiro"
# Check for Antigravity workflow directory only when no other platform config exists
other_platform_dirs_antigravity = (
".claude",
".cursor",
".iflow",
".opencode",
".agents",
".kilocode",
".kiro",
)
has_other_platform_config = any(
(project_root / directory).is_dir()
for directory in other_platform_dirs_antigravity
)
if (
project_root / ".agent" / "workflows"
).is_dir() and not has_other_platform_config:
return "antigravity"
# Check for .qoder directory (Qoder-specific)
if (project_root / ".qoder").is_dir():
return "qoder"
return "claude"
def get_cli_adapter_auto(project_root: Path) -> CLIAdapter:
"""Get CLI adapter with auto-detected platform.
Args:
project_root: Project root directory
Returns:
CLIAdapter instance for detected platform
"""
platform = detect_platform(project_root)
return CLIAdapter(platform=platform)
+72
View File
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Trellis configuration reader.
Reads settings from .trellis/config.yaml with sensible defaults.
"""
from __future__ import annotations
from pathlib import Path
from .paths import DIR_WORKFLOW, get_repo_root
from .worktree import parse_simple_yaml
# Defaults
DEFAULT_SESSION_COMMIT_MESSAGE = "chore: record journal"
DEFAULT_MAX_JOURNAL_LINES = 2000
CONFIG_FILE = "config.yaml"
def _get_config_path(repo_root: Path | None = None) -> Path:
"""Get path to config.yaml."""
root = repo_root or get_repo_root()
return root / DIR_WORKFLOW / CONFIG_FILE
def _load_config(repo_root: Path | None = None) -> dict:
"""Load and parse config.yaml. Returns empty dict on any error."""
config_file = _get_config_path(repo_root)
try:
content = config_file.read_text(encoding="utf-8")
return parse_simple_yaml(content)
except (OSError, IOError):
return {}
def get_session_commit_message(repo_root: Path | None = None) -> str:
"""Get the commit message for auto-committing session records."""
config = _load_config(repo_root)
return config.get("session_commit_message", DEFAULT_SESSION_COMMIT_MESSAGE)
def get_max_journal_lines(repo_root: Path | None = None) -> int:
"""Get the maximum lines per journal file."""
config = _load_config(repo_root)
value = config.get("max_journal_lines", DEFAULT_MAX_JOURNAL_LINES)
try:
return int(value)
except (ValueError, TypeError):
return DEFAULT_MAX_JOURNAL_LINES
def get_hooks(event: str, repo_root: Path | None = None) -> list[str]:
"""Get hook commands for a lifecycle event.
Args:
event: Event name (e.g. "after_create", "after_archive").
repo_root: Repository root path.
Returns:
List of shell commands to execute, empty if none configured.
"""
config = _load_config(repo_root)
hooks = config.get("hooks")
if not isinstance(hooks, dict):
return []
commands = hooks.get(event)
if isinstance(commands, list):
return [str(c) for c in commands]
return []
+190
View File
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
Developer management utilities.
Provides:
init_developer - Initialize developer
ensure_developer - Ensure developer is initialized (exit if not)
show_developer_info - Show developer information
"""
from __future__ import annotations
import sys
from datetime import datetime
from pathlib import Path
from .paths import (
DIR_WORKFLOW,
DIR_WORKSPACE,
DIR_TASKS,
FILE_DEVELOPER,
FILE_JOURNAL_PREFIX,
get_repo_root,
get_developer,
check_developer,
)
# =============================================================================
# Developer Initialization
# =============================================================================
def init_developer(name: str, repo_root: Path | None = None) -> bool:
"""Initialize developer.
Creates:
- .trellis/.developer file with developer info
- .trellis/workspace/<name>/ directory structure
- Initial journal file and index.md
Args:
name: Developer name.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success, False on error.
"""
if not name:
print("Error: developer name is required", file=sys.stderr)
return False
if repo_root is None:
repo_root = get_repo_root()
dev_file = repo_root / DIR_WORKFLOW / FILE_DEVELOPER
workspace_dir = repo_root / DIR_WORKFLOW / DIR_WORKSPACE / name
# Create .developer file
initialized_at = datetime.now().isoformat()
try:
dev_file.write_text(
f"name={name}\ninitialized_at={initialized_at}\n",
encoding="utf-8"
)
except (OSError, IOError) as e:
print(f"Error: Failed to create .developer file: {e}", file=sys.stderr)
return False
# Create workspace directory structure
try:
workspace_dir.mkdir(parents=True, exist_ok=True)
except (OSError, IOError) as e:
print(f"Error: Failed to create workspace directory: {e}", file=sys.stderr)
return False
# Create initial journal file
journal_file = workspace_dir / f"{FILE_JOURNAL_PREFIX}1.md"
if not journal_file.exists():
today = datetime.now().strftime("%Y-%m-%d")
journal_content = f"""# Journal - {name} (Part 1)
> AI development session journal
> Started: {today}
---
"""
try:
journal_file.write_text(journal_content, encoding="utf-8")
except (OSError, IOError) as e:
print(f"Error: Failed to create journal file: {e}", file=sys.stderr)
return False
# Create index.md with markers for auto-update
index_file = workspace_dir / "index.md"
if not index_file.exists():
index_content = f"""# Workspace Index - {name}
> Journal tracking for AI development sessions.
---
## Current Status
<!-- @@@auto:current-status -->
- **Active File**: `journal-1.md`
- **Total Sessions**: 0
- **Last Active**: -
<!-- @@@/auto:current-status -->
---
## Active Documents
<!-- @@@auto:active-documents -->
| File | Lines | Status |
|------|-------|--------|
| `journal-1.md` | ~0 | Active |
<!-- @@@/auto:active-documents -->
---
## Session History
<!-- @@@auto:session-history -->
| # | Date | Title | Commits |
|---|------|-------|---------|
<!-- @@@/auto:session-history -->
---
## Notes
- Sessions are appended to journal files
- New journal file created when current exceeds 2000 lines
- Use `add_session.py` to record sessions
"""
try:
index_file.write_text(index_content, encoding="utf-8")
except (OSError, IOError) as e:
print(f"Error: Failed to create index.md: {e}", file=sys.stderr)
return False
print(f"Developer initialized: {name}")
print(f" .developer file: {dev_file}")
print(f" Workspace dir: {workspace_dir}")
return True
def ensure_developer(repo_root: Path | None = None) -> None:
"""Ensure developer is initialized, exit if not.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
if repo_root is None:
repo_root = get_repo_root()
if not check_developer(repo_root):
print("Error: Developer not initialized.", file=sys.stderr)
print(f"Run: python3 ./{DIR_WORKFLOW}/scripts/init_developer.py <your-name>", file=sys.stderr)
sys.exit(1)
def show_developer_info(repo_root: Path | None = None) -> None:
"""Show developer information.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if not developer:
print("Developer: (not initialized)")
else:
print(f"Developer: {developer}")
print(f"Workspace: {DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/")
print(f"Tasks: {DIR_WORKFLOW}/{DIR_TASKS}/")
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
show_developer_info()
+641
View File
@@ -0,0 +1,641 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git and Session Context utilities.
Provides:
output_json - Output context in JSON format
output_text - Output context in text format
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from .paths import (
DIR_SCRIPTS,
DIR_SPEC,
DIR_TASKS,
DIR_WORKFLOW,
DIR_WORKSPACE,
FILE_TASK_JSON,
count_lines,
get_active_journal_file,
get_current_task,
get_developer,
get_repo_root,
get_tasks_dir,
)
# =============================================================================
# Helper Functions
# =============================================================================
def _run_git_command(args: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
"""Run a git command and return (returncode, stdout, stderr).
Uses UTF-8 encoding with -c i18n.logOutputEncoding=UTF-8 to ensure
consistent output across all platforms (Windows, macOS, Linux).
"""
try:
# Force git to output UTF-8 for consistent cross-platform behavior
git_args = ["git", "-c", "i18n.logOutputEncoding=UTF-8"] + args
result = subprocess.run(
git_args,
cwd=cwd,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
return result.returncode, result.stdout, result.stderr
except Exception as e:
return 1, "", str(e)
def _read_json_file(path: Path) -> dict | None:
"""Read and parse a JSON file."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
# =============================================================================
# JSON Output
# =============================================================================
def get_context_json(repo_root: Path | None = None) -> dict:
"""Get context as a dictionary.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Context dictionary.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
tasks_dir = get_tasks_dir(repo_root)
journal_file = get_active_journal_file(repo_root)
journal_lines = 0
journal_relative = ""
if journal_file and developer:
journal_lines = count_lines(journal_file)
journal_relative = (
f"{DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/{journal_file.name}"
)
# Git info
_, branch_out, _ = _run_git_command(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
_, status_out, _ = _run_git_command(["status", "--porcelain"], cwd=repo_root)
git_status_count = len([line for line in status_out.splitlines() if line.strip()])
is_clean = git_status_count == 0
# Recent commits
_, log_out, _ = _run_git_command(["log", "--oneline", "-5"], cwd=repo_root)
commits = []
for line in log_out.splitlines():
if line.strip():
parts = line.split(" ", 1)
if len(parts) >= 2:
commits.append({"hash": parts[0], "message": parts[1]})
elif len(parts) == 1:
commits.append({"hash": parts[0], "message": ""})
# Tasks
tasks = []
if tasks_dir.is_dir():
for d in tasks_dir.iterdir():
if d.is_dir() and d.name != "archive":
task_json_path = d / FILE_TASK_JSON
if task_json_path.is_file():
data = _read_json_file(task_json_path)
if data:
tasks.append(
{
"dir": d.name,
"name": data.get("name") or data.get("id") or "unknown",
"status": data.get("status", "unknown"),
"children": data.get("children", []),
"parent": data.get("parent"),
}
)
return {
"developer": developer or "",
"git": {
"branch": branch,
"isClean": is_clean,
"uncommittedChanges": git_status_count,
"recentCommits": commits,
},
"tasks": {
"active": tasks,
"directory": f"{DIR_WORKFLOW}/{DIR_TASKS}",
},
"journal": {
"file": journal_relative,
"lines": journal_lines,
"nearLimit": journal_lines > 1800,
},
}
def output_json(repo_root: Path | None = None) -> None:
"""Output context in JSON format.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
context = get_context_json(repo_root)
print(json.dumps(context, indent=2, ensure_ascii=False))
# =============================================================================
# Text Output
# =============================================================================
def get_context_text(repo_root: Path | None = None) -> str:
"""Get context as formatted text.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Formatted text output.
"""
if repo_root is None:
repo_root = get_repo_root()
lines = []
lines.append("========================================")
lines.append("SESSION CONTEXT")
lines.append("========================================")
lines.append("")
developer = get_developer(repo_root)
# Developer section
lines.append("## DEVELOPER")
if not developer:
lines.append(
f"ERROR: Not initialized. Run: python3 ./{DIR_WORKFLOW}/{DIR_SCRIPTS}/init_developer.py <name>"
)
return "\n".join(lines)
lines.append(f"Name: {developer}")
lines.append("")
# Git status
lines.append("## GIT STATUS")
_, branch_out, _ = _run_git_command(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
lines.append(f"Branch: {branch}")
_, status_out, _ = _run_git_command(["status", "--porcelain"], cwd=repo_root)
status_lines = [line for line in status_out.splitlines() if line.strip()]
status_count = len(status_lines)
if status_count == 0:
lines.append("Working directory: Clean")
else:
lines.append(f"Working directory: {status_count} uncommitted change(s)")
lines.append("")
lines.append("Changes:")
_, short_out, _ = _run_git_command(["status", "--short"], cwd=repo_root)
for line in short_out.splitlines()[:10]:
lines.append(line)
lines.append("")
# Recent commits
lines.append("## RECENT COMMITS")
_, log_out, _ = _run_git_command(["log", "--oneline", "-5"], cwd=repo_root)
if log_out.strip():
for line in log_out.splitlines():
lines.append(line)
else:
lines.append("(no commits)")
lines.append("")
# Current task
lines.append("## CURRENT TASK")
current_task = get_current_task(repo_root)
if current_task:
current_task_dir = repo_root / current_task
task_json_path = current_task_dir / FILE_TASK_JSON
lines.append(f"Path: {current_task}")
if task_json_path.is_file():
data = _read_json_file(task_json_path)
if data:
t_name = data.get("name") or data.get("id") or "unknown"
t_status = data.get("status", "unknown")
t_created = data.get("createdAt", "unknown")
t_desc = data.get("description", "")
lines.append(f"Name: {t_name}")
lines.append(f"Status: {t_status}")
lines.append(f"Created: {t_created}")
if t_desc:
lines.append(f"Description: {t_desc}")
# Check for prd.md
prd_file = current_task_dir / "prd.md"
if prd_file.is_file():
lines.append("")
lines.append("[!] This task has prd.md - read it for task details")
else:
lines.append("(none)")
lines.append("")
# Active tasks
lines.append("## ACTIVE TASKS")
tasks_dir = get_tasks_dir(repo_root)
task_count = 0
# Collect all task data for hierarchy display
all_task_data: dict[str, dict] = {}
if tasks_dir.is_dir():
for d in sorted(tasks_dir.iterdir()):
if d.is_dir() and d.name != "archive":
dir_name = d.name
t_json = d / FILE_TASK_JSON
status = "unknown"
assignee = "-"
children: list[str] = []
parent: str | None = None
if t_json.is_file():
data = _read_json_file(t_json)
if data:
status = data.get("status", "unknown")
assignee = data.get("assignee", "-")
children = data.get("children", [])
parent = data.get("parent")
all_task_data[dir_name] = {
"status": status,
"assignee": assignee,
"children": children,
"parent": parent,
}
def _children_progress(children_list: list[str]) -> str:
if not children_list:
return ""
done = 0
for c in children_list:
if c in all_task_data and all_task_data[c]["status"] in ("completed", "done"):
done += 1
return f" [{done}/{len(children_list)} done]"
def _print_task_tree(name: str, indent: int = 0) -> None:
nonlocal task_count
info = all_task_data[name]
progress = _children_progress(info["children"]) if info["children"] else ""
prefix = " " * indent
lines.append(f"{prefix}- {name}/ ({info['status']}){progress} @{info['assignee']}")
task_count += 1
for child in info["children"]:
if child in all_task_data:
_print_task_tree(child, indent + 1)
for dir_name in sorted(all_task_data.keys()):
if not all_task_data[dir_name]["parent"]:
_print_task_tree(dir_name)
if task_count == 0:
lines.append("(no active tasks)")
lines.append(f"Total: {task_count} active task(s)")
lines.append("")
# My tasks
lines.append("## MY TASKS (Assigned to me)")
my_task_count = 0
if tasks_dir.is_dir():
for d in sorted(tasks_dir.iterdir()):
if d.is_dir() and d.name != "archive":
t_json = d / FILE_TASK_JSON
if t_json.is_file():
data = _read_json_file(t_json)
if data:
assignee = data.get("assignee", "")
status = data.get("status", "planning")
if assignee == developer and status != "done":
title = data.get("title") or data.get("name") or "unknown"
priority = data.get("priority", "P2")
children_list = data.get("children", [])
progress = _children_progress(children_list) if children_list else ""
lines.append(f"- [{priority}] {title} ({status}){progress}")
my_task_count += 1
if my_task_count == 0:
lines.append("(no tasks assigned to you)")
lines.append("")
# Journal file
lines.append("## JOURNAL FILE")
journal_file = get_active_journal_file(repo_root)
if journal_file:
journal_lines = count_lines(journal_file)
relative = f"{DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/{journal_file.name}"
lines.append(f"Active file: {relative}")
lines.append(f"Line count: {journal_lines} / 2000")
if journal_lines > 1800:
lines.append("[!] WARNING: Approaching 2000 line limit!")
else:
lines.append("No journal file found")
lines.append("")
# Paths
lines.append("## PATHS")
lines.append(f"Workspace: {DIR_WORKFLOW}/{DIR_WORKSPACE}/{developer}/")
lines.append(f"Tasks: {DIR_WORKFLOW}/{DIR_TASKS}/")
lines.append(f"Spec: {DIR_WORKFLOW}/{DIR_SPEC}/")
lines.append("")
lines.append("========================================")
return "\n".join(lines)
def get_context_record_json(repo_root: Path | None = None) -> dict:
"""Get record-mode context as a dictionary.
Focused on: my active tasks, git status, current task.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
tasks_dir = get_tasks_dir(repo_root)
# Git info
_, branch_out, _ = _run_git_command(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
_, status_out, _ = _run_git_command(["status", "--porcelain"], cwd=repo_root)
git_status_count = len([line for line in status_out.splitlines() if line.strip()])
_, log_out, _ = _run_git_command(["log", "--oneline", "-5"], cwd=repo_root)
commits = []
for line in log_out.splitlines():
if line.strip():
parts = line.split(" ", 1)
if len(parts) >= 2:
commits.append({"hash": parts[0], "message": parts[1]})
# My tasks
my_tasks = []
all_task_statuses: dict[str, str] = {}
if tasks_dir.is_dir():
for d in sorted(tasks_dir.iterdir()):
if d.is_dir() and d.name != "archive":
t_json = d / FILE_TASK_JSON
if t_json.is_file():
data = _read_json_file(t_json)
if data:
all_task_statuses[d.name] = data.get("status", "unknown")
if tasks_dir.is_dir():
for d in sorted(tasks_dir.iterdir()):
if d.is_dir() and d.name != "archive":
t_json = d / FILE_TASK_JSON
if t_json.is_file():
data = _read_json_file(t_json)
if data and data.get("assignee") == developer:
children_list = data.get("children", [])
done = sum(1 for c in children_list if all_task_statuses.get(c) in ("completed", "done"))
my_tasks.append({
"dir": d.name,
"title": data.get("title") or data.get("name") or "unknown",
"status": data.get("status", "unknown"),
"priority": data.get("priority", "P2"),
"children": children_list,
"childrenDone": done,
"parent": data.get("parent"),
"meta": data.get("meta", {}),
})
# Current task
current_task_info = None
current_task = get_current_task(repo_root)
if current_task:
task_json_path = (repo_root / current_task) / FILE_TASK_JSON
if task_json_path.is_file():
data = _read_json_file(task_json_path)
if data:
current_task_info = {
"path": current_task,
"name": data.get("name") or data.get("id") or "unknown",
"status": data.get("status", "unknown"),
}
return {
"developer": developer or "",
"git": {
"branch": branch,
"isClean": git_status_count == 0,
"uncommittedChanges": git_status_count,
"recentCommits": commits,
},
"myTasks": my_tasks,
"currentTask": current_task_info,
}
def get_context_text_record(repo_root: Path | None = None) -> str:
"""Get context as formatted text for record-session mode.
Focused output: MY ACTIVE TASKS first (with [!!!] emphasis),
then GIT STATUS, RECENT COMMITS, CURRENT TASK.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Formatted text output for record-session.
"""
if repo_root is None:
repo_root = get_repo_root()
lines: list[str] = []
lines.append("========================================")
lines.append("SESSION CONTEXT (RECORD MODE)")
lines.append("========================================")
lines.append("")
developer = get_developer(repo_root)
if not developer:
lines.append(
f"ERROR: Not initialized. Run: python3 ./{DIR_WORKFLOW}/{DIR_SCRIPTS}/init_developer.py <name>"
)
return "\n".join(lines)
# MY ACTIVE TASKS — first and prominent
lines.append(f"## [!!!] MY ACTIVE TASKS (Assigned to {developer})")
lines.append("[!] Review whether any should be archived before recording this session.")
lines.append("")
tasks_dir = get_tasks_dir(repo_root)
my_task_count = 0
# Collect task data for children progress
all_task_statuses: dict[str, str] = {}
if tasks_dir.is_dir():
for d in sorted(tasks_dir.iterdir()):
if d.is_dir() and d.name != "archive":
t_json = d / FILE_TASK_JSON
if t_json.is_file():
data = _read_json_file(t_json)
if data:
all_task_statuses[d.name] = data.get("status", "unknown")
def _record_children_progress(children_list: list[str]) -> str:
if not children_list:
return ""
done = 0
for c in children_list:
if all_task_statuses.get(c) in ("completed", "done"):
done += 1
return f" [{done}/{len(children_list)} done]"
if tasks_dir.is_dir():
for d in sorted(tasks_dir.iterdir()):
if d.is_dir() and d.name != "archive":
t_json = d / FILE_TASK_JSON
if t_json.is_file():
data = _read_json_file(t_json)
if data:
assignee = data.get("assignee", "")
status = data.get("status", "planning")
if assignee == developer:
title = data.get("title") or data.get("name") or "unknown"
priority = data.get("priority", "P2")
children_list = data.get("children", [])
progress = _record_children_progress(children_list) if children_list else ""
lines.append(f"- [{priority}] {title} ({status}){progress}{d.name}")
my_task_count += 1
if my_task_count == 0:
lines.append("(no active tasks assigned to you)")
lines.append("")
# GIT STATUS
lines.append("## GIT STATUS")
_, branch_out, _ = _run_git_command(["branch", "--show-current"], cwd=repo_root)
branch = branch_out.strip() or "unknown"
lines.append(f"Branch: {branch}")
_, status_out, _ = _run_git_command(["status", "--porcelain"], cwd=repo_root)
status_lines = [line for line in status_out.splitlines() if line.strip()]
status_count = len(status_lines)
if status_count == 0:
lines.append("Working directory: Clean")
else:
lines.append(f"Working directory: {status_count} uncommitted change(s)")
lines.append("")
lines.append("Changes:")
_, short_out, _ = _run_git_command(["status", "--short"], cwd=repo_root)
for line in short_out.splitlines()[:10]:
lines.append(line)
lines.append("")
# RECENT COMMITS
lines.append("## RECENT COMMITS")
_, log_out, _ = _run_git_command(["log", "--oneline", "-5"], cwd=repo_root)
if log_out.strip():
for line in log_out.splitlines():
lines.append(line)
else:
lines.append("(no commits)")
lines.append("")
# CURRENT TASK
lines.append("## CURRENT TASK")
current_task = get_current_task(repo_root)
if current_task:
current_task_dir = repo_root / current_task
task_json_path = current_task_dir / FILE_TASK_JSON
lines.append(f"Path: {current_task}")
if task_json_path.is_file():
data = _read_json_file(task_json_path)
if data:
t_name = data.get("name") or data.get("id") or "unknown"
t_status = data.get("status", "unknown")
lines.append(f"Name: {t_name}")
lines.append(f"Status: {t_status}")
else:
lines.append("(none)")
lines.append("")
lines.append("========================================")
return "\n".join(lines)
def output_text(repo_root: Path | None = None) -> None:
"""Output context in text format.
Args:
repo_root: Repository root path. Defaults to auto-detected.
"""
print(get_context_text(repo_root))
# =============================================================================
# Main Entry
# =============================================================================
def main() -> None:
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(description="Get Session Context for AI Agent")
parser.add_argument(
"--json",
"-j",
action="store_true",
help="Output in JSON format (works with any --mode)",
)
parser.add_argument(
"--mode",
"-m",
choices=["default", "record"],
default="default",
help="Output mode: default (full context) or record (for record-session)",
)
args = parser.parse_args()
if args.mode == "record":
if args.json:
print(json.dumps(get_context_record_json(), indent=2, ensure_ascii=False))
else:
print(get_context_text_record())
else:
if args.json:
output_json()
else:
output_text()
if __name__ == "__main__":
main()
+347
View File
@@ -0,0 +1,347 @@
#!/usr/bin/env python3
"""
Common path utilities for Trellis workflow.
Provides:
get_repo_root - Get repository root directory
get_developer - Get developer name
get_workspace_dir - Get developer workspace directory
get_tasks_dir - Get tasks directory
get_active_journal_file - Get current journal file
"""
from __future__ import annotations
import re
from datetime import datetime
from pathlib import Path
# =============================================================================
# Path Constants (change here to rename directories)
# =============================================================================
# Directory names
DIR_WORKFLOW = ".trellis"
DIR_WORKSPACE = "workspace"
DIR_TASKS = "tasks"
DIR_ARCHIVE = "archive"
DIR_SPEC = "spec"
DIR_SCRIPTS = "scripts"
# File names
FILE_DEVELOPER = ".developer"
FILE_CURRENT_TASK = ".current-task"
FILE_TASK_JSON = "task.json"
FILE_JOURNAL_PREFIX = "journal-"
# =============================================================================
# Repository Root
# =============================================================================
def get_repo_root(start_path: Path | None = None) -> Path:
"""Find the nearest directory containing .trellis/ folder.
This handles nested git repos correctly (e.g., test project inside another repo).
Args:
start_path: Starting directory to search from. Defaults to current directory.
Returns:
Path to repository root, or current directory if no .trellis/ found.
"""
current = (start_path or Path.cwd()).resolve()
while current != current.parent:
if (current / DIR_WORKFLOW).is_dir():
return current
current = current.parent
# Fallback to current directory if no .trellis/ found
return Path.cwd().resolve()
# =============================================================================
# Developer
# =============================================================================
def get_developer(repo_root: Path | None = None) -> str | None:
"""Get developer name from .developer file.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Developer name or None if not initialized.
"""
if repo_root is None:
repo_root = get_repo_root()
dev_file = repo_root / DIR_WORKFLOW / FILE_DEVELOPER
if not dev_file.is_file():
return None
try:
content = dev_file.read_text(encoding="utf-8")
for line in content.splitlines():
if line.startswith("name="):
return line.split("=", 1)[1].strip()
except (OSError, IOError):
pass
return None
def check_developer(repo_root: Path | None = None) -> bool:
"""Check if developer is initialized.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True if developer is initialized.
"""
return get_developer(repo_root) is not None
# =============================================================================
# Tasks Directory
# =============================================================================
def get_tasks_dir(repo_root: Path | None = None) -> Path:
"""Get tasks directory path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to tasks directory.
"""
if repo_root is None:
repo_root = get_repo_root()
return repo_root / DIR_WORKFLOW / DIR_TASKS
# =============================================================================
# Workspace Directory
# =============================================================================
def get_workspace_dir(repo_root: Path | None = None) -> Path | None:
"""Get developer workspace directory.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to workspace directory or None if developer not set.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if developer:
return repo_root / DIR_WORKFLOW / DIR_WORKSPACE / developer
return None
# =============================================================================
# Journal File
# =============================================================================
def get_active_journal_file(repo_root: Path | None = None) -> Path | None:
"""Get the current active journal file.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to active journal file or None if not found.
"""
if repo_root is None:
repo_root = get_repo_root()
workspace_dir = get_workspace_dir(repo_root)
if workspace_dir is None or not workspace_dir.is_dir():
return None
latest: Path | None = None
highest = 0
for f in workspace_dir.glob(f"{FILE_JOURNAL_PREFIX}*.md"):
if not f.is_file():
continue
# Extract number from filename
name = f.stem # e.g., "journal-1"
match = re.search(r"(\d+)$", name)
if match:
num = int(match.group(1))
if num > highest:
highest = num
latest = f
return latest
def count_lines(file_path: Path) -> int:
"""Count lines in a file.
Args:
file_path: Path to file.
Returns:
Number of lines, or 0 if file doesn't exist.
"""
if not file_path.is_file():
return 0
try:
return len(file_path.read_text(encoding="utf-8").splitlines())
except (OSError, IOError):
return 0
# =============================================================================
# Current Task Management
# =============================================================================
def _get_current_task_file(repo_root: Path | None = None) -> Path:
"""Get .current-task file path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to .current-task file.
"""
if repo_root is None:
repo_root = get_repo_root()
return repo_root / DIR_WORKFLOW / FILE_CURRENT_TASK
def get_current_task(repo_root: Path | None = None) -> str | None:
"""Get current task directory path (relative to repo_root).
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Relative path to current task directory or None.
"""
current_file = _get_current_task_file(repo_root)
if not current_file.is_file():
return None
try:
return current_file.read_text(encoding="utf-8").strip()
except (OSError, IOError):
return None
def get_current_task_abs(repo_root: Path | None = None) -> Path | None:
"""Get current task directory absolute path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Absolute path to current task directory or None.
"""
if repo_root is None:
repo_root = get_repo_root()
relative = get_current_task(repo_root)
if relative:
return repo_root / relative
return None
def set_current_task(task_path: str, repo_root: Path | None = None) -> bool:
"""Set current task.
Args:
task_path: Task directory path (relative to repo_root).
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success, False on error.
"""
if repo_root is None:
repo_root = get_repo_root()
if not task_path:
return False
# Verify task directory exists
full_path = repo_root / task_path
if not full_path.is_dir():
return False
current_file = _get_current_task_file(repo_root)
try:
current_file.write_text(task_path, encoding="utf-8")
return True
except (OSError, IOError):
return False
def clear_current_task(repo_root: Path | None = None) -> bool:
"""Clear current task.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success.
"""
current_file = _get_current_task_file(repo_root)
try:
if current_file.is_file():
current_file.unlink()
return True
except (OSError, IOError):
return False
def has_current_task(repo_root: Path | None = None) -> bool:
"""Check if has current task.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True if current task is set.
"""
return get_current_task(repo_root) is not None
# =============================================================================
# Task ID Generation
# =============================================================================
def generate_task_date_prefix() -> str:
"""Generate task ID based on date (MM-DD format).
Returns:
Date prefix string (e.g., "01-21").
"""
return datetime.now().strftime("%m-%d")
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
repo = get_repo_root()
print(f"Repository root: {repo}")
print(f"Developer: {get_developer(repo)}")
print(f"Tasks dir: {get_tasks_dir(repo)}")
print(f"Workspace dir: {get_workspace_dir(repo)}")
print(f"Journal file: {get_active_journal_file(repo)}")
print(f"Current task: {get_current_task(repo)}")
+253
View File
@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Phase Management Utilities.
Centralized phase tracking for multi-agent pipeline.
Provides:
get_current_phase - Returns current phase number
get_total_phases - Returns total phase count
get_phase_action - Returns action name for phase
get_phase_info - Returns "N/M (action)" format
set_phase - Sets current_phase
advance_phase - Advances to next phase
get_phase_for_action - Returns phase number for action
map_subagent_to_action - Map subagent type to action name
is_phase_completed - Check if phase is completed
is_current_action - Check if at specific action
"""
from __future__ import annotations
import json
from pathlib import Path
def _read_json_file(path: Path) -> dict | None:
"""Read and parse a JSON file."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
def _write_json_file(path: Path, data: dict) -> bool:
"""Write dict to JSON file."""
try:
path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
return True
except (OSError, IOError):
return False
# =============================================================================
# Phase Functions
# =============================================================================
def get_current_phase(task_json: Path) -> int:
"""Get current phase number.
Args:
task_json: Path to task.json file.
Returns:
Current phase number, or 0 if not found.
"""
data = _read_json_file(task_json)
if not data:
return 0
return data.get("current_phase", 0) or 0
def get_total_phases(task_json: Path) -> int:
"""Get total number of phases.
Args:
task_json: Path to task.json file.
Returns:
Total phase count, or 0 if not found.
"""
data = _read_json_file(task_json)
if not data:
return 0
next_action = data.get("next_action", [])
if isinstance(next_action, list):
return len(next_action)
return 0
def get_phase_action(task_json: Path, phase: int) -> str:
"""Get action name for a specific phase.
Args:
task_json: Path to task.json file.
phase: Phase number.
Returns:
Action name, or "unknown" if not found.
"""
data = _read_json_file(task_json)
if not data:
return "unknown"
next_action = data.get("next_action", [])
if isinstance(next_action, list):
for item in next_action:
if isinstance(item, dict) and item.get("phase") == phase:
return item.get("action", "unknown")
return "unknown"
def get_phase_info(task_json: Path) -> str:
"""Get formatted phase info: "N/M (action)".
Args:
task_json: Path to task.json file.
Returns:
Formatted string like "1/4 (implement)".
"""
data = _read_json_file(task_json)
if not data:
return "N/A"
current_phase = data.get("current_phase", 0) or 0
total_phases = get_total_phases(task_json)
action_name = get_phase_action(task_json, current_phase)
if current_phase == 0 or current_phase is None:
return f"0/{total_phases} (pending)"
else:
return f"{current_phase}/{total_phases} ({action_name})"
def set_phase(task_json: Path, phase: int) -> bool:
"""Set current phase to a specific value.
Args:
task_json: Path to task.json file.
phase: Phase number to set.
Returns:
True on success, False on error.
"""
data = _read_json_file(task_json)
if not data:
return False
data["current_phase"] = phase
return _write_json_file(task_json, data)
def advance_phase(task_json: Path) -> bool:
"""Advance to next phase.
Args:
task_json: Path to task.json file.
Returns:
True on success, False on error or at final phase.
"""
data = _read_json_file(task_json)
if not data:
return False
current = data.get("current_phase", 0) or 0
total = get_total_phases(task_json)
next_phase = current + 1
if next_phase > total:
return False # Already at final phase
data["current_phase"] = next_phase
return _write_json_file(task_json, data)
def get_phase_for_action(task_json: Path, action: str) -> int:
"""Get phase number for a specific action name.
Args:
task_json: Path to task.json file.
action: Action name.
Returns:
Phase number, or 0 if not found.
"""
data = _read_json_file(task_json)
if not data:
return 0
next_action = data.get("next_action", [])
if isinstance(next_action, list):
for item in next_action:
if isinstance(item, dict) and item.get("action") == action:
return item.get("phase", 0)
return 0
def map_subagent_to_action(subagent_type: str) -> str:
"""Map subagent type to action name.
Used by hooks to determine which action a subagent corresponds to.
Args:
subagent_type: Subagent type string.
Returns:
Corresponding action name.
"""
mapping = {
"implement": "implement",
"check": "check",
"debug": "debug",
"research": "research",
}
return mapping.get(subagent_type, subagent_type)
def is_phase_completed(task_json: Path, phase: int) -> bool:
"""Check if a phase is completed (current_phase > phase).
Args:
task_json: Path to task.json file.
phase: Phase number to check.
Returns:
True if phase is completed.
"""
current = get_current_phase(task_json)
return current > phase
def is_current_action(task_json: Path, action: str) -> bool:
"""Check if we're at a specific action.
Args:
task_json: Path to task.json file.
action: Action name to check.
Returns:
True if current phase matches the action.
"""
current = get_current_phase(task_json)
action_phase = get_phase_for_action(task_json, action)
return current == action_phase
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
path = Path(sys.argv[1])
print(f"Task JSON: {path}")
print(f"Phase info: {get_phase_info(path)}")
print(f"Current phase: {get_current_phase(path)}")
print(f"Total phases: {get_total_phases(path)}")
else:
print("Usage: python3 phase.py <task.json>")
+366
View File
@@ -0,0 +1,366 @@
#!/usr/bin/env python3
"""
Registry utility functions for multi-agent pipeline.
Provides:
registry_get_file - Get registry file path
registry_get_agent_by_id - Find agent by ID
registry_get_agent_by_worktree - Find agent by worktree path
registry_get_task_dir - Get task dir for a worktree
registry_remove_by_id - Remove agent by ID
registry_remove_by_worktree - Remove agent by worktree path
registry_add_agent - Add agent to registry
registry_search_agent - Search agent by ID or task_dir
registry_list_agents - List all agents
"""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from .paths import get_repo_root
from .worktree import get_agents_dir
def _read_json_file(path: Path) -> dict | None:
"""Read and parse a JSON file."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
def _write_json_file(path: Path, data: dict) -> bool:
"""Write dict to JSON file."""
try:
path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
return True
except (OSError, IOError):
return False
# =============================================================================
# Registry File Access
# =============================================================================
def registry_get_file(repo_root: Path | None = None) -> Path | None:
"""Get registry file path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to registry.json, or None if agents dir not found.
"""
if repo_root is None:
repo_root = get_repo_root()
agents_dir = get_agents_dir(repo_root)
if agents_dir:
return agents_dir / "registry.json"
return None
def _ensure_registry(repo_root: Path | None = None) -> Path | None:
"""Ensure registry file exists with valid structure.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to registry file, or None if cannot create.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file:
return None
agents_dir = registry_file.parent
try:
agents_dir.mkdir(parents=True, exist_ok=True)
if not registry_file.exists():
_write_json_file(registry_file, {"agents": []})
return registry_file
except (OSError, IOError):
return None
# =============================================================================
# Agent Lookup
# =============================================================================
def registry_get_agent_by_id(
agent_id: str,
repo_root: Path | None = None
) -> dict | None:
"""Get agent by ID.
Args:
agent_id: Agent ID.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Agent dict, or None if not found.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file or not registry_file.is_file():
return None
data = _read_json_file(registry_file)
if not data:
return None
for agent in data.get("agents", []):
if agent.get("id") == agent_id:
return agent
return None
def registry_get_agent_by_worktree(
worktree_path: str,
repo_root: Path | None = None
) -> dict | None:
"""Get agent by worktree path.
Args:
worktree_path: Worktree path.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Agent dict, or None if not found.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file or not registry_file.is_file():
return None
data = _read_json_file(registry_file)
if not data:
return None
for agent in data.get("agents", []):
if agent.get("worktree_path") == worktree_path:
return agent
return None
def registry_search_agent(
search: str,
repo_root: Path | None = None
) -> dict | None:
"""Search agent by ID or task_dir containing search term.
Args:
search: Search term.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
First matching agent dict, or None if not found.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file or not registry_file.is_file():
return None
data = _read_json_file(registry_file)
if not data:
return None
for agent in data.get("agents", []):
# Exact ID match
if agent.get("id") == search:
return agent
# Partial match on task_dir
task_dir = agent.get("task_dir", "")
if search in task_dir:
return agent
return None
def registry_get_task_dir(
worktree_path: str,
repo_root: Path | None = None
) -> str | None:
"""Get task directory for a worktree.
Args:
worktree_path: Worktree path.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Task directory path, or None if not found.
"""
agent = registry_get_agent_by_worktree(worktree_path, repo_root)
if agent:
return agent.get("task_dir")
return None
# =============================================================================
# Agent Modification
# =============================================================================
def registry_remove_by_id(agent_id: str, repo_root: Path | None = None) -> bool:
"""Remove agent by ID.
Args:
agent_id: Agent ID.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file or not registry_file.is_file():
return True # Nothing to remove
data = _read_json_file(registry_file)
if not data:
return True
agents = data.get("agents", [])
data["agents"] = [a for a in agents if a.get("id") != agent_id]
return _write_json_file(registry_file, data)
def registry_remove_by_worktree(
worktree_path: str,
repo_root: Path | None = None
) -> bool:
"""Remove agent by worktree path.
Args:
worktree_path: Worktree path.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True on success.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file or not registry_file.is_file():
return True # Nothing to remove
data = _read_json_file(registry_file)
if not data:
return True
agents = data.get("agents", [])
data["agents"] = [a for a in agents if a.get("worktree_path") != worktree_path]
return _write_json_file(registry_file, data)
def registry_add_agent(
agent_id: str,
worktree_path: str,
pid: int,
task_dir: str,
repo_root: Path | None = None,
platform: str = "claude",
) -> bool:
"""Add agent to registry (replaces if same ID exists).
Args:
agent_id: Agent ID.
worktree_path: Worktree path.
pid: Process ID.
task_dir: Task directory path.
repo_root: Repository root path. Defaults to auto-detected.
platform: Platform used (e.g., 'claude', 'opencode', 'codex', 'kiro', 'antigravity'). Defaults to 'claude'.
Returns:
True on success.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = _ensure_registry(repo_root)
if not registry_file:
return False
data = _read_json_file(registry_file)
if not data:
data = {"agents": []}
# Remove existing agent with same ID
agents = data.get("agents", [])
agents = [a for a in agents if a.get("id") != agent_id]
# Create new agent record
started_at = datetime.now().isoformat()
new_agent = {
"id": agent_id,
"worktree_path": worktree_path,
"pid": pid,
"started_at": started_at,
"task_dir": task_dir,
"platform": platform,
}
agents.append(new_agent)
data["agents"] = agents
return _write_json_file(registry_file, data)
def registry_list_agents(repo_root: Path | None = None) -> list[dict]:
"""List all agents.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of agent dicts.
"""
if repo_root is None:
repo_root = get_repo_root()
registry_file = registry_get_file(repo_root)
if not registry_file or not registry_file.is_file():
return []
data = _read_json_file(registry_file)
if not data:
return []
return data.get("agents", [])
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
import json as json_mod
repo = get_repo_root()
print(f"Repository root: {repo}")
print(f"Registry file: {registry_get_file(repo)}")
print()
print("Agents:")
agents = registry_list_agents(repo)
print(json_mod.dumps(agents, indent=2))
+259
View File
@@ -0,0 +1,259 @@
#!/usr/bin/env python3
"""
Task queue utility functions.
Provides:
list_tasks_by_status - List tasks by status
list_pending_tasks - List tasks with pending status
list_tasks_by_assignee - List tasks by assignee
list_my_tasks - List tasks assigned to current developer
get_task_stats - Get P0/P1/P2/P3 counts
"""
from __future__ import annotations
import json
from pathlib import Path
from .paths import (
FILE_TASK_JSON,
get_repo_root,
get_developer,
get_tasks_dir,
)
def _read_json_file(path: Path) -> dict | None:
"""Read and parse a JSON file."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
# =============================================================================
# Public Functions
# =============================================================================
def list_tasks_by_status(
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks by status.
Args:
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts with keys: priority, id, title, status, assignee.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
results = []
if not tasks_dir.is_dir():
return results
for d in tasks_dir.iterdir():
if not d.is_dir() or d.name == "archive":
continue
task_json = d / FILE_TASK_JSON
if not task_json.is_file():
continue
data = _read_json_file(task_json)
if not data:
continue
task_id = data.get("id", "")
title = data.get("title") or data.get("name", "")
priority = data.get("priority", "P2")
status = data.get("status", "planning")
assignee = data.get("assignee", "-")
# Apply filter
if filter_status and status != filter_status:
continue
results.append({
"priority": priority,
"id": task_id,
"title": title,
"status": status,
"assignee": assignee,
"dir": d.name,
"children": data.get("children", []),
"parent": data.get("parent"),
})
return results
def list_pending_tasks(repo_root: Path | None = None) -> list[dict]:
"""List pending tasks.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
"""
return list_tasks_by_status("planning", repo_root)
def list_tasks_by_assignee(
assignee: str,
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks assigned to a specific developer.
Args:
assignee: Developer name.
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
results = []
if not tasks_dir.is_dir():
return results
for d in tasks_dir.iterdir():
if not d.is_dir() or d.name == "archive":
continue
task_json = d / FILE_TASK_JSON
if not task_json.is_file():
continue
data = _read_json_file(task_json)
if not data:
continue
task_assignee = data.get("assignee", "-")
# Apply assignee filter
if task_assignee != assignee:
continue
task_id = data.get("id", "")
title = data.get("title") or data.get("name", "")
priority = data.get("priority", "P2")
status = data.get("status", "planning")
# Apply status filter
if filter_status and status != filter_status:
continue
results.append({
"priority": priority,
"id": task_id,
"title": title,
"status": status,
"assignee": task_assignee,
"dir": d.name,
"children": data.get("children", []),
"parent": data.get("parent"),
})
return results
def list_my_tasks(
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks assigned to current developer.
Args:
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
Raises:
ValueError: If developer not set.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if not developer:
raise ValueError("Developer not set")
return list_tasks_by_assignee(developer, filter_status, repo_root)
def get_task_stats(repo_root: Path | None = None) -> dict[str, int]:
"""Get task statistics.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Dict with keys: P0, P1, P2, P3, Total.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
stats = {"P0": 0, "P1": 0, "P2": 0, "P3": 0, "Total": 0}
if not tasks_dir.is_dir():
return stats
for d in tasks_dir.iterdir():
if not d.is_dir() or d.name == "archive":
continue
task_json = d / FILE_TASK_JSON
if not task_json.is_file():
continue
data = _read_json_file(task_json)
if not data:
continue
priority = data.get("priority", "P2")
if priority in stats:
stats[priority] += 1
stats["Total"] += 1
return stats
def format_task_stats(stats: dict[str, int]) -> str:
"""Format task stats as string.
Args:
stats: Stats dict from get_task_stats.
Returns:
Formatted string like "P0:0 P1:1 P2:2 P3:0 Total:3".
"""
return f"P0:{stats['P0']} P1:{stats['P1']} P2:{stats['P2']} P3:{stats['P3']} Total:{stats['Total']}"
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
stats = get_task_stats()
print(format_task_stats(stats))
print()
print("Pending tasks:")
for task in list_pending_tasks():
print(f" {task['priority']}|{task['id']}|{task['title']}|{task['status']}|{task['assignee']}")
+178
View File
@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
Task utility functions.
Provides:
is_safe_task_path - Validate task path is safe to operate on
find_task_by_name - Find task directory by name
archive_task_dir - Archive task to monthly directory
"""
from __future__ import annotations
import shutil
import sys
from datetime import datetime
from pathlib import Path
from .paths import get_repo_root
# =============================================================================
# Path Safety
# =============================================================================
def is_safe_task_path(task_path: str, repo_root: Path | None = None) -> bool:
"""Check if a relative task path is safe to operate on.
Args:
task_path: Task path (relative to repo_root).
repo_root: Repository root path. Defaults to auto-detected.
Returns:
True if safe, False if dangerous.
"""
if repo_root is None:
repo_root = get_repo_root()
# Check empty or null
if not task_path or task_path == "null":
print("Error: empty or null task path", file=sys.stderr)
return False
# Reject absolute paths
if task_path.startswith("/"):
print(f"Error: absolute path not allowed: {task_path}", file=sys.stderr)
return False
# Reject ".", "..", paths starting with "./" or "../", or containing ".."
if task_path in (".", "..") or task_path.startswith("./") or task_path.startswith("../") or ".." in task_path:
print(f"Error: path traversal not allowed: {task_path}", file=sys.stderr)
return False
# Final check: ensure resolved path is not the repo root
abs_path = repo_root / task_path
if abs_path.exists():
try:
resolved = abs_path.resolve()
root_resolved = repo_root.resolve()
if resolved == root_resolved:
print(f"Error: path resolves to repo root: {task_path}", file=sys.stderr)
return False
except (OSError, IOError):
pass
return True
# =============================================================================
# Task Lookup
# =============================================================================
def find_task_by_name(task_name: str, tasks_dir: Path) -> Path | None:
"""Find task directory by name (exact or suffix match).
Args:
task_name: Task name to find.
tasks_dir: Tasks directory path.
Returns:
Absolute path to task directory, or None if not found.
"""
if not task_name or not tasks_dir or not tasks_dir.is_dir():
return None
# Try exact match first
exact_match = tasks_dir / task_name
if exact_match.is_dir():
return exact_match
# Try suffix match (e.g., "my-task" matches "01-21-my-task")
for d in tasks_dir.iterdir():
if d.is_dir() and d.name.endswith(f"-{task_name}"):
return d
return None
# =============================================================================
# Archive Operations
# =============================================================================
def archive_task_dir(task_dir_abs: Path, repo_root: Path | None = None) -> Path | None:
"""Archive a task directory to archive/{YYYY-MM}/.
Args:
task_dir_abs: Absolute path to task directory.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Path to archived directory, or None on error.
"""
if not task_dir_abs.is_dir():
print(f"Error: task directory not found: {task_dir_abs}", file=sys.stderr)
return None
# Get tasks directory (parent of the task)
tasks_dir = task_dir_abs.parent
archive_dir = tasks_dir / "archive"
year_month = datetime.now().strftime("%Y-%m")
month_dir = archive_dir / year_month
# Create archive directory
try:
month_dir.mkdir(parents=True, exist_ok=True)
except (OSError, IOError) as e:
print(f"Error: Failed to create archive directory: {e}", file=sys.stderr)
return None
# Move task to archive
task_name = task_dir_abs.name
dest = month_dir / task_name
try:
shutil.move(str(task_dir_abs), str(dest))
except (OSError, IOError, shutil.Error) as e:
print(f"Error: Failed to move task to archive: {e}", file=sys.stderr)
return None
return dest
def archive_task_complete(
task_dir_abs: Path,
repo_root: Path | None = None
) -> dict[str, str]:
"""Complete archive workflow: archive directory.
Args:
task_dir_abs: Absolute path to task directory.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Dict with archive result info.
"""
if not task_dir_abs.is_dir():
print(f"Error: task directory not found: {task_dir_abs}", file=sys.stderr)
return {}
archive_dest = archive_task_dir(task_dir_abs, repo_root)
if archive_dest:
return {"archived_to": str(archive_dest)}
return {}
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
from .paths import get_tasks_dir
repo = get_repo_root()
tasks = get_tasks_dir(repo)
print(f"Tasks dir: {tasks}")
print(f"is_safe_task_path('.trellis/tasks/test'): {is_safe_task_path('.trellis/tasks/test', repo)}")
print(f"is_safe_task_path('../test'): {is_safe_task_path('../test', repo)}")
+305
View File
@@ -0,0 +1,305 @@
#!/usr/bin/env python3
"""
Worktree utilities for Multi-Agent Pipeline.
Provides:
get_worktree_config - Get worktree.yaml path
get_worktree_base_dir - Get worktree storage directory
get_worktree_copy_files - Get files to copy list
get_worktree_post_create_hooks - Get post-create hooks
get_agents_dir - Get agents registry directory
"""
from __future__ import annotations
from pathlib import Path
from .paths import (
DIR_WORKFLOW,
get_repo_root,
get_workspace_dir,
)
# =============================================================================
# YAML Simple Parser (no dependencies)
# =============================================================================
def _unquote(s: str) -> str:
"""Remove exactly one layer of matching surrounding quotes.
Unlike str.strip('"'), this only removes the outermost pair,
preserving any nested quotes inside the value.
Examples:
_unquote('"hello"') -> 'hello'
_unquote("'hello'") -> 'hello'
_unquote('"echo \\'hi\\'"') -> "echo 'hi'"
_unquote('hello') -> 'hello'
_unquote('"hello\\'') -> '"hello\\'' (mismatched, unchanged)
"""
if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
return s[1:-1]
return s
def parse_simple_yaml(content: str) -> dict:
"""Parse simple YAML with nested dict support (no dependencies).
Supports:
- key: value (string)
- key: (followed by list items)
- item1
- item2
- key: (followed by nested dict)
nested_key: value
nested_key2:
- item
Uses indentation to detect nesting (2+ spaces deeper = child).
Args:
content: YAML content string.
Returns:
Parsed dict (values can be str, list[str], or dict).
"""
lines = content.splitlines()
result: dict = {}
_parse_yaml_block(lines, 0, 0, result)
return result
def _parse_yaml_block(
lines: list[str], start: int, min_indent: int, target: dict
) -> int:
"""Parse a YAML block into target dict, returning next line index."""
i = start
current_list: list | None = None
while i < len(lines):
line = lines[i]
stripped = line.strip()
# Skip empty lines and comments
if not stripped or stripped.startswith("#"):
i += 1
continue
# Calculate indentation
indent = len(line) - len(line.lstrip())
# If dedented past our block, we're done
if indent < min_indent:
break
if stripped.startswith("- "):
if current_list is not None:
current_list.append(_unquote(stripped[2:].strip()))
i += 1
elif ":" in stripped:
key, _, value = stripped.partition(":")
key = key.strip()
value = _unquote(value.strip())
current_list = None
if value:
# key: value
target[key] = value
i += 1
else:
# key: (no value) — peek ahead to determine list vs nested dict
next_i, next_line = _next_content_line(lines, i + 1)
if next_i >= len(lines):
target[key] = {}
i = next_i
elif next_line.strip().startswith("- "):
# It's a list
current_list = []
target[key] = current_list
i += 1
else:
next_indent = len(next_line) - len(next_line.lstrip())
if next_indent > indent:
# It's a nested dict
nested: dict = {}
target[key] = nested
i = _parse_yaml_block(lines, i + 1, next_indent, nested)
else:
# Empty value, same or less indent follows
target[key] = {}
i += 1
else:
i += 1
return i
def _next_content_line(lines: list[str], start: int) -> tuple[int, str]:
"""Find the next non-empty, non-comment line."""
i = start
while i < len(lines):
stripped = lines[i].strip()
if stripped and not stripped.startswith("#"):
return i, lines[i]
i += 1
return i, ""
def _yaml_get_value(config_file: Path, key: str) -> str | None:
"""Read simple value from worktree.yaml.
Args:
config_file: Path to config file.
key: Key to read.
Returns:
Value string or None.
"""
try:
content = config_file.read_text(encoding="utf-8")
data = parse_simple_yaml(content)
value = data.get(key)
if isinstance(value, str):
return value
except (OSError, IOError):
pass
return None
def _yaml_get_list(config_file: Path, section: str) -> list[str]:
"""Read list from worktree.yaml.
Args:
config_file: Path to config file.
section: Section name.
Returns:
List of items.
"""
try:
content = config_file.read_text(encoding="utf-8")
data = parse_simple_yaml(content)
value = data.get(section)
if isinstance(value, list):
return [str(item) for item in value]
except (OSError, IOError):
pass
return []
# =============================================================================
# Worktree Configuration
# =============================================================================
# Worktree config file relative path (relative to repo root)
WORKTREE_CONFIG_PATH = f"{DIR_WORKFLOW}/worktree.yaml"
def get_worktree_config(repo_root: Path | None = None) -> Path:
"""Get worktree.yaml config file path.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Absolute path to config file.
"""
if repo_root is None:
repo_root = get_repo_root()
return repo_root / WORKTREE_CONFIG_PATH
def get_worktree_base_dir(repo_root: Path | None = None) -> Path:
"""Get worktree base directory.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Absolute path to worktree base directory.
"""
if repo_root is None:
repo_root = get_repo_root()
config = get_worktree_config(repo_root)
worktree_dir = _yaml_get_value(config, "worktree_dir")
# Default value
if not worktree_dir:
worktree_dir = "../worktrees"
# Handle relative path
if worktree_dir.startswith("../") or worktree_dir.startswith("./"):
# Relative to repo_root
return repo_root / worktree_dir
else:
# Absolute path
return Path(worktree_dir)
def get_worktree_copy_files(repo_root: Path | None = None) -> list[str]:
"""Get files to copy list.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of file paths to copy.
"""
if repo_root is None:
repo_root = get_repo_root()
config = get_worktree_config(repo_root)
return _yaml_get_list(config, "copy")
def get_worktree_post_create_hooks(repo_root: Path | None = None) -> list[str]:
"""Get post_create hooks.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of commands to run.
"""
if repo_root is None:
repo_root = get_repo_root()
config = get_worktree_config(repo_root)
return _yaml_get_list(config, "post_create")
# =============================================================================
# Agents Registry
# =============================================================================
def get_agents_dir(repo_root: Path | None = None) -> Path | None:
"""Get agents directory for current developer.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Absolute path to agents directory, or None if no workspace.
"""
if repo_root is None:
repo_root = get_repo_root()
workspace_dir = get_workspace_dir(repo_root)
if workspace_dir:
return workspace_dir / ".agents"
return None
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
repo = get_repo_root()
print(f"Repository root: {repo}")
print(f"Worktree config: {get_worktree_config(repo)}")
print(f"Worktree base dir: {get_worktree_base_dir(repo)}")
print(f"Copy files: {get_worktree_copy_files(repo)}")
print(f"Post create hooks: {get_worktree_post_create_hooks(repo)}")
print(f"Agents dir: {get_agents_dir(repo)}")