p2pwiki-ai/src/api.py

393 lines
10 KiB
Python

"""FastAPI backend for P2P Wiki AI system."""
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from .config import settings
from .embeddings import WikiVectorStore
from .rag import WikiRAG, RAGResponse
from .ingress import IngressPipeline, get_review_queue, approve_item, reject_item
from .mediawiki import wiki_client
# Global service instances shared by the endpoints below. They start as None
# and are assigned by `lifespan` at startup only when a populated vector store
# directory is found; most endpoints check for None before using them.
vector_store: Optional[WikiVectorStore] = None
rag_system: Optional[WikiRAG] = None
ingress_pipeline: Optional[IngressPipeline] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's lifetime.

    At startup the vector store, RAG system and ingress pipeline are created
    when the Chroma persistence directory exists and contains at least one
    entry. Otherwise a warning is printed and the module-level instances are
    left untouched.
    """
    global vector_store, rag_system, ingress_pipeline

    print("Initializing P2P Wiki AI system...")

    persist_dir = settings.chroma_persist_dir
    # A missing or empty directory means the index has not been built yet.
    index_present = persist_dir.exists() and any(persist_dir.iterdir())

    if index_present:
        vector_store = WikiVectorStore()
        rag_system = WikiRAG(vector_store)
        ingress_pipeline = IngressPipeline(vector_store)
        print(f"Loaded vector store with {vector_store.get_stats()['total_chunks']} chunks")
    else:
        print("WARNING: Vector store not initialized. Run 'python -m src.parser' and 'python -m src.embeddings' first.")

    yield

    print("Shutting down...")
# The ASGI application. `lifespan` is registered here so the service
# instances are initialised when the server starts.
app = FastAPI(
    title="P2P Wiki AI",
    description="AI-augmented system for P2P Foundation Wiki - chat agent and ingress pipeline",
    version="0.1.0",
    lifespan=lifespan,
)
# CORS middleware
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled. The FastAPI CORS documentation advises listing explicit origins
# when allow_credentials=True -- confirm and tighten before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Request/Response Models ---
class ChatRequest(BaseModel):
    """Body of a POST /chat request."""

    # The user's question, forwarded to the RAG system as-is.
    query: str
    # Passed to the RAG system as `n_results`.
    n_results: int = 5
    # Optional category filter; None means no filter is passed on.
    filter_categories: Optional[list[str]] = None
class ChatResponse(BaseModel):
    """Body returned by POST /chat."""

    # Answer text produced by the RAG system.
    answer: str
    # Source entries reported by the RAG system alongside the answer.
    sources: list[dict]
    # The query as reported back by the RAG system.
    query: str
class IngressRequest(BaseModel):
    """Body of a POST /ingress request."""

    # Address of the external article; validated as an HTTP(S) URL by pydantic.
    url: HttpUrl
class IngressResponse(BaseModel):
    """Body returned by POST /ingress."""

    # Short state label and a human-readable explanation.
    status: str
    message: str
    # Result details; they keep their defaults when processing is deferred.
    scraped_title: Optional[str] = None
    topics_found: int = 0
    wiki_matches: int = 0
    drafts_generated: int = 0
    queue_file: Optional[str] = None
class ReviewActionRequest(BaseModel):
    """Body of a POST /review/action request."""

    # Forwarded unchanged to approve_item / reject_item.
    filepath: str
    item_type: str  # "match" or "draft"
    item_index: int
    action: str  # "approve" or "reject"
class DraftApproveRequest(BaseModel):
    """Body of a POST /wiki/approve request naming the draft to approve."""

    title: str  # e.g., "Draft:Article_Name" or just "Article_Name"
# --- API Endpoints ---
@app.get("/")
async def root():
    """Return the service name, version, status and vector store readiness."""
    store_ready = vector_store is not None
    service_info = {
        "name": "P2P Wiki AI",
        "version": "0.1.0",
        "status": "running",
        "vector_store_ready": store_ready,
    }
    return service_info
@app.get("/health")
async def health():
    """Report liveness together with whether the vector store is loaded."""
    store_ready = vector_store is not None
    return {"status": "healthy", "vector_store_ready": store_ready}
@app.get("/stats")
async def stats():
    """Return vector store statistics and the review queue size.

    When the vector store is unavailable an ``{"error": ...}`` payload is
    returned instead of raising.
    """
    if vector_store:
        store_stats = vector_store.get_stats()
        pending_reviews = len(get_review_queue())
        return {
            "vector_store": store_stats,
            "review_queue_count": pending_reviews,
        }
    return {"error": "Vector store not initialized"}
# --- Chat Endpoints ---
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Answer a query through the RAG system.

    Raises:
        HTTPException: 503 when the RAG system has not been initialised.
    """
    if not rag_system:
        raise HTTPException(
            status_code=503,
            detail="RAG system not initialized. Run indexing first.",
        )

    rag_result = await rag_system.ask(
        query=request.query,
        n_results=request.n_results,
        filter_categories=request.filter_categories,
    )

    # Copy the fields onto the API response model.
    reply = ChatResponse(
        answer=rag_result.answer,
        sources=rag_result.sources,
        query=rag_result.query,
    )
    return reply
