infinite-agents-public/infinite_variants/infinite_variant_6/state_manager.py

499 lines
16 KiB
Python

#!/usr/bin/env python3
"""
State Manager Utility for Stateful Infinite Loop
Provides programmatic interface for state management operations:
- Load and save state files
- Validate state consistency
- Update iteration records
- Track URL usage
- Compute validation hashes
Usage:
from state_manager import StateManager
# Create manager
sm = StateManager('.claude/state')
# Load state
state = sm.load_state('run_20250310_143022')
# Validate consistency
score, results = sm.validate_consistency(state)
# Update state
sm.add_iteration(state, iteration_data)
sm.save_state(state)
"""
import json
import os
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from glob import glob
class StateManager:
    """Manages state files for stateful infinite loop runs.

    Each run is stored as ``<state_dir>/<run_id>.json``. The state is a plain
    dictionary (see :meth:`create_state` for its layout) that records the
    iterations performed, the URLs consumed and the latest validation result.

    NOTE(review): timestamps are produced with ``datetime.now()`` (naive local
    time) and then suffixed with ``'Z'``. They are therefore only comparable
    with each other, not with real UTC timestamps. Left as-is so that newly
    written values stay comparable with existing state files.
    """

    def __init__(self, state_dir: str = '.claude/state'):
        """Initialize state manager.

        Args:
            state_dir: Directory containing state files. It is created
                (including parents) if it does not exist yet.
        """
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(parents=True, exist_ok=True)

    def create_run_id(self) -> str:
        """Generate new run ID based on current (local) timestamp.

        Returns:
            Run ID string (format: run_YYYYMMDD_HHMMSS)
        """
        return f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def get_state_file(self, run_id: str) -> Path:
        """Get path to state file for run ID.

        Args:
            run_id: Run identifier

        Returns:
            Path to state file
        """
        return self.state_dir / f"{run_id}.json"

    def list_runs(self) -> List[str]:
        """List all available run IDs.

        Returns:
            Sorted list of run IDs (stems of ``run_*.json`` in the state dir)
        """
        state_files = glob(str(self.state_dir / "run_*.json"))
        # glob order is filesystem-dependent; sort for a stable listing.
        return sorted(Path(f).stem for f in state_files)

    def load_state(self, run_id: str) -> Dict:
        """Load state from file.

        Args:
            run_id: Run identifier

        Returns:
            State dictionary

        Raises:
            FileNotFoundError: If state file doesn't exist
            json.JSONDecodeError: If state file is corrupted
        """
        state_file = self.get_state_file(run_id)
        with open(state_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def save_state(self, state: Dict) -> None:
        """Save state to file atomically.

        Writes to a sibling temp file and then replaces the real file, so a
        crash mid-write never leaves a truncated state file behind. Also
        refreshes ``state['updated_at']`` in place.

        Args:
            state: State dictionary to save (must contain ``run_id``)

        Raises:
            KeyError: If ``state`` has no ``run_id``
            TypeError: If ``state`` contains values json cannot serialize
            OSError: If the file cannot be written
        """
        run_id = state['run_id']
        state_file = self.get_state_file(run_id)

        # Update timestamp
        state['updated_at'] = datetime.now().isoformat() + 'Z'

        temp_file = state_file.with_suffix('.tmp')
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            # replace() overwrites an existing target on every platform;
            # rename() raises on Windows when the state file already exists.
            temp_file.replace(state_file)
        except BaseException:
            # Do not leave a partial temp file around; then re-raise.
            try:
                temp_file.unlink()
            except OSError:
                pass
            raise

    def create_state(self, spec_path: str, output_dir: str,
                     total_count: str, url_strategy_path: Optional[str] = None) -> Dict:
        """Create new state structure.

        The state is only built in memory; call :meth:`save_state` to persist.

        Args:
            spec_path: Path to specification file
            output_dir: Output directory for generated files
            total_count: Total iterations (number or "infinite")
            url_strategy_path: Optional URL strategy file

        Returns:
            New state dictionary
        """
        run_id = self.create_run_id()
        now = datetime.now().isoformat() + 'Z'
        return {
            "run_id": run_id,
            "spec_path": spec_path,
            "output_dir": output_dir,
            "total_count": total_count,
            "url_strategy_path": url_strategy_path,
            "status": "in_progress",
            "created_at": now,
            "updated_at": now,
            "completed_iterations": 0,
            "failed_iterations": 0,
            "iterations": [],
            "used_urls": [],
            "validation": {
                "last_check": now,
                "consistency_score": 1.0,
                "issues": [],
            },
        }

    def add_iteration(self, state: Dict, iteration_data: Dict) -> None:
        """Add iteration record to state (mutates ``state`` in place).

        Args:
            state: State dictionary to update
            iteration_data: Iteration data to add; must contain ``status`` and,
                for completed iterations, ``number``

        Raises:
            KeyError: If ``iteration_data`` lacks ``status`` (or ``number`` for
                a completed iteration)
        """
        state['iterations'].append(iteration_data)

        # Update counters.
        # NOTE(review): completed_iterations is tracked here as the highest
        # completed iteration *number*, while validate_consistency (Check 3)
        # and rebuild_from_files treat it as a *count* of completed records.
        # The two agree only while numbers are contiguous from 1 - confirm
        # which meaning is intended before changing either side.
        status = iteration_data['status']
        if status == 'completed':
            state['completed_iterations'] = max(
                state['completed_iterations'],
                iteration_data['number'],
            )
        elif status == 'failed':
            state['failed_iterations'] += 1

        # Add URL to used list (once)
        web_url = iteration_data.get('web_url')
        if web_url and web_url not in state['used_urls']:
            state['used_urls'].append(web_url)

    def compute_file_hash(self, file_path: str) -> str:
        """Compute SHA256 hash of file content.

        Args:
            file_path: Path to file

        Returns:
            First 16 characters of the hex SHA256 digest
        """
        sha256 = hashlib.sha256()
        with open(file_path, 'rb') as f:
            # Read in fixed-size chunks so large files are not loaded at once.
            for chunk in iter(lambda: f.read(4096), b''):
                sha256.update(chunk)
        return sha256.hexdigest()[:16]

    def validate_consistency(self, state: Dict) -> Tuple[float, List[Dict]]:
        """Validate state consistency using multiple checks.

        Runs six independent checks and reports the fraction that passed.
        Missing or malformed fields make the affected checks fail instead of
        raising, so a broken state file can still be diagnosed.

        Args:
            state: State dictionary to validate

        Returns:
            Tuple of (consistency_score, validation_results)
            - consistency_score: Float from 0.0 to 1.0
            - validation_results: List of dicts with ``name``, ``passed``
              and ``details`` keys, one per check
        """
        validations = []

        def record(name: str, passed: bool, details: str) -> None:
            validations.append({"name": name, "passed": passed, "details": details})

        # Tolerant reads: absent keys are reported by Check 1, and must not
        # abort the remaining checks.
        output_dir = state.get('output_dir')
        expected_count = state.get('completed_iterations')
        raw_iterations = state.get('iterations') or []
        used_urls = state.get('used_urls') or []
        completed_records = [
            item for item in raw_iterations
            if isinstance(item, dict) and item.get('status') == 'completed'
        ]

        # Check 1: Schema validation
        required_fields = [
            "run_id", "spec_path", "output_dir", "total_count",
            "status", "created_at", "updated_at", "completed_iterations",
            "iterations", "used_urls", "validation"
        ]
        schema_valid = all(field in state for field in required_fields)
        record(
            "Schema Validation",
            schema_valid,
            "All required fields present" if schema_valid else "Missing fields",
        )

        # Check 2: File count
        if not isinstance(output_dir, (str, os.PathLike)):
            record("File Count", False, "Output directory not recorded in state")
        elif not os.path.exists(output_dir):
            record("File Count", False,
                   f"Output directory does not exist: {output_dir}")
        elif not isinstance(expected_count, (int, float)):
            record("File Count", False,
                   f"Invalid completed_iterations: {expected_count!r}")
        else:
            file_count = len(glob(f"{output_dir}/*"))
            record("File Count", file_count >= expected_count,
                   f"Expected: >={expected_count}, Actual: {file_count}")

        # Check 3: Iteration consistency
        iteration_count = len(completed_records)
        record("Iteration Records", iteration_count == expected_count,
               f"Expected: {expected_count}, Actual: {iteration_count}")

        # Check 4: URL uniqueness
        total_urls = len(used_urls)
        unique_urls = len(set(used_urls))
        record("URL Uniqueness", total_urls == unique_urls,
               f"Total: {total_urls}, Unique: {unique_urls}")

        # Check 5: File existence (a record without output_file counts as missing)
        missing_files = []
        for iteration in completed_records:
            output_file = iteration.get('output_file')
            if not output_file or not os.path.exists(output_file):
                missing_files.append(output_file)
        existence_check = not missing_files
        record(
            "File Existence",
            existence_check,
            "All files exist" if existence_check
            else f"Missing: {len(missing_files)} files",
        )

        # Check 6: Timestamp validity
        try:
            created = datetime.fromisoformat(state['created_at'].replace('Z', '+00:00'))
            updated = datetime.fromisoformat(state['updated_at'].replace('Z', '+00:00'))
            timestamp_check = updated >= created
        except (KeyError, AttributeError, TypeError, ValueError) as e:
            # KeyError: field absent; AttributeError: not a string;
            # ValueError: unparsable; TypeError: naive vs. aware comparison.
            record("Timestamp Validity", False, f"Invalid timestamp format: {e}")
        else:
            record(
                "Timestamp Validity",
                timestamp_check,
                "Valid chronology" if timestamp_check else "Updated before created",
            )

        # Consistency score = share of checks that passed
        consistency_score = sum(v["passed"] for v in validations) / len(validations)
        return consistency_score, validations

    def update_validation(self, state: Dict) -> None:
        """Update validation metadata in state.

        Re-runs :meth:`validate_consistency` and overwrites
        ``state['validation']`` with the score and the details of every
        failed check.

        Args:
            state: State dictionary to update
        """
        consistency_score, validations = self.validate_consistency(state)
        state['validation'] = {
            "last_check": datetime.now().isoformat() + 'Z',
            "consistency_score": consistency_score,
            "issues": [v["details"] for v in validations if not v["passed"]],
        }

    def rebuild_from_files(self, run_id: str, spec_path: str,
                           output_dir: str, total_count: str) -> Dict:
        """Rebuild state from output directory files.

        Every regular file directly inside ``output_dir`` becomes one
        completed iteration. The iteration number is taken from a trailing
        ``_<digits>.<ext>`` in the file name (falling back to the running
        position), and the source URL from a JSON ``web_source`` entry inside
        a ``<div id="metadata">`` found in the first 5000 characters.

        Args:
            run_id: Run identifier to use
            spec_path: Specification file path
            output_dir: Output directory to scan
            total_count: Total iteration count

        Returns:
            Rebuilt state dictionary (status ``paused``, not yet saved)
        """
        import re  # local: the module-level import block does not provide it

        number_pattern = re.compile(r'_(\d+)\.[^.]+$')
        metadata_pattern = re.compile(
            r'<div id="metadata"[^>]*>(.*?)</div>',
            re.DOTALL
        )

        # Scan output directory; subdirectories cannot be hashed, skip them.
        output_files = sorted(
            path for path in glob(f"{output_dir}/*") if os.path.isfile(path)
        )

        iterations = []
        used_urls = set()
        for file_path in output_files:
            # Extract iteration number from filename
            filename = os.path.basename(file_path)
            match = number_pattern.search(filename)
            iteration_num = int(match.group(1)) if match else len(iterations) + 1

            # Compute file hash
            file_hash = self.compute_file_hash(file_path)

            # Best-effort metadata extraction: any unreadable or malformed
            # file simply keeps the "unknown" placeholder.
            web_url = "unknown"
            try:
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    content = f.read(5000)
            except OSError:
                content = ""
            metadata_match = metadata_pattern.search(content)
            if metadata_match:
                try:
                    metadata = json.loads(metadata_match.group(1).strip())
                except ValueError:  # includes json.JSONDecodeError
                    metadata = None
                source = metadata.get('web_source') if isinstance(metadata, dict) else None
                if isinstance(source, str) and source:
                    web_url = source
                    # The placeholder is not a real URL; keep it out of used_urls.
                    if source != "unknown":
                        used_urls.add(source)

            # File modification time stands in for both start and completion.
            mtime = os.path.getmtime(file_path)
            completed_at = datetime.fromtimestamp(mtime).isoformat() + 'Z'

            iterations.append({
                "number": iteration_num,
                "status": "completed",
                "output_file": file_path,
                "web_url": web_url,
                "started_at": completed_at,
                "completed_at": completed_at,
                "validation_hash": file_hash,
                "metadata": {"rebuilt": True}
            })

        # Sort by iteration number
        iterations.sort(key=lambda x: x['number'])

        # Create state
        now = datetime.now().isoformat() + 'Z'
        return {
            "run_id": run_id,
            "spec_path": spec_path,
            "output_dir": output_dir,
            "total_count": total_count,
            "url_strategy_path": None,
            "status": "paused",
            "created_at": iterations[0]['completed_at'] if iterations else now,
            "updated_at": iterations[-1]['completed_at'] if iterations else now,
            "completed_iterations": len(iterations),
            "failed_iterations": 0,
            "iterations": iterations,
            # sorted() gives a deterministic order for the saved file
            "used_urls": sorted(used_urls),
            "validation": {
                "last_check": now,
                "consistency_score": 1.0,
                "issues": [],
                "rebuilt": True
            }
        }

    def get_next_iteration(self, state: Dict) -> int:
        """Get next iteration number to generate.

        Args:
            state: State dictionary

        Returns:
            ``completed_iterations`` plus one
        """
        return state['completed_iterations'] + 1

    def is_url_used(self, state: Dict, url: str) -> bool:
        """Check if URL has been used.

        Args:
            state: State dictionary
            url: URL to check

        Returns:
            True if URL already used
        """
        return url in state['used_urls']

    def get_available_urls(self, state: Dict, url_strategy: Dict) -> List[str]:
        """Get list of available (unused) URLs from strategy.

        Args:
            state: State dictionary
            url_strategy: URL strategy dictionary; each value is iterated as
                a collection of URLs (the keys are not used)

        Returns:
            List of available URLs, in strategy order
        """
        available = []
        for urls in url_strategy.values():
            for url in urls:
                if not self.is_url_used(state, url):
                    available.append(url)
        return available
def _main(argv: List[str]) -> int:
    """Run the small command-line front end.

    Supported commands: ``list``, ``validate <run_id>`` and ``info <run_id>``.
    Unknown run IDs and corrupted state files are reported as a one-line
    error instead of a traceback.

    Args:
        argv: Full argument vector, including the program name at index 0

    Returns:
        Process exit code (0 on success, 1 on usage or load errors)
    """
    import sys

    sm = StateManager()

    def load_or_report(run_id: str) -> Optional[Dict]:
        # Returns None (after printing the reason) when the state is unusable.
        try:
            return sm.load_state(run_id)
        except FileNotFoundError:
            print(f"Error: no state file for run: {run_id}", file=sys.stderr)
        except json.JSONDecodeError as exc:
            print(f"Error: state file for {run_id} is corrupted: {exc}", file=sys.stderr)
        return None

    if len(argv) < 2:
        print("Usage: python state_manager.py <command> [args]")
        print("Commands:")
        print(" list - List all runs")
        print(" validate <run_id> - Validate state consistency")
        print(" info <run_id> - Show run information")
        return 1

    command = argv[1]

    if command == "list":
        runs = sm.list_runs()
        print(f"Available runs: {len(runs)}")
        for run_id in runs:
            # One unreadable file must not hide the remaining runs.
            try:
                state = sm.load_state(run_id)
                summary = f"{state['status']} - {state['completed_iterations']} iterations"
            except (OSError, ValueError, KeyError, TypeError) as exc:
                summary = f"unreadable state ({exc})"
            print(f" {run_id}: {summary}")
        return 0

    if command == "validate":
        if len(argv) < 3:
            print("Usage: python state_manager.py validate <run_id>")
            return 1
        state = load_or_report(argv[2])
        if state is None:
            return 1
        score, validations = sm.validate_consistency(state)
        print(f"Consistency Score: {score:.2f} ({score * 100:.0f}%)")
        print("\nValidation Results:")
        for v in validations:
            status = "✓ PASS" if v["passed"] else "✗ FAIL"
            print(f"{status}: {v['name']}")
            print(f" {v['details']}")
        return 0

    if command == "info":
        if len(argv) < 3:
            print("Usage: python state_manager.py info <run_id>")
            return 1
        state = load_or_report(argv[2])
        if state is None:
            return 1
        print(f"Run ID: {state['run_id']}")
        print(f"Status: {state['status']}")
        print(f"Spec: {state['spec_path']}")
        print(f"Output: {state['output_dir']}")
        print(f"Progress: {state['completed_iterations']} of {state['total_count']}")
        print(f"URLs Used: {len(state['used_urls'])}")
        print(f"Created: {state['created_at']}")
        print(f"Updated: {state['updated_at']}")
        return 0

    print(f"Unknown command: {command}")
    return 1


if __name__ == "__main__":
    import sys

    sys.exit(_main(sys.argv))