747 lines
23 KiB
Python
747 lines
23 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
# -*- coding: utf-8 -*-
|
||
|
|
"""
|
||
|
|
Multi-Platform Sub-Agent Context Injection Hook
|
||
|
|
|
||
|
|
Injects task-specific context when sub-agents (implement, check, research) are spawned.
|
||
|
|
|
||
|
|
Core Design Philosophy:
|
||
|
|
- Hook is responsible for injecting all context, subagent works autonomously with complete info
|
||
|
|
- Each agent has a dedicated jsonl file defining its context
|
||
|
|
- No resume needed, no segmentation, behavior controlled by code not prompt
|
||
|
|
|
||
|
|
Trigger: PreToolUse (before Task tool call)
|
||
|
|
|
||
|
|
Context Source: Trellis active task resolver points to task directory
|
||
|
|
- implement.jsonl - Implement agent dedicated context
|
||
|
|
- check.jsonl - Check agent dedicated context
|
||
|
|
- prd.md - Requirements document
|
||
|
|
- info.md - Technical design
|
||
|
|
- codex-review-output.txt - Code Review results
|
||
|
|
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
# IMPORTANT: Suppress all warnings FIRST
|
||
|
|
import warnings
|
||
|
|
warnings.filterwarnings("ignore")
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
# IMPORTANT: Force stdout to use UTF-8 on Windows
# This fixes UnicodeEncodeError when outputting non-ASCII characters
# (the hook prints JSON with ensure_ascii=False, so prompts may contain any
# Unicode text).
if sys.platform.startswith("win"):
    import io as _io

    if hasattr(sys.stdout, "reconfigure"):
        # Preferred path: switch the existing stream's encoding in place.
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")  # type: ignore[union-attr]
    elif hasattr(sys.stdout, "detach"):
        # Fallback: re-wrap the underlying binary buffer in a UTF-8 text wrapper.
        sys.stdout = _io.TextIOWrapper(sys.stdout.detach(), encoding="utf-8", errors="replace")  # type: ignore[union-attr]
|
||
|
|
|
||
|
|
|
||
|
|
# =============================================================================
# Path Constants (change here to rename directories)
# =============================================================================

# Workflow directory at the repo root; its scripts/ and spec sub-directories
# are addressed relative to it by the functions below.
DIR_WORKFLOW = ".trellis"
# Name of the spec directory inside DIR_WORKFLOW.
DIR_SPEC = "spec"
# Task metadata filename.
# NOTE(review): not referenced by any code visible in this file — confirm usage.
FILE_TASK_JSON = "task.json"

# =============================================================================
# Subagent Constants (change here to rename subagent types)
# =============================================================================

# Sub-agent type names as they appear in hook payloads.
AGENT_IMPLEMENT = "trellis-implement"
AGENT_CHECK = "trellis-check"
AGENT_RESEARCH = "trellis-research"

# Agents that require a task directory
AGENTS_REQUIRE_TASK = (AGENT_IMPLEMENT, AGENT_CHECK)
# All supported agents
AGENTS_ALL = (AGENT_IMPLEMENT, AGENT_CHECK, AGENT_RESEARCH)
|
||
|
|
|
||
|
|
|
||
|
|
def find_repo_root(start_path: str) -> str | None:
    """Find the git repo root by searching from ``start_path`` upwards.

    ``start_path`` is resolved to an absolute path, then it and every
    ancestor directory are checked for a ``.git`` entry. Only existence is
    tested, so a ``.git`` file counts as well as a ``.git`` directory.

    Args:
        start_path: Path to start searching from.

    Returns:
        Repo root path, or None if no ancestor contains ``.git``.
    """
    start = Path(start_path).resolve()
    # ``Path.parents`` runs all the way to the filesystem root, so the root
    # itself is inspected too. A ``while current != current.parent`` loop
    # stops one level early and would miss a repo rooted there.
    for candidate in (start, *start.parents):
        if (candidate / ".git").exists():
            return str(candidate)
    return None
|
||
|
|
|
||
|
|
|
||
|
|
def _detect_platform(input_data: dict) -> str | None:
    """Guess which host platform invoked this hook.

    Signals are consulted in priority order; the first match wins:

    1. A string ``cursor_version`` field in the hook payload.
    2. The first non-empty ``*_PROJECT_DIR`` environment variable.
    3. A platform config directory name (``.claude``, ``.cursor``, ...)
       among the path components of ``sys.argv[0]``.

    Args:
        input_data: Parsed hook payload.

    Returns:
        Platform identifier, or None when no signal matches.
    """
    if isinstance(input_data.get("cursor_version"), str):
        return "cursor"

    # (environment variable, platform) — checked in this order.
    env_signals = (
        ("CLAUDE_PROJECT_DIR", "claude"),
        ("CURSOR_PROJECT_DIR", "cursor"),
        ("CODEBUDDY_PROJECT_DIR", "codebuddy"),
        ("FACTORY_PROJECT_DIR", "droid"),
        ("GEMINI_PROJECT_DIR", "gemini"),
        ("QODER_PROJECT_DIR", "qoder"),
        ("KIRO_PROJECT_DIR", "kiro"),
        ("COPILOT_PROJECT_DIR", "copilot"),
    )
    for variable, platform_name in env_signals:
        if os.environ.get(variable):
            return platform_name

    # (directory name in the script path, platform) — checked in this order.
    dir_signals = (
        (".claude", "claude"),
        (".cursor", "cursor"),
        (".gemini", "gemini"),
        (".qoder", "qoder"),
        (".codebuddy", "codebuddy"),
        (".factory", "droid"),
        (".kiro", "kiro"),
    )
    path_components = set(Path(sys.argv[0]).parts)
    return next(
        (name for marker, name in dir_signals if marker in path_components),
        None,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def get_current_task(repo_root: str, input_data: dict) -> str | None:
    """Ask the shared active-task resolver for the current task directory.

    The resolver is imported from ``common.active_task`` under
    ``<repo_root>/<DIR_WORKFLOW>/scripts``; that directory is put at the
    front of ``sys.path`` if it is not already listed.

    Args:
        repo_root: Repository root path.
        input_data: Parsed hook payload, forwarded to the resolver.

    Returns:
        The resolver result's ``task_path``, or None when the resolver
        module cannot be imported.
    """
    resolver_dir = str(Path(repo_root) / DIR_WORKFLOW / "scripts")
    if resolver_dir not in sys.path:
        sys.path.insert(0, resolver_dir)

    try:
        from common.active_task import resolve_active_task  # type: ignore[import-not-found]
    except Exception:
        # Resolver scripts are missing or broken: behave as "no active task".
        return None

    detected_platform = _detect_platform(input_data)
    resolution = resolve_active_task(
        Path(repo_root),
        input_data,
        platform=detected_platform,
    )
    return resolution.task_path
|
||
|
|
|
||
|
|
|
||
|
|
def read_file_content(base_path: str, file_path: str) -> str | None:
    """Return the UTF-8 text of ``file_path`` joined onto ``base_path``.

    Returns None when the target does not exist, is not a regular file, or
    cannot be opened / decoded.
    """
    target = os.path.join(base_path, file_path)
    # isfile() is already False for a missing path, so one check covers both.
    if not os.path.isfile(target):
        return None

    try:
        with open(target, "r", encoding="utf-8") as handle:
            text = handle.read()
    except Exception:
        return None
    return text
|
||
|
|
|
||
|
|
|
||
|
|
def read_directory_contents(
    base_path: str, dir_path: str, max_files: int = 20
) -> list[tuple[str, str]]:
    """
    Collect the text of the ``.md`` files directly inside a directory.

    Files are taken in sorted filename order and capped at ``max_files``.
    Files that cannot be read are skipped; any other failure ends the scan
    and whatever was collected so far is returned.

    Args:
        base_path: Base path (usually repo_root)
        dir_path: Directory path relative to ``base_path``
        max_files: Max files to read (prevent huge directories)

    Returns:
        [(dir_path/filename, content), ...]; empty if the directory is missing.
    """
    directory = os.path.join(base_path, dir_path)
    # isdir() is False for a missing path as well as for a non-directory.
    if not os.path.isdir(directory):
        return []

    collected = []
    try:
        markdown_names = [
            name
            for name in os.listdir(directory)
            if name.endswith(".md") and os.path.isfile(os.path.join(directory, name))
        ]
        markdown_names.sort()

        for name in markdown_names[:max_files]:
            try:
                with open(os.path.join(directory, name), "r", encoding="utf-8") as handle:
                    text = handle.read()
            except Exception:
                # Unreadable file: skip it, keep going with the rest.
                continue
            collected.append((os.path.join(dir_path, name), text))
    except Exception:
        pass

    return collected
|
||
|
|
|
||
|
|
|
||
|
|
def read_jsonl_entries(base_path: str, jsonl_path: str) -> list[tuple[str, str]]:
    """
    Read all file/directory contents referenced in jsonl file

    Schema:
        {"file": "path/to/file.md", "reason": "..."}
        {"file": "path/to/dir/", "type": "directory", "reason": "..."}
        {"_example": "..."}   # seed row — skipped (no `file` field)

    ``path`` is accepted as an alias for ``file``. Rows are handled
    independently: blank lines, lines that are not valid JSON, JSON values
    that are not objects, and rows without a string ``file``/``path`` value
    are skipped without affecting the rows that follow.

    Rows without a ``file`` field (e.g. the self-describing seed line written
    by ``task.py create`` before the agent has curated entries) are skipped
    silently. If no usable row is found, a stderr warning is emitted so the
    operator can debug missing context. A failure to read the jsonl file
    itself is also reported on stderr.

    Args:
        base_path: Base path that ``jsonl_path`` and every referenced path
            are joined onto.
        jsonl_path: Path of the jsonl file relative to ``base_path``.

    Returns:
        [(path, content), ...]
    """
    full_path = os.path.join(base_path, jsonl_path)
    if not os.path.exists(full_path):
        print(
            f"[inject-subagent-context] WARN: {jsonl_path} not found — "
            f"sub-agent will receive only prd.md",
            file=sys.stderr,
        )
        return []

    results = []
    saw_real_entry = False
    try:
        with open(full_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue

                # Valid JSON that is not an object (list, string, number,
                # null) cannot carry a file reference. Skip just this row
                # instead of letting ``.get`` raise and abort the whole file.
                if not isinstance(item, dict):
                    continue

                file_path = item.get("file") or item.get("path")
                if not file_path or not isinstance(file_path, str):
                    # Seed / comment row, or an unusable path value
                    continue

                saw_real_entry = True
                if item.get("type", "file") == "directory":
                    # Read all .md files in directory
                    results.extend(read_directory_contents(base_path, file_path))
                else:
                    # Read single file; empty or unreadable files are dropped
                    content = read_file_content(base_path, file_path)
                    if content:
                        results.append((file_path, content))
    except Exception as exc:
        # Stay best-effort (the hook must not crash), but say why the
        # context is incomplete instead of failing silently.
        print(
            f"[inject-subagent-context] WARN: failed to read {jsonl_path}: {exc}",
            file=sys.stderr,
        )

    if not saw_real_entry:
        print(
            f"[inject-subagent-context] WARN: {jsonl_path} has no curated "
            f"entries (only seed / empty) — sub-agent will receive only "
            f"prd.md. See workflow.md Phase 1.3 for curation guidance.",
            file=sys.stderr,
        )

    return results
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def get_agent_context(repo_root: str, task_dir: str, agent_type: str) -> str:
    """
    Build the context text described by ``<task_dir>/<agent_type>.jsonl``.

    Every entry returned by ``read_jsonl_entries`` becomes a section headed
    by ``=== <path> ===``; sections are separated by a blank line. The
    result is an empty string when the jsonl yields no entries.
    """
    jsonl_path = f"{task_dir}/{agent_type}.jsonl"
    sections = [
        f"=== {entry_path} ===\n{entry_text}"
        for entry_path, entry_text in read_jsonl_entries(repo_root, jsonl_path)
    ]
    return "\n\n".join(sections)
|
||
|
|
|
||
|
|
|
||
|
|
def get_implement_context(repo_root: str, task_dir: str) -> str:
    """
    Assemble the full context for the Implement Agent.

    Sections appear in this order, each included only when it has content:
    1. Everything listed in implement.jsonl (dev specs)
    2. prd.md (requirements)
    3. info.md (technical design)

    Returns the sections joined by blank lines ("" when all are missing).
    """
    sections = []

    # 1. Dev specs curated in implement.jsonl
    spec_block = get_agent_context(repo_root, task_dir, "implement")
    if spec_block:
        sections.append(spec_block)

    # 2. + 3. Task documents, each under a labelled header
    task_documents = (
        ("prd.md", "Requirements"),
        ("info.md", "Technical Design"),
    )
    for filename, label in task_documents:
        relative_path = f"{task_dir}/{filename}"
        body = read_file_content(repo_root, relative_path)
        if body:
            sections.append(f"=== {relative_path} ({label}) ===\n{body}")

    return "\n\n".join(sections)
|
||
|
|
|
||
|
|
|
||
|
|
def get_check_context(repo_root: str, task_dir: str) -> str:
    """
    Assemble the context for the Check Agent: check.jsonl entries + prd.md.

    Each part is included only when it has content; parts are joined by
    blank lines and "" is returned when nothing is available.
    """
    sections = []

    # check.jsonl entries, already formatted as "=== path ===" sections
    spec_block = get_agent_context(repo_root, task_dir, "check")
    if spec_block:
        sections.append(spec_block)

    requirements = read_file_content(repo_root, f"{task_dir}/prd.md")
    if requirements:
        sections.append(f"=== {task_dir}/prd.md (Requirements) ===\n{requirements}")

    return "\n\n".join(sections)
|
||
|
|
|
||
|
|
|
||
|
|
def get_finish_context(repo_root: str, task_dir: str) -> str:
    """
    Context for the Finish phase.

    Finish is a final check, so it draws on exactly the same sources as the
    Check Agent (check.jsonl + prd.md) and simply delegates to
    ``get_check_context``.
    """
    return get_check_context(repo_root=repo_root, task_dir=task_dir)
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def build_implement_prompt(original_prompt: str, context: str) -> str:
    """Wrap the caller's prompt in the Implement Agent template.

    Args:
        original_prompt: Prompt the parent agent passed to the sub-agent;
            placed under "## Your Task".
        context: Pre-assembled context text; placed under "## Your Context".

    Returns:
        The complete prompt text.
    """
    # The template has no literal braces, so str.format only touches the two
    # named placeholders.
    template = """# Implement Agent Task

