273 lines
9.0 KiB
Python
273 lines
9.0 KiB
Python
"""Blog HTML parser - extracts blog posts from cached WordPress HTML files."""
|
|
|
|
import json
|
|
import re
|
|
import zipfile
|
|
from dataclasses import dataclass, field, asdict
|
|
from pathlib import Path
|
|
from typing import Iterator, Optional
|
|
from bs4 import BeautifulSoup
|
|
from rich.progress import Progress
|
|
from rich.console import Console
|
|
|
|
from .config import settings
|
|
|
|
# Shared Rich console for all user-facing status output in this module.
console = Console()
|
|
|
|
|
|
@dataclass
class BlogPost:
    """Represents a single blog post parsed from cached WordPress HTML.

    IDs are assigned by the caller (``parse_blog_zip`` starts at 100000
    to avoid colliding with wiki article IDs).
    """

    id: int  # Synthetic numeric ID assigned during parsing
    title: str  # Post title with the " | P2P Foundation" suffix stripped
    content: str  # Raw HTML of the post's <article> element
    plain_text: str  # Cleaned plain text of the article, used for embedding
    categories: list[str] = field(default_factory=list)  # Category + tag names
    links: list[str] = field(default_factory=list)  # Links into blog./wiki.p2pfoundation.net
    external_links: list[str] = field(default_factory=list)  # Other absolute http(s) links
    timestamp: str = ""  # Value of article:published_time meta tag, if present
    contributor: str = ""  # Author from the author meta tag or schema.org graph
    url: str = ""  # Canonical post URL reconstructed from the slug
    description: str = ""  # Meta description, if present

    def to_dict(self) -> dict:
        """Return the post as a plain dict (recursively, via dataclasses.asdict)."""
        return asdict(self)
|
|
|
|
|
|
def clean_html_text(soup_element) -> str:
    """Return the plain text of *soup_element* with whitespace normalized.

    Runs of three or more newlines collapse to a single blank line, and
    runs of multiple spaces collapse to one space. A None/empty element
    yields "".
    """
    if not soup_element:
        return ""

    # Newline separator keeps block boundaries visible in the plain text.
    raw = soup_element.get_text(separator="\n", strip=True)

    collapsed_newlines = re.sub(r"\n{3,}", "\n\n", raw)
    collapsed_spaces = re.sub(r" {2,}", " ", collapsed_newlines)

    return collapsed_spaces.strip()
|
|
|
|
|
|
def extract_links(soup_element) -> tuple[list[str], list[str]]:
    """Extract internal and external links from an HTML element.

    Internal links point at blog.p2pfoundation.net or
    wiki.p2pfoundation.net; any other absolute http(s) URL is external.
    Relative links are ignored.

    Args:
        soup_element: BeautifulSoup element to scan; None yields empty lists.

    Returns:
        (internal, external) lists, deduplicated with first-seen order
        preserved. (The previous ``list(set(...))`` dedupe produced an
        arbitrary order, which made output files non-reproducible.)
    """
    internal: list[str] = []
    external: list[str] = []

    if not soup_element:
        return internal, external

    for a in soup_element.find_all("a", href=True):
        href = a["href"]
        if "blog.p2pfoundation.net" in href or "wiki.p2pfoundation.net" in href:
            internal.append(href)
        elif href.startswith("http"):
            external.append(href)

    # dict.fromkeys dedupes while preserving insertion order (unlike set()).
    return list(dict.fromkeys(internal)), list(dict.fromkeys(external))
