Files
eryao/.trellis/scripts/common/task_queue.py
T

260 lines
6.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Task queue utility functions.
Provides:
list_tasks_by_status - List tasks by status
list_pending_tasks - List tasks with pending status
list_tasks_by_assignee - List tasks by assignee
list_my_tasks - List tasks assigned to current developer
get_task_stats - Get P0/P1/P2/P3 counts
"""
from __future__ import annotations
import json
from pathlib import Path
from .paths import (
FILE_TASK_JSON,
get_repo_root,
get_developer,
get_tasks_dir,
)
def _read_json_file(path: Path) -> dict | None:
"""Read and parse a JSON file."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
# =============================================================================
# Public Functions
# =============================================================================
def list_tasks_by_status(
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks by status.
Args:
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts with keys: priority, id, title, status, assignee.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
results = []
if not tasks_dir.is_dir():
return results
for d in tasks_dir.iterdir():
if not d.is_dir() or d.name == "archive":
continue
task_json = d / FILE_TASK_JSON
if not task_json.is_file():
continue
data = _read_json_file(task_json)
if not data:
continue
task_id = data.get("id", "")
title = data.get("title") or data.get("name", "")
priority = data.get("priority", "P2")
status = data.get("status", "planning")
assignee = data.get("assignee", "-")
# Apply filter
if filter_status and status != filter_status:
continue
results.append({
"priority": priority,
"id": task_id,
"title": title,
"status": status,
"assignee": assignee,
"dir": d.name,
"children": data.get("children", []),
"parent": data.get("parent"),
})
return results
def list_pending_tasks(repo_root: Path | None = None) -> list[dict]:
"""List pending tasks.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
"""
return list_tasks_by_status("planning", repo_root)
def list_tasks_by_assignee(
assignee: str,
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks assigned to a specific developer.
Args:
assignee: Developer name.
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
results = []
if not tasks_dir.is_dir():
return results
for d in tasks_dir.iterdir():
if not d.is_dir() or d.name == "archive":
continue
task_json = d / FILE_TASK_JSON
if not task_json.is_file():
continue
data = _read_json_file(task_json)
if not data:
continue
task_assignee = data.get("assignee", "-")
# Apply assignee filter
if task_assignee != assignee:
continue
task_id = data.get("id", "")
title = data.get("title") or data.get("name", "")
priority = data.get("priority", "P2")
status = data.get("status", "planning")
# Apply status filter
if filter_status and status != filter_status:
continue
results.append({
"priority": priority,
"id": task_id,
"title": title,
"status": status,
"assignee": task_assignee,
"dir": d.name,
"children": data.get("children", []),
"parent": data.get("parent"),
})
return results
def list_my_tasks(
filter_status: str | None = None,
repo_root: Path | None = None
) -> list[dict]:
"""List tasks assigned to current developer.
Args:
filter_status: Optional status filter.
repo_root: Repository root path. Defaults to auto-detected.
Returns:
List of task info dicts.
Raises:
ValueError: If developer not set.
"""
if repo_root is None:
repo_root = get_repo_root()
developer = get_developer(repo_root)
if not developer:
raise ValueError("Developer not set")
return list_tasks_by_assignee(developer, filter_status, repo_root)
def get_task_stats(repo_root: Path | None = None) -> dict[str, int]:
"""Get task statistics.
Args:
repo_root: Repository root path. Defaults to auto-detected.
Returns:
Dict with keys: P0, P1, P2, P3, Total.
"""
if repo_root is None:
repo_root = get_repo_root()
tasks_dir = get_tasks_dir(repo_root)
stats = {"P0": 0, "P1": 0, "P2": 0, "P3": 0, "Total": 0}
if not tasks_dir.is_dir():
return stats
for d in tasks_dir.iterdir():
if not d.is_dir() or d.name == "archive":
continue
task_json = d / FILE_TASK_JSON
if not task_json.is_file():
continue
data = _read_json_file(task_json)
if not data:
continue
priority = data.get("priority", "P2")
if priority in stats:
stats[priority] += 1
stats["Total"] += 1
return stats
def format_task_stats(stats: dict[str, int]) -> str:
"""Format task stats as string.
Args:
stats: Stats dict from get_task_stats.
Returns:
Formatted string like "P0:0 P1:1 P2:2 P3:0 Total:3".
"""
return f"P0:{stats['P0']} P1:{stats['P1']} P2:{stats['P2']} P3:{stats['P3']} Total:{stats['Total']}"
# =============================================================================
# Main Entry (for testing)
# =============================================================================
if __name__ == "__main__":
stats = get_task_stats()
print(format_task_stats(stats))
print()
print("Pending tasks:")
for task in list_pending_tasks():
print(f" {task['priority']}|{task['id']}|{task['title']}|{task['status']}|{task['assignee']}")