You are the Implement Agent in the Multi-Agent Pipeline.

## Your Context

All the information you need has been prepared for you:

{context}

---

## Your Task

{original_prompt}

---

## Workflow

1. **Understand specs** - All dev specs are injected above, understand them
2. **Understand requirements** - Read requirements document and technical design
3. **Implement feature** - Implement following specs and design
4. **Self-check** - Ensure code quality against check specs

## Important Constraints

- Do NOT execute git commit, only code modifications
- Follow all dev specs injected above
- Report list of modified/created files when done"""
    return template.format(context=context, original_prompt=original_prompt)
|
||
|
|
|
||
|
|
|
||
|
|
def build_check_prompt(original_prompt: str, context: str) -> str:
    """Wrap the caller's prompt in the Check Agent template.

    Args:
        original_prompt: Prompt the parent agent passed to the sub-agent;
            placed under "## Your Task".
        context: Pre-assembled context text; placed under "## Your Context".

    Returns:
        The complete prompt text.
    """
    # The template has no literal braces, so str.format only touches the two
    # named placeholders.
    template = """# Check Agent Task

You are the Check Agent in the Multi-Agent Pipeline (code and cross-layer checker).

## Your Context

All check specs and dev specs you need:

{context}

---

## Your Task

{original_prompt}

