107 lines
3.2 KiB
Python
107 lines
3.2 KiB
Python
"""Governance DAG construction and querying."""
|
|
|
|
from __future__ import annotations
|
|
from dataclasses import dataclass, field
|
|
|
|
from .parser import GovernanceDoc
|
|
from .validator import detect_cycles
|
|
|
|
|
|
@dataclass
|
|
class DAGNode:
|
|
doc: GovernanceDoc
|
|
children: list[str] = field(default_factory=list)
|
|
parents: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class GovernanceDAG:
|
|
nodes: dict[str, DAGNode] = field(default_factory=dict)
|
|
|
|
def add_doc(self, doc: GovernanceDoc) -> None:
|
|
"""Add a document to the DAG."""
|
|
if doc.doc_id not in self.nodes:
|
|
self.nodes[doc.doc_id] = DAGNode(doc=doc)
|
|
else:
|
|
self.nodes[doc.doc_id].doc = doc
|
|
|
|
# Wire edges
|
|
for dep_id in doc.depends_on:
|
|
if dep_id not in self.nodes:
|
|
# Placeholder — will be resolved when dep is added
|
|
self.nodes[dep_id] = DAGNode(
|
|
doc=GovernanceDoc(doc_id=dep_id, doc_kind="unknown")
|
|
)
|
|
self.nodes[dep_id].children.append(doc.doc_id)
|
|
self.nodes[doc.doc_id].parents.append(dep_id)
|
|
|
|
def validate(self) -> list[str]:
|
|
"""Validate the DAG. Returns list of errors (empty = valid)."""
|
|
errors = []
|
|
|
|
# Check for cycles
|
|
adj = {
|
|
node_id: node.parents for node_id, node in self.nodes.items()
|
|
}
|
|
cycles = detect_cycles(adj)
|
|
for cycle in cycles:
|
|
errors.append(f"Cycle detected: {' → '.join(cycle)}")
|
|
|
|
# Check for unresolved placeholders
|
|
for node_id, node in self.nodes.items():
|
|
if node.doc.doc_kind == "unknown":
|
|
errors.append(f"Unresolved dependency: {node_id}")
|
|
|
|
return errors
|
|
|
|
def roots(self) -> list[str]:
|
|
"""Return doc_ids with no dependencies (DAG roots)."""
|
|
return [
|
|
nid for nid, node in self.nodes.items()
|
|
if not node.parents
|
|
]
|
|
|
|
def topological_order(self) -> list[str]:
|
|
"""Return doc_ids in topological order (dependencies first)."""
|
|
visited: set[str] = set()
|
|
order: list[str] = []
|
|
|
|
def visit(node_id: str) -> None:
|
|
if node_id in visited:
|
|
return
|
|
visited.add(node_id)
|
|
for parent in self.nodes[node_id].parents:
|
|
visit(parent)
|
|
order.append(node_id)
|
|
|
|
for nid in self.nodes:
|
|
visit(nid)
|
|
|
|
return order
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Serialize DAG to a JSON-friendly dict."""
|
|
return {
|
|
"nodes": [
|
|
{
|
|
"doc_id": nid,
|
|
"doc_kind": node.doc.doc_kind,
|
|
"title": node.doc.title,
|
|
"status": node.doc.status,
|
|
"parents": node.parents,
|
|
"children": node.children,
|
|
}
|
|
for nid, node in self.nodes.items()
|
|
],
|
|
"roots": self.roots(),
|
|
"topological_order": self.topological_order(),
|
|
}
|
|
|
|
|
|
def build_dag(docs: list[GovernanceDoc]) -> GovernanceDAG:
|
|
"""Build a GovernanceDAG from a list of parsed documents."""
|
|
dag = GovernanceDAG()
|
|
for doc in docs:
|
|
dag.add_doc(doc)
|
|
return dag
|