"""Erowid experience-report scraper (erowid-bot/app/scraper/experiences.py)."""

import asyncio
import re
import logging
from datetime import datetime, timezone
import httpx
from bs4 import BeautifulSoup
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import async_session
from app.models import Experience
logger = logging.getLogger(__name__)
BASE_URL = "https://erowid.org"
EXP_LIST_URL = "https://erowid.org/experiences/exp_list.shtml"
REPORT_URL = "https://erowid.org/experiences/exp.php?ID={id}"
HEADERS = {
"User-Agent": "ErowidResearchBot/1.0 (educational research project)",
"Accept": "text/html,application/xhtml+xml",
}
# Suffixes identifying category sub-pages (e.g. exp_LSD_General.shtml).
# These list subsets of the reports already on the main substance page,
# so they are skipped to avoid redundant fetches. Hoisted to module level
# so the tuple is not rebuilt on every anchor in the loop below.
_CATEGORY_SUFFIXES = (
    "_General", "_First_Times", "_Combinations", "_Retrospective",
    "_Preparation", "_Difficult_Experiences", "_Bad_Trips",
    "_Health_Problems", "_Train_Wrecks", "_Glowing_Experiences",
    "_Mystical_Experiences", "_Health_Benefits", "_What_Was_in_That",
    "_Medical_Use", "_Performance_Enhancement", "_Addiction",
)


async def get_all_substance_pages(client: httpx.AsyncClient) -> list[dict]:
    """Get main substance experience listing pages from the master index.

    Only fetches top-level substance pages (e.g. exp_LSD.shtml), not
    category sub-pages (e.g. exp_LSD_General.shtml) since the main page
    already contains all report IDs for that substance.

    Args:
        client: Shared HTTP client used for the request.

    Returns:
        A list of ``{"name": ..., "url": ...}`` dicts, one per substance,
        deduplicated by URL.

    Raises:
        httpx.HTTPStatusError: If the master index request fails.
    """
    resp = await client.get(EXP_LIST_URL, headers=HEADERS, timeout=60)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "lxml")

    pages: list[dict] = []
    seen_urls: set[str] = set()
    for a in soup.select("a[href]"):
        href = a.get("href", "")
        name = a.get_text(strip=True)
        if not href.startswith("subs/exp_") or not href.endswith(".shtml") or not name:
            continue
        # Extract the base filename from the URL,
        # e.g. subs/exp_LSD.shtml -> LSD, subs/exp_LSD_General.shtml -> LSD_General.
        filename = href.removeprefix("subs/exp_").removesuffix(".shtml")
        # str.endswith accepts a tuple, so one call covers every category suffix.
        if filename.endswith(_CATEGORY_SUFFIXES):
            continue
        full_url = f"https://erowid.org/experiences/{href}"
        if full_url not in seen_urls:
            seen_urls.add(full_url)
            pages.append({"name": name, "url": full_url})
    logger.info(
        "Found %d main substance experience pages (filtered from category sub-pages)",
        len(pages),
    )
    return pages
async def get_experience_ids_from_page(client: httpx.AsyncClient, url: str) -> list[int]:
    """Extract all experience report IDs from a substance listing page."""
    try:
        resp = await client.get(url, headers=HEADERS, timeout=30)
        resp.raise_for_status()
    except httpx.HTTPError as exc:
        # Best-effort: a failed page contributes no IDs rather than aborting.
        logger.warning(f"Failed to fetch {url}: {exc}")
        return []
    # Report links look like exp.php?ID=12345; a set comprehension dedupes.
    unique_ids = {int(raw) for raw in re.findall(r"exp\.php\?ID=(\d+)", resp.text)}
    return list(unique_ids)
async def get_all_experience_ids(client: httpx.AsyncClient) -> list[int]:
    """Collect all unique experience IDs from all substance pages.

    Fetches pages concurrently in batches of 5 for speed.
    """
    pages = await get_all_substance_pages(client)
    collected: set[int] = set()
    batch_size = 5
    for start in range(0, len(pages), batch_size):
        chunk = pages[start : start + batch_size]
        # Fetch one batch concurrently; per-page failures surface as
        # exceptions in `outcomes` and are simply skipped below.
        outcomes = await asyncio.gather(
            *(get_experience_ids_from_page(client, page["url"]) for page in chunk),
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, list):
                collected.update(outcome)
        logger.info(f"Scanned {min(start + batch_size, len(pages))}/{len(pages)} pages, {len(collected)} unique IDs")
        # Brief pause between batches to stay polite to the server.
        await asyncio.sleep(0.5)
    logger.info(f"Found {len(collected)} unique experience IDs total")
    return sorted(collected)