---

## Workflow

1. **Get changes** - Run `git diff --name-only` and `git diff` to get code changes
2. **Check against specs** - Check item by item against specs above
3. **Self-fix** - Fix issues directly, don't just report
4. **Run verification** - Run project's lint and typecheck commands

## Important Constraints

- Fix issues yourself, don't just report
- Must execute complete checklist in check specs
- Pay special attention to impact radius analysis (L1-L5)"""
    return template.format(context=context, original_prompt=original_prompt)
|
||
|
|
|
||
|
|
|
||
|
|
def build_finish_prompt(original_prompt: str, context: str) -> str:
    """Wrap the caller's prompt in the Finish template (final check before PR).

    Args:
        original_prompt: Prompt the parent agent passed to the sub-agent;
            placed under "## Your Task".
        context: Pre-assembled context text; placed under "## Your Context".

    Returns:
        The complete prompt text.
    """
    # The template has no literal braces, so str.format only touches the two
    # named placeholders.
    template = """# Finish Agent Task

You are performing the final check before creating a PR.

## Your Context

Finish checklist and requirements:

{context}

---

## Your Task

{original_prompt}

---

## Workflow

1. **Review changes** - Run `git diff --name-only` to see all changed files
2. **Verify requirements** - Check each requirement in prd.md is implemented
3. **Spec sync** - Analyze whether changes introduce new patterns, contracts, or conventions
   - If new pattern/convention found: read target spec file → update it → update index.md if needed
   - If infra/cross-layer change: follow the 7-section mandatory template from update-spec.md
   - If pure code fix with no new patterns: skip this step