@app.post("/chat/clear")
async def clear_chat():
    """Clear the RAG system's chat history, if a RAG system is loaded."""
    outcome = {"status": "cleared"}
    # The same payload is returned whether or not a RAG system exists.
    if rag_system:
        rag_system.clear_history()
    return outcome
@app.get("/chat/suggestions")
async def chat_suggestions(q: str = ""):
    """Return suggestions from the RAG system for the partial input ``q``.

    An empty list is returned when ``q`` is empty or no RAG system is loaded.
    """
    if rag_system and q:
        return {"suggestions": rag_system.get_suggestions(q)}
    return {"suggestions": []}
# --- Ingress Endpoints ---
async def process_ingress_background(url: str):
    """Run the ingress pipeline for ``url`` as a background task.

    Any exception is caught and printed rather than propagated, since the
    HTTP response has already been sent by the time this runs.
    """
    try:
        await ingress_pipeline.process(url)
    except Exception as exc:
        print(f"Ingress error for {url}: {exc}")
@app.post("/ingress", response_model=IngressResponse)
async def ingress(request: IngressRequest, background_tasks: BackgroundTasks):
    """
    Queue an external article URL for the ingress pipeline.

    The response is sent immediately; the actual work happens in a
    background task. Results show up under the /review endpoint.

    Raises:
        HTTPException: 503 when the ingress pipeline has not been initialised.
    """
    if ingress_pipeline:
        # Deferred processing keeps the request short (Cloudflare timeout).
        target_url = str(request.url)
        background_tasks.add_task(process_ingress_background, target_url)

        return IngressResponse(
            status="processing",
            message="Article submitted for processing. Check the Review tab for results.",
            scraped_title=None,
            topics_found=0,
            wiki_matches=0,
            drafts_generated=0,
            queue_file=None,
        )

    raise HTTPException(
        status_code=503,
        detail="Ingress pipeline not initialized. Run indexing first.",
    )
# --- Review Queue Endpoints ---
@app.get("/review")
async def get_review_items():
    """Return every review queue item together with the item count."""
    queue = get_review_queue()
    total = len(queue)
    return {"count": total, "items": queue}
@app.get("/review/{filename}")
async def get_review_item(filename: str):
    """Return the parsed JSON content of one review queue file.

    Args:
        filename: Bare name of a file inside ``settings.review_queue_dir``.

    Returns:
        The decoded JSON document stored in the file.

    Raises:
        HTTPException: 404 when ``filename`` is not a plain file name or no
            such file exists; 500 when the file does not contain valid JSON.
    """
    import json

    # Accept a bare file name only: a path separator or "."/".." would make
    # the lookup resolve outside of (or to) the queue directory itself.
    if filename in {"", ".", ".."} or Path(filename).name != filename:
        raise HTTPException(status_code=404, detail="Review item not found")

    filepath = settings.review_queue_dir / filename
    # is_file() rather than exists(): a directory must be a 404, not an
    # unhandled error from open().
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="Review item not found")

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as exc:
        raise HTTPException(
            status_code=500,
            detail="Review item is not valid JSON",
        ) from exc
@app.post("/review/action")
async def review_action(request: ReviewActionRequest):
    """Apply an approve or reject action to one review queue item.

    Raises:
        HTTPException: 400 for an unknown action; 500 when the underlying
            approve/reject call reports failure.
    """
    # Map each supported action name to the function that performs it.
    handlers = {"approve": approve_item, "reject": reject_item}
    handler = handlers.get(request.action)
    if handler is None:
        raise HTTPException(status_code=400, detail="Invalid action")

    succeeded = handler(request.filepath, request.item_type, request.item_index)
    if not succeeded:
        raise HTTPException(status_code=500, detail="Action failed")
    return {"status": "success", "action": request.action}
