jellyfin-media/services/cost-monitor/monitor.py

208 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
R2 Cost Monitor Service
Tracks storage usage and costs for the R2-based media server.
Exposes metrics for Prometheus/Grafana integration.
"""
import os
import subprocess
import json
import time
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from loguru import logger
# Configuration
R2_BUCKET = os.getenv("R2_BUCKET", "plex-media")
RCLONE_CONFIG = os.getenv("RCLONE_CONFIG", "/config/rclone/rclone.conf")
METRICS_PORT = int(os.getenv("METRICS_PORT", "9100"))
UPDATE_INTERVAL = int(os.getenv("UPDATE_INTERVAL", "3600")) # 1 hour
# R2 Pricing (as of 2024)
R2_STORAGE_PRICE_PER_GB = 0.015 # per month
R2_CLASS_A_OPS_PRICE = 4.50 / 1_000_000 # per million (write, list)
R2_CLASS_B_OPS_PRICE = 0.36 / 1_000_000 # per million (read)
# NO EGRESS FEES! This is the killer feature
# Global metrics storage
metrics = {
"total_size_bytes": 0,
"total_files": 0,
"movies_size_bytes": 0,
"movies_files": 0,
"tv_size_bytes": 0,
"tv_files": 0,
"music_size_bytes": 0,
"music_files": 0,
"estimated_monthly_cost": 0.0,
"last_updated": 0,
}
def get_folder_stats(folder: str) -> dict:
"""Get size statistics for an R2 folder."""
cmd = [
"rclone", "size",
"--config", RCLONE_CONFIG,
"--json",
f"r2:{R2_BUCKET}/{folder}"
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
data = json.loads(result.stdout)
return {
"bytes": data.get("bytes", 0),
"count": data.get("count", 0)
}
except Exception as e:
logger.error(f"Failed to get stats for {folder}: {e}")
return {"bytes": 0, "count": 0}
def update_metrics():
"""Update all metrics from R2."""
global metrics
logger.info("Updating R2 metrics...")
# Get stats for each folder
movies = get_folder_stats("movies")
tv = get_folder_stats("tv")
music = get_folder_stats("music")
# Calculate totals
total_bytes = movies["bytes"] + tv["bytes"] + music["bytes"]
total_files = movies["count"] + tv["count"] + music["count"]
# Calculate monthly cost (storage only - no egress!)
total_gb = total_bytes / (1024**3)
monthly_cost = total_gb * R2_STORAGE_PRICE_PER_GB
# Update global metrics
metrics.update({
"total_size_bytes": total_bytes,
"total_files": total_files,
"movies_size_bytes": movies["bytes"],
"movies_files": movies["count"],
"tv_size_bytes": tv["bytes"],
"tv_files": tv["count"],
"music_size_bytes": music["bytes"],
"music_files": music["count"],
"estimated_monthly_cost": monthly_cost,
"last_updated": int(datetime.now().timestamp()),
})
logger.info(
f"R2 Stats: {total_gb:.2f} GB, {total_files} files, "
f"${monthly_cost:.2f}/month estimated"
)
def metrics_to_prometheus() -> str:
"""Convert metrics to Prometheus format."""
lines = [
"# HELP r2_storage_bytes Total storage used in R2",
"# TYPE r2_storage_bytes gauge",
f'r2_storage_bytes{{bucket="{R2_BUCKET}"}} {metrics["total_size_bytes"]}',
f'r2_storage_bytes{{bucket="{R2_BUCKET}",folder="movies"}} {metrics["movies_size_bytes"]}',
f'r2_storage_bytes{{bucket="{R2_BUCKET}",folder="tv"}} {metrics["tv_size_bytes"]}',
f'r2_storage_bytes{{bucket="{R2_BUCKET}",folder="music"}} {metrics["music_size_bytes"]}',
"",
"# HELP r2_files_total Total number of files in R2",
"# TYPE r2_files_total gauge",
f'r2_files_total{{bucket="{R2_BUCKET}"}} {metrics["total_files"]}',
f'r2_files_total{{bucket="{R2_BUCKET}",folder="movies"}} {metrics["movies_files"]}',
f'r2_files_total{{bucket="{R2_BUCKET}",folder="tv"}} {metrics["tv_files"]}',
f'r2_files_total{{bucket="{R2_BUCKET}",folder="music"}} {metrics["music_files"]}',
"",
"# HELP r2_estimated_cost_monthly Estimated monthly cost in USD",
"# TYPE r2_estimated_cost_monthly gauge",
f'r2_estimated_cost_monthly{{bucket="{R2_BUCKET}"}} {metrics["estimated_monthly_cost"]:.4f}',
"",
"# HELP r2_last_updated_timestamp Last metrics update timestamp",
"# TYPE r2_last_updated_timestamp gauge",
f'r2_last_updated_timestamp{{bucket="{R2_BUCKET}"}} {metrics["last_updated"]}',
"",
]
return "\n".join(lines)
class MetricsHandler(BaseHTTPRequestHandler):
"""HTTP handler for Prometheus metrics endpoint."""
def do_GET(self):
if self.path == "/metrics":
content = metrics_to_prometheus()
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.end_headers()
self.wfile.write(content.encode())
elif self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"status": "healthy"}')
elif self.path == "/stats":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
# Add human-readable values
stats = metrics.copy()
stats["total_size_gb"] = metrics["total_size_bytes"] / (1024**3)
stats["movies_size_gb"] = metrics["movies_size_bytes"] / (1024**3)
stats["tv_size_gb"] = metrics["tv_size_bytes"] / (1024**3)
stats["music_size_gb"] = metrics["music_size_bytes"] / (1024**3)
self.wfile.write(json.dumps(stats, indent=2).encode())
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
pass # Suppress default logging
def metrics_updater():
"""Background thread to update metrics periodically."""
while True:
try:
update_metrics()
except Exception as e:
logger.exception(f"Error updating metrics: {e}")
time.sleep(UPDATE_INTERVAL)
def main():
logger.info(f"Starting R2 Cost Monitor on port {METRICS_PORT}")
logger.info(f"Monitoring bucket: {R2_BUCKET}")
logger.info(f"Update interval: {UPDATE_INTERVAL}s")
# Initial metrics update
update_metrics()
# Start background updater
updater = Thread(target=metrics_updater, daemon=True)
updater.start()
# Start HTTP server
server = HTTPServer(("0.0.0.0", METRICS_PORT), MetricsHandler)
logger.info(f"Metrics server running on http://0.0.0.0:{METRICS_PORT}/metrics")
try:
server.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down...")
server.shutdown()
if __name__ == "__main__":
main()