4. **Run final checks** - Execute lint and typecheck
5. **Confirm ready** - Ensure code is ready for PR

## Important Constraints

- You MAY update spec files when gaps are detected (use update-spec.md as guide)
- MUST read the target spec file BEFORE editing (avoid duplicating existing content)
- Do NOT update specs for trivial changes (typos, formatting, obvious fixes)
- If critical CODE issues found, report them clearly (fix specs, not code)
- Verify all acceptance criteria in prd.md are met"""
    return template.format(context=context, original_prompt=original_prompt)
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def get_research_context(repo_root: str, task_dir: str | None) -> str:
    """
    Context for Research Agent — an overview of the spec directory layout.

    The overview lists each directory directly under
    ``<repo_root>/<DIR_WORKFLOW>/<DIR_SPEC>`` (sorted), annotated with the
    names of its own sub-directories, followed by fixed search tips. When
    the spec directory does not exist only its path line is shown.

    `task_dir` is accepted for signature parity with get_implement_context /
    get_check_context so the dispatcher can call them uniformly; it is not
    used.
    """
    _ = task_dir

    spec_rel = f"{DIR_WORKFLOW}/{DIR_SPEC}"
    spec_dir = Path(repo_root) / DIR_WORKFLOW / DIR_SPEC

    # Build the tree: one line per package directory, last one closed off.
    tree = [f"{spec_rel}/"]
    if spec_dir.is_dir():
        packages = sorted(entry for entry in spec_dir.iterdir() if entry.is_dir())
        last_index = len(packages) - 1
        for index, package in enumerate(packages):
            branch = "├── " if index != last_index else "└── "
            layer_names = sorted(
                child.name for child in package.iterdir() if child.is_dir()
            )
            suffix = f" ({', '.join(layer_names)})" if layer_names else ""
            tree.append(f"{branch}{package.name}/{suffix}")
    spec_tree = "\n".join(tree)

    overview = [
        "## Project Spec Directory Structure",
        "",
        "```",
        f"{spec_tree}",
        "```",
        "",
        f"To get structured package info, run: `python3 ./{DIR_WORKFLOW}/scripts/get_context.py --mode packages`",
        "",
        "## Search Tips",
        "",
        f"- Spec files: `{spec_rel}/**/*.md`",
        "- Code search: Use Glob and Grep tools",
        "- Tech solutions: Use mcp__exa__web_search_exa or mcp__exa__get_code_context_exa",
    ]
    return "\n".join(overview)
|
||
|
|
|
||
|
|
|
||
|
|
def build_research_prompt(original_prompt: str, context: str) -> str:
    """Wrap the caller's prompt in the Research Agent template.

    Args:
        original_prompt: Prompt the parent agent passed to the sub-agent;
            placed under "## Your Task".
        context: Project overview text; placed under "## Project Info".

    Returns:
        The complete prompt text.
    """
    # The template has no literal braces, so str.format only touches the two
    # named placeholders.
    template = """# Research Agent Task