|
|
|
|
|
|
def parse_blog_html(html_content: str, post_id: int, slug: str) -> Optional[BlogPost]:
    """Parse one cached WordPress post page into a BlogPost.

    Args:
        html_content: Full HTML of the cached page.
        post_id: Numeric ID to assign to the resulting post.
        slug: URL slug of the post; used for the title fallback and URL.

    Returns:
        A BlogPost, or None when the page has no <article> element or its
        plain text is shorter than 100 characters (treated as not a real
        post).
    """
    soup = BeautifulSoup(html_content, "html.parser")

    # Title: <title> tag with the site suffix stripped; fall back to a
    # title-cased version of the slug when the tag is missing.
    title_tag = soup.find("title")
    title = title_tag.text.replace(" | P2P Foundation", "").strip() if title_tag else slug.replace("-", " ").title()

    # Meta description (empty string when absent or empty).
    desc_meta = soup.find("meta", attrs={"name": "description"})
    description = desc_meta["content"] if desc_meta and desc_meta.get("content") else ""

    # Published time from the Open Graph article meta tag.
    pub_meta = soup.find("meta", attrs={"property": "article:published_time"})
    timestamp = pub_meta["content"] if pub_meta and pub_meta.get("content") else ""

    # Author: prefer the <meta name="author"> tag; otherwise fall back to
    # the first Person node in the Yoast schema.org JSON-LD graph.
    author = ""
    author_meta = soup.find("meta", attrs={"name": "author"})
    if author_meta and author_meta.get("content"):
        author = author_meta["content"]
    else:
        schema = soup.find("script", class_="yoast-schema-graph")
        if schema:
            try:
                # schema.string may be None, which json.loads rejects with
                # TypeError — both failure modes are swallowed below.
                schema_data = json.loads(schema.string)
                for item in schema_data.get("@graph", []):
                    if item.get("@type") == "Person":
                        author = item.get("name", "")
                        break
            except (json.JSONDecodeError, TypeError):
                pass

    # The post body lives in the first <article> element; without one this
    # page is not a post.
    article = soup.find("article")
    if not article:
        return None

    # Keep both the raw HTML and a cleaned plain-text rendering.
    content = str(article)
    plain_text = clean_html_text(article)

    # Skip pages that are too short to be a real post.
    if len(plain_text) < 100:
        return None

    internal_links, external_links = extract_links(article)

    # Categories: rel="category tag" anchors first, then plain rel="tag"
    # anchors, deduplicated while preserving order.
    categories = []
    cat_links = soup.find_all("a", rel="category tag")
    for cat in cat_links:
        categories.append(cat.text.strip())

    tag_links = soup.find_all("a", rel="tag")
    for tag in tag_links:
        if tag.text.strip() not in categories:
            categories.append(tag.text.strip())

    return BlogPost(
        id=post_id,
        title=title,
        content=content,
        plain_text=plain_text,
        categories=categories,
        links=internal_links,
        external_links=external_links,
        timestamp=timestamp,
        contributor=author,
        url=f"https://blog.p2pfoundation.net/{slug}/",
        description=description,
    )
|
|
|
|
|
|
def parse_blog_zip(zip_path: Path, output_path: Optional[Path] = None) -> list[BlogPost]:
    """Parse all blog posts out of a WordPress page-cache zip archive.

    Args:
        zip_path: Zip file containing the wp-content page_enhanced cache.
        output_path: Optional JSON file to also write the parsed posts to.

    Returns:
        List of successfully parsed BlogPost objects.
    """
    console.print(f"[cyan]Parsing blog posts from {zip_path}...[/cyan]")

    posts = []
    seen_slugs = set()
    post_id = 100000  # Start high to avoid conflicts with wiki article IDs

    # Only top-level per-slug cached pages (_index_ssl.html) match; feeds,
    # embeds and date archives live at deeper paths and are excluded.
    post_pattern = re.compile(
        r"blog\.p2pfoundation\.net/public_html/wp-content/cache/page_enhanced/blog\.p2pfoundation\.net/([^/]+)/_index_ssl\.html$"
    )

    with zipfile.ZipFile(zip_path, "r") as zf:
        # Collect candidate post pages, excluding feed/embed variants.
        html_files = [
            name for name in zf.namelist()
            if post_pattern.search(name) and "/feed/" not in name and "/embed/" not in name
        ]

        console.print(f"[green]Found {len(html_files)} blog post files[/green]")

        with Progress() as progress:
            task = progress.add_task("[cyan]Parsing blog posts...", total=len(html_files))

            for filepath in html_files:
                match = post_pattern.search(filepath)
                if not match:
                    progress.advance(task)
                    continue

                slug = match.group(1)

                # Each slug is processed once even if cached multiple times.
                if slug in seen_slugs:
                    progress.advance(task)
                    continue

                # Heuristic: skip archive/listing/system pages by slug
                # prefix. NOTE(review): this also skips any real post whose
                # slug happens to start with e.g. "page" or "tag" — confirm
                # that is acceptable.
                skip_patterns = ["page", "category", "tag", "author", "feed", "wp-", "uploads"]
                if any(slug.startswith(p) for p in skip_patterns):
                    progress.advance(task)
                    continue

                seen_slugs.add(slug)

                try:
                    with zf.open(filepath) as f:
                        # errors="replace": cached pages occasionally hold
                        # bytes that are not valid UTF-8.
                        html_content = f.read().decode("utf-8", errors="replace")

                    post = parse_blog_html(html_content, post_id, slug)
                    if post:
                        posts.append(post)
                        post_id += 1
                except Exception as e:
                    # Best-effort: one bad page should not abort the run.
                    console.print(f"[yellow]Warning: Could not parse {slug}: {e}[/yellow]")

                progress.advance(task)

    console.print(f"[green]Parsed {len(posts)} blog posts[/green]")

    if output_path:
        console.print(f"[cyan]Saving to {output_path}...[/cyan]")
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump([p.to_dict() for p in posts], f, ensure_ascii=False, indent=2)
        console.print(f"[green]Saved {len(posts)} blog posts to {output_path}[/green]")

    return posts
