189 lines
5.6 KiB
Python
189 lines
5.6 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
# -*- coding: utf-8 -*-
|
||
|
|
"""
|
||
|
|
Workflow Phase Extraction.
|
||
|
|
|
||
|
|
Extracts step-level content from .trellis/workflow.md and optionally filters
|
||
|
|
platform-specific blocks.
|
||
|
|
|
||
|
|
Platform marker syntax in workflow.md:
|
||
|
|
|
||
|
|
[Claude Code, Cursor, ...]
|
||
|
|
agent-capable content
|
||
|
|
[/Claude Code, Cursor, ...]
|
||
|
|
|
||
|
|
Provides:
|
||
|
|
get_phase_index - Extract the Phase Index section (no --step)
|
||
|
|
get_step - Extract a single step (#### X.X) section
|
||
|
|
filter_platform - Strip platform blocks that don't include the given name
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import re
|
||
|
|
|
||
|
|
from .paths import DIR_WORKFLOW, get_repo_root
|
||
|
|
|
||
|
|
|
||
|
|
def _workflow_md_path():
|
||
|
|
return get_repo_root() / DIR_WORKFLOW / "workflow.md"
|
||
|
|
|
||
|
|
# Match a line that *is* a platform marker: "[A, B, C]" or "[/A, B, C]"
|
||
|
|
_MARKER_RE = re.compile(r"^\[(/?)([A-Za-z][^\[\]]*)\]\s*$")
|
||
|
|
|
||
|
|
# Step heading: "#### 1.0 Title" or "#### 1.0 ..."
|
||
|
|
_STEP_HEADING_RE = re.compile(r"^####\s+(\d+\.\d+)\b.*$")
|
||
|
|
|
||
|
|
# Phase Index starts here; Phase 1/2/3 step bodies follow; ends at Breadcrumbs.
|
||
|
|
_PHASE_INDEX_HEADING = "## Phase Index"
|
||
|
|
|
||
|
|
|
||
|
|
def _read_workflow() -> str:
|
||
|
|
path = _workflow_md_path()
|
||
|
|
if not path.exists():
|
||
|
|
raise FileNotFoundError(f"workflow.md not found: {path}")
|
||
|
|
return path.read_text(encoding="utf-8")
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_marker(line: str) -> tuple[bool, list[str]] | None:
|
||
|
|
"""Parse a platform marker line.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
(is_closing, [platform_names]) if line is a marker, else None.
|
||
|
|
"""
|
||
|
|
m = _MARKER_RE.match(line)
|
||
|
|
if not m:
|
||
|
|
return None
|
||
|
|
is_closing = m.group(1) == "/"
|
||
|
|
names = [p.strip() for p in m.group(2).split(",") if p.strip()]
|
||
|
|
return is_closing, names
|
||
|
|
|
||
|
|
|
||
|
|
def get_phase_index() -> str:
|
||
|
|
"""Return Phase Index + Phase 1/2/3 step bodies from workflow.md.
|
||
|
|
|
||
|
|
Matches what the SessionStart hook injects into the `<workflow>` block:
|
||
|
|
starts at `## Phase Index`, continues through `## Phase 1: Plan`,
|
||
|
|
`## Phase 2: Execute`, `## Phase 3: Finish`, stops at
|
||
|
|
`## Customizing Trellis (for forks)` (the docs-for-forks footer).
|
||
|
|
`[workflow-state:STATUS]` tag blocks (now embedded in Phase Index since
|
||
|
|
v0.5.0-rc.0) are consumed by the UserPromptSubmit hook so they're
|
||
|
|
stripped from this output.
|
||
|
|
"""
|
||
|
|
text = _read_workflow()
|
||
|
|
lines = text.splitlines()
|
||
|
|
|
||
|
|
start: int | None = None
|
||
|
|
end: int | None = None
|
||
|
|
for i, line in enumerate(lines):
|
||
|
|
stripped = line.strip()
|
||
|
|
if start is None and stripped == _PHASE_INDEX_HEADING:
|
||
|
|
start = i
|
||
|
|
continue
|
||
|
|
if start is not None and stripped == "## Customizing Trellis (for forks)":
|
||
|
|
end = i
|
||
|
|
break
|
||
|
|
|
||
|
|
if start is None:
|
||
|
|
return ""
|
||
|
|
if end is None:
|
||
|
|
end = len(lines)
|
||
|
|
|
||
|
|
section = "\n".join(lines[start:end]).rstrip()
|
||
|
|
# Strip [workflow-state:STATUS]...[/workflow-state:STATUS] blocks since
|
||
|
|
# they're injected separately by inject-workflow-state.py per-turn.
|
||
|
|
import re as _re
|
||
|
|
tag_re = _re.compile(
|
||
|
|
r"\[workflow-state:([A-Za-z0-9_-]+)\]\s*\n.*?\n\s*\[/workflow-state:\1\]\n?",
|
||
|
|
_re.DOTALL,
|
||
|
|
)
|
||
|
|
return tag_re.sub("", section).rstrip() + "\n"
|
||
|
|
|
||
|
|
|
||
|
|
def get_step(step_id: str) -> str:
|
||
|
|
"""Return the `#### X.X` section matching step_id (header + body).
|
||
|
|
|
||
|
|
Body ends at the next `####` or `---` or `##` heading (whichever comes first).
|
||
|
|
"""
|
||
|
|
text = _read_workflow()
|
||
|
|
lines = text.splitlines()
|
||
|
|
|
||
|
|
start: int | None = None
|
||
|
|
for i, line in enumerate(lines):
|
||
|
|
m = _STEP_HEADING_RE.match(line)
|
||
|
|
if m and m.group(1) == step_id:
|
||
|
|
start = i
|
||
|
|
break
|
||
|
|
if start is None:
|
||
|
|
return ""
|
||
|
|
|
||
|
|
end: int = len(lines)
|
||
|
|
for j in range(start + 1, len(lines)):
|
||
|
|
line = lines[j]
|
||
|
|
if line.startswith("#### "):
|
||
|
|
end = j
|
||
|
|
break
|
||
|
|
if line.startswith("## "):
|
||
|
|
end = j
|
||
|
|
break
|
||
|
|
# Horizontal rule at column 0
|
||
|
|
if line.strip() == "---":
|
||
|
|
end = j
|
||
|
|
break
|
||
|
|
|
||
|
|
return "\n".join(lines[start:end]).rstrip() + "\n"
|
||
|
|
|
||
|
|
|
||
|
|
def _platform_matches(platform: str, block_names: list[str]) -> bool:
|
||
|
|
"""Case-insensitive fuzzy match: accept 'cursor', 'Cursor', 'claude-code', 'Claude Code'."""
|
||
|
|
needle = platform.lower().replace("-", "").replace("_", "").replace(" ", "")
|
||
|
|
for name in block_names:
|
||
|
|
hay = name.lower().replace("-", "").replace("_", "").replace(" ", "")
|
||
|
|
if needle == hay:
|
||
|
|
return True
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
def filter_platform(content: str, platform: str) -> str:
|
||
|
|
"""Keep lines outside any `[...]` block + lines inside blocks that include platform.
|
||
|
|
|
||
|
|
Marker lines themselves are dropped from the output.
|
||
|
|
"""
|
||
|
|
lines = content.splitlines()
|
||
|
|
out: list[str] = []
|
||
|
|
|
||
|
|
in_block = False
|
||
|
|
keep_block = False
|
||
|
|
|
||
|
|
for line in lines:
|
||
|
|
marker = _parse_marker(line)
|
||
|
|
if marker is not None:
|
||
|
|
is_closing, names = marker
|
||
|
|
if not is_closing:
|
||
|
|
in_block = True
|
||
|
|
keep_block = _platform_matches(platform, names)
|
||
|
|
else:
|
||
|
|
in_block = False
|
||
|
|
keep_block = False
|
||
|
|
continue # drop the marker line itself
|
||
|
|
|
||
|
|
if in_block:
|
||
|
|
if keep_block:
|
||
|
|
out.append(line)
|
||
|
|
continue
|
||
|
|
out.append(line)
|
||
|
|
|
||
|
|
# Collapse runs of 3+ blank lines that may arise from dropped markers
|
||
|
|
collapsed: list[str] = []
|
||
|
|
blank_run = 0
|
||
|
|
for line in out:
|
||
|
|
if line.strip() == "":
|
||
|
|
blank_run += 1
|
||
|
|
if blank_run <= 2:
|
||
|
|
collapsed.append(line)
|
||
|
|
else:
|
||
|
|
blank_run = 0
|
||
|
|
collapsed.append(line)
|
||
|
|
|
||
|
|
return "\n".join(collapsed).rstrip() + "\n"
|