# --- Search Endpoints ---
@app.get("/search")
async def search(q: str, n: int = 10, categories: Optional[str] = None):
    """Query the vector store directly.

    Args:
        q: Search text.
        n: Passed to the vector store as ``n_results``.
        categories: Optional comma-separated category names.

    Raises:
        HTTPException: 503 when the vector store has not been initialised.
    """
    if not vector_store:
        raise HTTPException(status_code=503, detail="Vector store not initialized")

    # An absent or empty value means no category filter at all.
    if categories:
        category_filter = categories.split(",")
    else:
        category_filter = None

    hits = vector_store.search(q, n_results=n, filter_categories=category_filter)
    return {"query": q, "count": len(hits), "results": hits}
@app.get("/articles")
async def list_articles(limit: int = 100, offset: int = 0):
    """Return one page of article titles from the vector store.

    Args:
        limit: Maximum number of titles to return; must be >= 0.
        offset: Index of the first title to return; must be >= 0.

    Returns:
        A dict with the total title count, the echoed ``limit``/``offset``
        and the requested slice of titles.

    Raises:
        HTTPException: 503 when the vector store has not been initialised;
            400 when ``limit`` or ``offset`` is negative.
    """
    if not vector_store:
        raise HTTPException(status_code=503, detail="Vector store not initialized")

    # Negative numbers would be treated as from-the-end slice indices and
    # silently return the wrong page, so reject them explicitly.
    if limit < 0 or offset < 0:
        raise HTTPException(
            status_code=400,
            detail="limit and offset must be non-negative",
        )

    titles = vector_store.get_article_titles()
    return {
        "total": len(titles),
        "limit": limit,
        "offset": offset,
        "titles": titles[offset : offset + limit],
    }
# --- Wiki Draft Management Endpoints ---
@app.get("/wiki/auth")
async def wiki_auth_status():
    """Return the wiki client's authentication info.

    If the check itself raises, the failure is reported as an
    unauthenticated result carrying the error text.
    """
    try:
        return await wiki_client.check_auth()
    except Exception as exc:
        failure = {"authenticated": False, "error": str(exc)}
        return failure
@app.get("/wiki/drafts")
async def list_wiki_drafts():
    """Return the draft articles reported by the wiki client, with a count.

    Raises:
        HTTPException: 500 carrying the error text when listing fails.
    """
    try:
        pending = await wiki_client.list_draft_articles()
        payload = {"count": len(pending), "drafts": pending}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return payload
@app.post("/wiki/approve")
async def approve_wiki_draft(request: DraftApproveRequest):
    """
    Approve a draft article through the wiki client.

    The caller's wiki session must be authenticated and hold the 'move'
    permission (supplied via wiki cookies).

    Raises:
        HTTPException: 401 when not authenticated; 403 when the move
            permission is missing; 400 when the wiki client reports an error.
    """
    # Verify the wiki session before attempting the approval.
    auth = await wiki_client.check_auth()

    if not auth.get("authenticated"):
        raise HTTPException(
            status_code=401,
            detail="Not authenticated. Please ensure wiki cookies are set up.",
        )

    if not auth.get("can_move"):
        current_user = auth.get("username")
        raise HTTPException(
            status_code=403,
            detail=f"Move permission required. Current user: {current_user}",
        )

    outcome = await wiki_client.approve_draft(request.title)
    if "error" not in outcome:
        return outcome
    raise HTTPException(status_code=400, detail=outcome["error"])
# --- Static Files (Web UI) ---
# Web UI assets live in a "web" directory two levels up from this file.
web_dir = Path(__file__).parent.parent / "web"
if web_dir.exists():
    # Serve the assets under /static only when the directory is present.
    static_files = StaticFiles(directory=str(web_dir))
    app.mount("/static", static_files, name="static")
@app.get("/ui")
async def ui():
    """Return the web UI's index.html.

    Raises:
        HTTPException: 404 when index.html is missing from the web directory.
    """
    index_path = web_dir / "index.html"
    if not index_path.exists():
        raise HTTPException(status_code=404, detail="Web UI not found")
    return FileResponse(index_path)
def main():
    """Start the API server with uvicorn using the configured host and port."""
    import uvicorn

    # Auto-reload is always on for this entry point.
    server_options = {
        "host": settings.host,
        "port": settings.port,
        "reload": True,
    }
    uvicorn.run("src.api:app", **server_options)


if __name__ == "__main__":
    main()