|
|
|
|
|
|
def merge_with_wiki_articles(blog_posts: list[BlogPost], wiki_articles_path: Path, output_path: Path):
    """Append blog posts, converted to wiki-article dict form, to the wiki set.

    Loads the existing wiki articles JSON, converts each BlogPost into the
    same dict shape used by wiki articles (title prefixed with "[Blog] "),
    and writes the combined list to *output_path*.
    """
    console.print(f"[cyan]Loading existing wiki articles from {wiki_articles_path}...[/cyan]")

    with open(wiki_articles_path, "r", encoding="utf-8") as src:
        wiki_articles = json.load(src)

    console.print(f"[green]Loaded {len(wiki_articles)} wiki articles[/green]")

    # Blog posts adopt the wiki-article dict shape; the title prefix marks
    # their origin so they remain distinguishable after the merge.
    wiki_articles.extend(
        {
            "id": post.id,
            "title": f"[Blog] {post.title}",
            "content": post.content,
            "plain_text": post.plain_text,
            "categories": post.categories,
            "links": post.links,
            "external_links": post.external_links,
            "timestamp": post.timestamp,
            "contributor": post.contributor,
        }
        for post in blog_posts
    )

    console.print(f"[cyan]Saving merged articles to {output_path}...[/cyan]")
    with open(output_path, "w", encoding="utf-8") as dst:
        json.dump(wiki_articles, dst, ensure_ascii=False, indent=2)

    console.print(f"[green]Saved {len(wiki_articles)} total articles[/green]")
|
|
|
|
|
|
def main(blog_zip: Optional[Path] = None):
    """CLI entry point: parse cached blog posts and merge with wiki articles.

    Args:
        blog_zip: Path to the WordPress cache zip. Defaults to the
            previously hard-coded download location for backward
            compatibility; pass a path to point at a different archive.
    """
    # The zip location used to be a hard-coded constant; keep it as the
    # default but allow callers to override it.
    if blog_zip is None:
        blog_zip = Path("/mnt/c/Users/jeffe/Downloads/blog.p2pfoundation.net.zip")

    if not blog_zip.exists():
        console.print(f"[red]Blog zip not found at {blog_zip}[/red]")
        return

    # Parse blog posts and persist them to their own JSON file.
    blog_output = settings.data_dir / "blog_posts.json"
    posts = parse_blog_zip(blog_zip, blog_output)

    # Merge with the wiki articles when that dataset exists.
    wiki_articles = settings.data_dir / "articles.json"
    if wiki_articles.exists():
        merged_output = settings.data_dir / "articles_with_blog.json"
        merge_with_wiki_articles(posts, wiki_articles, merged_output)
        console.print(f"[green]Merged articles saved to {merged_output}[/green]")
        console.print("[yellow]To use merged articles, rename articles_with_blog.json to articles.json and re-run embeddings[/yellow]")
|
|
|
|
|
|
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
|