#!/usr/bin/env python3 """ State Manager Utility for Stateful Infinite Loop Provides programmatic interface for state management operations: - Load and save state files - Validate state consistency - Update iteration records - Track URL usage - Compute validation hashes Usage: from state_manager import StateManager # Create manager sm = StateManager('.claude/state') # Load state state = sm.load_state('run_20250310_143022') # Validate consistency score = sm.validate_consistency(state) # Update state sm.add_iteration(state, iteration_data) sm.save_state(state) """ import json import os import hashlib from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple from glob import glob class StateManager: """Manages state files for stateful infinite loop runs.""" def __init__(self, state_dir: str = '.claude/state'): """Initialize state manager. Args: state_dir: Directory containing state files """ self.state_dir = Path(state_dir) self.state_dir.mkdir(parents=True, exist_ok=True) def create_run_id(self) -> str: """Generate new run ID based on current timestamp. Returns: Run ID string (format: run_YYYYMMDD_HHMMSS) """ return f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}" def get_state_file(self, run_id: str) -> Path: """Get path to state file for run ID. Args: run_id: Run identifier Returns: Path to state file """ return self.state_dir / f"{run_id}.json" def list_runs(self) -> List[str]: """List all available run IDs. Returns: List of run IDs """ state_files = glob(str(self.state_dir / "run_*.json")) return [Path(f).stem for f in state_files] def load_state(self, run_id: str) -> Dict: """Load state from file. Args: run_id: Run identifier Returns: State dictionary Raises: FileNotFoundError: If state file doesn't exist json.JSONDecodeError: If state file is corrupted """ state_file = self.get_state_file(run_id) with open(state_file, 'r') as f: return json.load(f) def save_state(self, state: Dict) -> None: """Save state to file atomically. Uses temp file + rename for atomic write to prevent corruption. Args: state: State dictionary to save """ run_id = state['run_id'] state_file = self.get_state_file(run_id) # Update timestamp state['updated_at'] = datetime.now().isoformat() + 'Z' # Write to temp file temp_file = state_file.with_suffix('.tmp') with open(temp_file, 'w') as f: json.dump(state, f, indent=2) # Atomic rename temp_file.rename(state_file) def create_state(self, spec_path: str, output_dir: str, total_count: str, url_strategy_path: Optional[str] = None) -> Dict: """Create new state structure. Args: spec_path: Path to specification file output_dir: Output directory for generated files total_count: Total iterations (number or "infinite") url_strategy_path: Optional URL strategy file Returns: New state dictionary """ run_id = self.create_run_id() now = datetime.now().isoformat() + 'Z' return { "run_id": run_id, "spec_path": spec_path, "output_dir": output_dir, "total_count": total_count, "url_strategy_path": url_strategy_path, "status": "in_progress", "created_at": now, "updated_at": now, "completed_iterations": 0, "failed_iterations": 0, "iterations": [], "used_urls": [], "validation": { "last_check": now, "consistency_score": 1.0, "issues": [] } } def add_iteration(self, state: Dict, iteration_data: Dict) -> None: """Add iteration record to state. Args: state: State dictionary to update iteration_data: Iteration data to add """ state['iterations'].append(iteration_data) # Update counters if iteration_data['status'] == 'completed': state['completed_iterations'] = max( state['completed_iterations'], iteration_data['number'] ) elif iteration_data['status'] == 'failed': state['failed_iterations'] += 1 # Add URL to used list if 'web_url' in iteration_data and iteration_data['web_url']: if iteration_data['web_url'] not in state['used_urls']: state['used_urls'].append(iteration_data['web_url']) def compute_file_hash(self, file_path: str) -> str: """Compute SHA256 hash of file content. Args: file_path: Path to file Returns: First 16 characters of SHA256 hash """ sha256 = hashlib.sha256() with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b''): sha256.update(chunk) return sha256.hexdigest()[:16] def validate_consistency(self, state: Dict) -> Tuple[float, List[Dict]]: """Validate state consistency using multiple checks. Applies self-consistency principle with multiple independent validation approaches and majority voting. Args: state: State dictionary to validate Returns: Tuple of (consistency_score, validation_results) - consistency_score: Float from 0.0 to 1.0 - validation_results: List of validation check results """ validations = [] # Check 1: Schema validation required_fields = [ "run_id", "spec_path", "output_dir", "total_count", "status", "created_at", "updated_at", "completed_iterations", "iterations", "used_urls", "validation" ] schema_valid = all(field in state for field in required_fields) validations.append({ "name": "Schema Validation", "passed": schema_valid, "details": "All required fields present" if schema_valid else "Missing fields" }) # Check 2: File count output_dir = state['output_dir'] if os.path.exists(output_dir): file_count = len(glob(f"{output_dir}/*")) expected_count = state['completed_iterations'] file_check = file_count >= expected_count validations.append({ "name": "File Count", "passed": file_check, "details": f"Expected: >={expected_count}, Actual: {file_count}" }) else: validations.append({ "name": "File Count", "passed": False, "details": f"Output directory does not exist: {output_dir}" }) # Check 3: Iteration consistency iteration_count = len([i for i in state['iterations'] if i['status'] == 'completed']) iteration_check = iteration_count == state['completed_iterations'] validations.append({ "name": "Iteration Records", "passed": iteration_check, "details": f"Expected: {state['completed_iterations']}, Actual: {iteration_count}" }) # Check 4: URL uniqueness total_urls = len(state['used_urls']) unique_urls = len(set(state['used_urls'])) url_check = total_urls == unique_urls validations.append({ "name": "URL Uniqueness", "passed": url_check, "details": f"Total: {total_urls}, Unique: {unique_urls}" }) # Check 5: File existence missing_files = [] for iteration in state['iterations']: if iteration['status'] == 'completed': if not os.path.exists(iteration['output_file']): missing_files.append(iteration['output_file']) existence_check = len(missing_files) == 0 validations.append({ "name": "File Existence", "passed": existence_check, "details": f"All files exist" if existence_check else f"Missing: {len(missing_files)} files" }) # Check 6: Timestamp validity try: created = datetime.fromisoformat(state['created_at'].replace('Z', '+00:00')) updated = datetime.fromisoformat(state['updated_at'].replace('Z', '+00:00')) timestamp_check = updated >= created validations.append({ "name": "Timestamp Validity", "passed": timestamp_check, "details": "Valid chronology" if timestamp_check else "Updated before created" }) except Exception as e: validations.append({ "name": "Timestamp Validity", "passed": False, "details": f"Invalid timestamp format: {e}" }) # Compute consistency score via majority voting consistency_score = sum(v["passed"] for v in validations) / len(validations) return consistency_score, validations def update_validation(self, state: Dict) -> None: """Update validation metadata in state. Args: state: State dictionary to update """ consistency_score, validations = self.validate_consistency(state) issues = [v for v in validations if not v["passed"]] state['validation'] = { "last_check": datetime.now().isoformat() + 'Z', "consistency_score": consistency_score, "issues": [v["details"] for v in issues] } def rebuild_from_files(self, run_id: str, spec_path: str, output_dir: str, total_count: str) -> Dict: """Rebuild state from output directory files. Args: run_id: Run identifier to use spec_path: Specification file path output_dir: Output directory to scan total_count: Total iteration count Returns: Rebuilt state dictionary """ # Scan output directory output_files = sorted(glob(f"{output_dir}/*")) iterations = [] used_urls = set() for file_path in output_files: # Extract iteration number from filename filename = os.path.basename(file_path) import re match = re.search(r'_(\d+)\.[^.]+$', filename) iteration_num = int(match.group(1)) if match else len(iterations) + 1 # Compute file hash file_hash = self.compute_file_hash(file_path) # Try to extract metadata from file web_url = "unknown" try: with open(file_path, 'r') as f: content = f.read(5000) # Look for metadata div metadata_match = re.search( r'