async def scrape_experience_report(client: httpx.AsyncClient, erowid_id: int) -> dict | None:
    """Scrape a single experience report.

    Args:
        client: Shared HTTP client used for the request.
        erowid_id: Numeric Erowid report ID.

    Returns:
        A dict of report fields matching the ``Experience`` model columns,
        or ``None`` when the fetch fails or no usable body text is found.
    """
    url = REPORT_URL.format(id=erowid_id)
    try:
        resp = await client.get(url, headers=HEADERS, timeout=30)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        logger.warning(f"Failed to fetch experience {erowid_id}: {e}")
        return None
    soup = BeautifulSoup(resp.text, "lxml")

    # Extract the main report body.
    body_div = soup.select_one("div.report-text-surround")
    if not body_div:
        logger.warning(f"No report body found for {erowid_id}")
        return None
    # Walk the direct children, skipping tables (the dosechart lives there
    # and is re-added to the body in formatted form further down).
    body_text_parts = []
    for el in body_div.children:
        if hasattr(el, "name") and el.name == "table":
            continue  # skip dosechart table
        # Tags get get_text(); bare NavigableStrings fall back to str().
        text = el.get_text(separator="\n", strip=True) if hasattr(el, "get_text") else str(el).strip()
        if text:
            body_text_parts.append(text)
    body = "\n\n".join(body_text_parts)
    # Bodies under 50 chars are almost certainly parse failures or stubs.
    if not body or len(body) < 50:
        return None

    # Metadata fields — each element is optional on the page, so every
    # lookup is guarded and defaults to "".
    title = ""
    title_el = soup.select_one("div.title")
    if title_el:
        title = title_el.get_text(strip=True)
    substance = ""
    sub_el = soup.select_one("div.substance")
    if sub_el:
        substance = sub_el.get_text(strip=True)
    # Split combined substances, e.g. "LSD & Cannabis" -> ["LSD", "Cannabis"].
    substance_list = [s.strip() for s in re.split(r"[,&]", substance) if s.strip()]
    author = ""
    author_el = soup.select_one("div.author")
    if author_el:
        # Strip only the leading "by " credit. A bare .replace("by ", "")
        # would also mangle author names containing "by " (e.g. "Toby X").
        author = author_el.get_text(strip=True).removeprefix("by ")

    # Dosage info: flatten the dosechart table into "cell cell; cell cell"
    # and prepend it to the body so it survives in the stored text.
    dose_table = soup.select_one("table.dosechart")
    dose_text = ""
    if dose_table:
        dose_parts = []
        for row in dose_table.select("tr"):
            cells = row.select("td")
            row_text = " ".join(c.get_text(strip=True) for c in cells if c.get_text(strip=True))
            if row_text:
                dose_parts.append(row_text)
        dose_text = "; ".join(dose_parts)
    if dose_text:
        body = f"Dosage: {dose_text}\n\n{body}"

    # Weight/gender/age from the body-weight table, e.g. "Male, 25 yrs".
    gender = ""
    age = ""
    weight_el = soup.select_one("table.bodyweight")
    if weight_el:
        wt = weight_el.get_text(strip=True)
        age_match = re.search(r"(\d+)\s*yr", wt, re.IGNORECASE)
        if age_match:
            age = age_match.group(1)
        # Check "male" after "female" would mis-match, so order matters:
        # "female" contains "male", hence the substring test for male runs
        # first only on purpose here — "male" in "female" is also True, so
        # a plain "male" hit may actually be "Female"; the elif preserves
        # the original precedence (Male wins when both substrings match).
        if "male" in wt.lower():
            gender = "Male"
        elif "female" in wt.lower():
            gender = "Female"

    # Optional category footer.
    category = ""
    cat_el = soup.select_one("div.foot-eroid-cat")
    if cat_el:
        category = cat_el.get_text(strip=True)

    return {
        "erowid_id": erowid_id,
        "title": title,
        "author": author,
        "substance": substance,
        "substance_list": substance_list,
        "body": body,
        "category": category,
        "gender": gender,
        "age": age,
        "url": url,
        "raw_html": resp.text,
    }
async def scrape_all_experiences(limit: int | None = None):
    """Main scraper entry point. Scrapes all experience reports into the database.

    Args:
        limit: Optional cap on how many IDs to consider; ``None`` means all.

    Returns:
        The number of newly scraped reports.
    """
    async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
        all_ids = await get_all_experience_ids(client)
        # `is not None` so an explicit limit=0 means "scrape nothing",
        # not "no limit" (a bare `if limit:` treated 0 as falsy/unlimited).
        if limit is not None:
            all_ids = all_ids[:limit]
        async with async_session() as db:
            # Skip IDs already stored so re-runs only fetch new reports.
            result = await db.execute(select(Experience.erowid_id))
            existing_ids = {row[0] for row in result.fetchall()}
            logger.info(f"Already have {len(existing_ids)} experiences in DB")
            to_scrape = [eid for eid in all_ids if eid not in existing_ids]
            logger.info(f"Need to scrape {len(to_scrape)} new experiences")
            scraped = 0
            errors = 0
            for eid in to_scrape:
                data = await scrape_experience_report(client, eid)
                if data:
                    db.add(Experience(**data))
                    scraped += 1
                    # Commit periodically so progress survives interruption.
                    if scraped % settings.scrape_batch_size == 0:
                        await db.commit()
                        logger.info(f"Committed batch: {scraped}/{len(to_scrape)} scraped ({errors} errors)")
                else:
                    errors += 1
                # Throttle between requests to stay polite to the server.
                await asyncio.sleep(settings.scrape_delay)
            await db.commit()
            logger.info(f"Done! Scraped {scraped} new experiences ({errors} errors)")
            return scraped