You are the Research Agent in the Multi-Agent Pipeline (search researcher).

## Core Principle

**You do one thing: find and explain information.**

You are a documenter, not a reviewer.

## Project Info

{context}

---

## Your Task

{original_prompt}

---

## Workflow

1. **Understand query** - Determine search type (internal/external) and scope
2. **Plan search** - List search steps for complex queries
3. **Execute search** - Execute multiple independent searches in parallel
4. **Organize results** - Output structured report

## Search Tools

| Tool | Purpose |
|------|---------|
| Glob | Search by filename pattern |
| Grep | Search by content |
| Read | Read file content |
| mcp__exa__web_search_exa | External web search |
| mcp__exa__get_code_context_exa | External code/doc search |

## Strict Boundaries

**Only allowed**: Describe what exists, where it is, how it works

**Forbidden** (unless explicitly asked):
- Suggest improvements
- Criticize implementation
- Recommend refactoring
- Modify any files

## Report Format

Provide structured search results including:
- List of files found (with paths)
- Code pattern analysis (if applicable)
- Related spec documents
- External references (if any)"""
    return template.format(context=context, original_prompt=original_prompt)
|
||
|
|
|
||
|
|
|
||
|
|
def _string_value(value: Any) -> str:
    """Return ``value`` stripped of surrounding whitespace if it is a string.

    Any non-string value yields "".
    """
    return value.strip() if isinstance(value, str) else ""
|
||
|
|
|
||
|
|
|
||
|
|
def _oneof_name(container: dict) -> str:
    """Read a name from a ``{"case": ..., "value": ...}`` oneof mapping.

    For ``case == "custom"`` the nested ``value["name"]`` is preferred when
    it is a non-empty string; otherwise the case label itself is returned
    ("" when there is no case).
    """
    case_label = _string_value(container.get("case"))
    if case_label == "custom":
        payload = container.get("value")
        if isinstance(payload, dict):
            nested_name = _string_value(payload.get("name"))
            if nested_name:
                return nested_name
    return case_label


def _extract_subagent_name(value: Any) -> str:
    """Extract a sub-agent name from common platform encodings.

    Cursor's native Task args encode custom sub-agents as a protobuf oneof,
    which can appear in hook JSON as either ``{"custom": {"name": "..."}}``
    or ``{"type": {"case": "custom", "value": {"name": "..."}}}``.

    Lookup order (first non-empty result wins):

    1. ``value`` itself when it is a string.
    2. ``name`` / ``subagent_type_name`` / ``subagentTypeName`` keys.
    3. ``custom.name``.
    4. The oneof under ``type``, then ``value`` itself read as a oneof.
    5. A supported agent name used as a key of ``value``.

    Returns "" when nothing matches or ``value`` is neither str nor dict.
    """
    as_text = _string_value(value)
    if as_text:
        return as_text
    if not isinstance(value, dict):
        return ""

    for field in ("name", "subagent_type_name", "subagentTypeName"):
        as_text = _string_value(value.get(field))
        if as_text:
            return as_text

    custom_section = value.get("custom")
    if isinstance(custom_section, dict):
        as_text = _string_value(custom_section.get("name"))
        if as_text:
            return as_text

    # The oneof may be nested under "type" or be the mapping itself.
    for container in (value.get("type"), value):
        if isinstance(container, dict):
            as_text = _oneof_name(container)
            if as_text:
                return as_text

    return next((known for known in AGENTS_ALL if known in value), "")
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_subagent_type(tool_input: dict) -> str:
    """Return the sub-agent name found in ``tool_input``, or "".

    The known spellings of the sub-agent field are tried in order and the
    first one that ``_extract_subagent_name`` can resolve wins.
    """
    candidate_fields = (
        "subagent_type",
        "subagentType",
        "subagent_type_name",
        "subagentTypeName",
        "agent_type",
        "agentType",
        "name",
    )
    # Lazy generator: later fields are only examined if earlier ones fail.
    resolved_names = (
        _extract_subagent_name(tool_input.get(field)) for field in candidate_fields
    )
    return next((name for name in resolved_names if name), "")
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_hook_input(input_data: dict) -> tuple[str, str, dict]:
    """Parse hook input across different platform formats.

    Returns (subagent_type, original_prompt, tool_input).
    Handles:
    - Claude Code / Qoder / CodeBuddy / Droid: tool_name=Task|Agent, tool_input.subagent_type
    - Cursor: tool_name=Task|Subagent, tool_input.subagent_type
    - Copilot CLI: toolName=task (camelCase key, lowercase value)
    - Gemini CLI: tool_name IS the agent name (BeforeTool matcher already filtered)
    - Kiro: agentSpawn hook, agent_name field at top level

    Malformed fields are normalized instead of raising: a missing, null or
    non-object ``tool_input`` becomes ``{}``, a non-string tool name is
    treated as absent, and a non-string prompt becomes "". An unrecognized
    payload yields ``("", "", tool_input)``.
    """

    def as_prompt(candidate: Any) -> str:
        # Keep the declared ``str`` contract even for null / structured prompts.
        return candidate if isinstance(candidate, str) else ""

    tool_input = input_data.get("tool_input")
    if not isinstance(tool_input, dict):
        # ``"tool_input": null`` (or any non-object) would break every
        # ``.get`` below and the ``{**tool_input}`` merge done by the caller.
        tool_input = {}

    # Standard format: Task/Agent tool with subagent_type
    tool_name = input_data.get("tool_name") or input_data.get("toolName") or ""
    if not isinstance(tool_name, str):
        tool_name = ""
    if tool_name.lower() in ("task", "agent", "subagent"):
        return (
            _extract_subagent_type(tool_input),
            as_prompt(tool_input.get("prompt", "")),
            tool_input,
        )

    # Kiro: agentSpawn hook passes agent_name at top level
    agent_name = input_data.get("agent_name", "")
    if agent_name:
        prompt = tool_input.get("prompt", input_data.get("prompt", ""))
        return agent_name, as_prompt(prompt), tool_input

    # Gemini CLI: BeforeTool where tool_name IS the agent name
    # (matcher already ensured it's one of our agents)
    if tool_name in AGENTS_ALL:
        return tool_name, as_prompt(tool_input.get("prompt", "")), tool_input

    # Copilot CLI: toolName field (camelCase), value might be the agent name
    tool_name_camel = input_data.get("toolName", "")
    if tool_name_camel in AGENTS_ALL:
        return tool_name_camel, as_prompt(input_data.get("toolArgs", "")), tool_input

    return "", "", tool_input
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Hook entry point: read the hook payload from stdin, emit the rewritten tool input.

    Flow:
    1. Exit immediately when hooks are disabled via ``TRELLIS_HOOKS=0`` or
       ``TRELLIS_DISABLE_HOOKS=1``.
    2. Parse the JSON payload from stdin and work out which sub-agent is
       being spawned and with what prompt.
    3. Locate the repo root and the active task directory.
    4. Build the agent-specific context and wrap the original prompt in it.
    5. Print one JSON object carrying the updated tool input in the Claude
       Code-style, Cursor-style and Gemini-style layouts at once.

    Every "nothing to do" situation (unparseable or non-object payload,
    unsupported agent, no repo, missing task directory, empty context) ends
    with ``sys.exit(0)`` and no output. The function always terminates via
    ``sys.exit(0)``.
    """
    if os.environ.get("TRELLIS_HOOKS") == "0" or os.environ.get("TRELLIS_DISABLE_HOOKS") == "1":
        sys.exit(0)

    try:
        input_data = json.load(sys.stdin)
    except ValueError:
        # Covers json.JSONDecodeError and undecodable stdin bytes.
        sys.exit(0)
    if not isinstance(input_data, dict):
        # Valid JSON that is not an object ([], null, "text") has no fields
        # to read; treat it like any other unusable payload.
        sys.exit(0)

    subagent_type, original_prompt, tool_input = _parse_hook_input(input_data)
    if not isinstance(tool_input, dict):
        tool_input = {}

    cwd = input_data.get("cwd")
    if not isinstance(cwd, str) or not cwd:
        # Missing, null or malformed cwd: fall back to the process cwd.
        cwd = os.getcwd()

    # Only handle subagent types we care about
    if subagent_type not in AGENTS_ALL:
        sys.exit(0)

    # Find repo root
    repo_root = find_repo_root(cwd)
    if not repo_root:
        sys.exit(0)

    # Get current task directory (research doesn't require it)
    task_dir = get_current_task(repo_root, input_data)

    # implement/check need task directory
    if subagent_type in AGENTS_REQUIRE_TASK:
        if not task_dir:
            sys.exit(0)
        # Check if task directory exists
        task_dir_full = os.path.join(repo_root, task_dir)
        if not os.path.exists(task_dir_full):
            sys.exit(0)

    # Check for [finish] marker in prompt (check agent with finish context).
    # A non-string prompt cannot carry the marker.
    is_finish_phase = (
        isinstance(original_prompt, str) and "[finish]" in original_prompt.lower()
    )

    # Get context and build prompt based on subagent type
    if subagent_type == AGENT_IMPLEMENT:
        assert task_dir is not None  # validated above
        context = get_implement_context(repo_root, task_dir)
        new_prompt = build_implement_prompt(original_prompt, context)
    elif subagent_type == AGENT_CHECK:
        assert task_dir is not None  # validated above
        if is_finish_phase:
            # Finish phase: final verification before PR
            context = get_finish_context(repo_root, task_dir)
            new_prompt = build_finish_prompt(original_prompt, context)
        else:
            # Regular check phase: use check context (full specs for self-fix loop)
            context = get_check_context(repo_root, task_dir)
            new_prompt = build_check_prompt(original_prompt, context)
    elif subagent_type == AGENT_RESEARCH:
        # Research can work without task directory
        context = get_research_context(repo_root, task_dir)
        new_prompt = build_research_prompt(original_prompt, context)
    else:
        sys.exit(0)

    if not context:
        sys.exit(0)

    # Return updated input — use a multi-format output that covers all platforms.
    # Most platforms ignore unrecognized fields, so we include multiple formats.
    # The platform picks whichever fields it understands.
    updated = {**tool_input, "prompt": new_prompt}
    output = {
        # Claude Code / Qoder / CodeBuddy / Droid format
        "hookSpecificOutput": {
            "hookEventName": "PreToolUse",
            "permissionDecision": "allow",
            "updatedInput": updated,
        },
        # Cursor format
        "permission": "allow",
        "updated_input": updated,
        # Gemini format
        "updatedInput": updated,
    }

    print(json.dumps(output, ensure_ascii=False))
    sys.exit(0)
|
||
|
|
|
||
|
|
|
||
|
|
# Run the hook only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|