"""Governance DAG construction and querying.""" from __future__ import annotations from dataclasses import dataclass, field from .parser import GovernanceDoc from .validator import detect_cycles @dataclass class DAGNode: doc: GovernanceDoc children: list[str] = field(default_factory=list) parents: list[str] = field(default_factory=list) @dataclass class GovernanceDAG: nodes: dict[str, DAGNode] = field(default_factory=dict) def add_doc(self, doc: GovernanceDoc) -> None: """Add a document to the DAG.""" if doc.doc_id not in self.nodes: self.nodes[doc.doc_id] = DAGNode(doc=doc) else: self.nodes[doc.doc_id].doc = doc # Wire edges for dep_id in doc.depends_on: if dep_id not in self.nodes: # Placeholder — will be resolved when dep is added self.nodes[dep_id] = DAGNode( doc=GovernanceDoc(doc_id=dep_id, doc_kind="unknown") ) self.nodes[dep_id].children.append(doc.doc_id) self.nodes[doc.doc_id].parents.append(dep_id) def validate(self) -> list[str]: """Validate the DAG. Returns list of errors (empty = valid).""" errors = [] # Check for cycles adj = { node_id: node.parents for node_id, node in self.nodes.items() } cycles = detect_cycles(adj) for cycle in cycles: errors.append(f"Cycle detected: {' → '.join(cycle)}") # Check for unresolved placeholders for node_id, node in self.nodes.items(): if node.doc.doc_kind == "unknown": errors.append(f"Unresolved dependency: {node_id}") return errors def roots(self) -> list[str]: """Return doc_ids with no dependencies (DAG roots).""" return [ nid for nid, node in self.nodes.items() if not node.parents ] def topological_order(self) -> list[str]: """Return doc_ids in topological order (dependencies first).""" visited: set[str] = set() order: list[str] = [] def visit(node_id: str) -> None: if node_id in visited: return visited.add(node_id) for parent in self.nodes[node_id].parents: visit(parent) order.append(node_id) for nid in self.nodes: visit(nid) return order def to_dict(self) -> dict: """Serialize DAG to a JSON-friendly dict.""" return { "nodes": [ { "doc_id": nid, "doc_kind": node.doc.doc_kind, "title": node.doc.title, "status": node.doc.status, "parents": node.parents, "children": node.children, } for nid, node in self.nodes.items() ], "roots": self.roots(), "topological_order": self.topological_order(), } def build_dag(docs: list[GovernanceDoc]) -> GovernanceDAG: """Build a GovernanceDAG from a list of parsed documents.""" dag = GovernanceDAG() for doc in docs: dag.add_doc(doc) return dag