Add Kuma Alert Agent — monitors Uptime Kuma for outages, diagnoses via Claude CLI, and executes fixes with email-based approval workflow

Polls the Kuma API every 60s and alerts jeff@jeffemmett.com from claude@jeffemmett.com
with a diagnosis and proposed fix. Supports an APPROVE/REJECT/custom-reply flow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
commit 54aac7b70b
Jeff Emmett, 2026-03-16 01:52:01 +00:00
6 changed files with 676 additions and 0 deletions

.env.example (new file, +31)

@@ -0,0 +1,31 @@
# Uptime Kuma
KUMA_URL=http://uptime-kuma:3001
KUMA_USERNAME=admin
KUMA_PASSWORD=changeme

# IMAP (reading approval replies)
IMAP_HOST=mail.rmail.online
IMAP_PORT=993
IMAP_USER=claude@jeffemmett.com
IMAP_PASS=changeme

# SMTP (sending alerts)
SMTP_HOST=mail.rmail.online
SMTP_PORT=587
SMTP_USER=claude@jeffemmett.com
SMTP_PASS=changeme
SMTP_FROM=claude@jeffemmett.com

# Alert routing
ALERT_TO=jeff@jeffemmett.com
ALLOWED_APPROVERS=jeff@jeffemmett.com

# Tuning
CHECK_INTERVAL=60
ALERT_THRESHOLD=3
MAX_BUDGET_DIAGNOSE=0.50
MAX_BUDGET_FIX=2.00

# Claude CLI
CLAUDE_CONTAINER=claude-dev
CLAUDE_WORKDIR=/opt/apps
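
Copy this template to .env and fill in real credentials before starting the agent; docker-compose.yml loads it via env_file, and the .gitignore below keeps the populated .env out of the repo.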

.gitignore (vendored, new file, +3)

@@ -0,0 +1,3 @@
.env
__pycache__/
*.pyc

Dockerfile (new file, +28)

@@ -0,0 +1,28 @@
FROM python:3.12-slim

# Install docker CLI for exec into claude-dev
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl gnupg && \
    install -m 0755 -d /etc/apt/keyrings && \
    curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \
    chmod a+r /etc/apt/keyrings/docker.asc && \
    echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian bookworm stable" > /etc/apt/sources.list.d/docker.list && \
    apt-get update && \
    apt-get install -y --no-install-recommends docker-ce-cli && \
    apt-get purge -y curl gnupg && \
    apt-get autoremove -y && \
    rm -rf /var/lib/apt/lists/*

# Agent user with docker group access
RUN groupadd -g 990 docker && \
    useradd -r -u 1003 -g 990 agent

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY agent.py .
RUN mkdir -p /data && chown agent:docker /data

USER agent
CMD ["python", "-u", "agent.py"]

agent.py (new file, +583)

@@ -0,0 +1,583 @@
"""
Kuma Alert Agent monitors Uptime Kuma for service outages,
diagnoses via Claude CLI, proposes fixes with email-based approval flow.
Flow: Poll Kuma -> Detect DOWN -> Diagnose (Claude read-only) -> Email alert
-> Wait for approval reply -> Execute fix (Claude auto-accept) -> Report
"""
import email
import email.utils
import imaplib
import json
import logging
import os
import re
import smtplib
import subprocess
import time
import uuid
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

from uptime_kuma_api import UptimeKumaApi

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("/data/agent.log"),
    ],
)
log = logging.getLogger("kuma-alert-agent")

# ─── Configuration ─────────────────────────────────────────────────────

KUMA_URL = os.environ["KUMA_URL"]
KUMA_USERNAME = os.environ["KUMA_USERNAME"]
KUMA_PASSWORD = os.environ["KUMA_PASSWORD"]

IMAP_HOST = os.environ["IMAP_HOST"]
IMAP_PORT = int(os.environ.get("IMAP_PORT", "993"))
IMAP_USER = os.environ["IMAP_USER"]
IMAP_PASS = os.environ["IMAP_PASS"]

SMTP_HOST = os.environ["SMTP_HOST"]
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ["SMTP_USER"]
SMTP_PASS = os.environ["SMTP_PASS"]
SMTP_FROM = os.environ.get("SMTP_FROM", SMTP_USER)

ALERT_TO = os.environ.get("ALERT_TO", "jeff@jeffemmett.com")
ALLOWED_APPROVERS = [
    s.strip().lower()
    for s in os.environ.get("ALLOWED_APPROVERS", "jeff@jeffemmett.com").split(",")
    if s.strip()
]

CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "60"))
ALERT_THRESHOLD = int(os.environ.get("ALERT_THRESHOLD", "3"))  # consecutive checks
MAX_BUDGET_DIAGNOSE = os.environ.get("MAX_BUDGET_DIAGNOSE", "0.50")
MAX_BUDGET_FIX = os.environ.get("MAX_BUDGET_FIX", "2.00")

CLAUDE_CONTAINER = os.environ.get("CLAUDE_CONTAINER", "claude-dev")
CLAUDE_WORKDIR = os.environ.get("CLAUDE_WORKDIR", "/opt/apps")

DATA_DIR = Path("/data")
INCIDENTS_FILE = DATA_DIR / "incidents.json"
AUDIT_LOG = DATA_DIR / "audit.json"
STATE_FILE = DATA_DIR / "state.json"

# Runtime state
_down_counts: dict[int, int] = {}
_processed_replies: set[str] = set()

# ─── Persistence ───────────────────────────────────────────────────────

def load_json(path: Path) -> dict:
    if path.exists():
        try:
            return json.loads(path.read_text())
        except json.JSONDecodeError:
            return {}
    return {}


def save_json(path: Path, data) -> None:
    path.write_text(json.dumps(data, indent=2))


def audit(entry: dict) -> None:
    entries = []
    if AUDIT_LOG.exists():
        try:
            entries = json.loads(AUDIT_LOG.read_text())
        except json.JSONDecodeError:
            entries = []
    entry["timestamp"] = datetime.now(timezone.utc).isoformat()
    entries.append(entry)
    if len(entries) > 500:
        entries = entries[-500:]
    save_json(AUDIT_LOG, entries)
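
# A hypothetical audit entry, for illustration (the shape matches the audit()
# calls below; "id" is a truncated uuid4, "timestamp" is added here):
#   {"action": "incident_created", "id": "3f9c1a7b",
#    "monitor": "uptime-kuma", "timestamp": "2026-03-16T01:52:01+00:00"}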

# ─── Uptime Kuma API ──────────────────────────────────────────────────

def get_monitor_statuses() -> dict:
    """Connect to Kuma, return {monitor_id: info_dict}."""
    api = UptimeKumaApi(KUMA_URL)
    try:
        api.login(KUMA_USERNAME, KUMA_PASSWORD)
        monitors = api.get_monitors()
        heartbeats = api.get_heartbeats()
        results = {}
        for monitor in monitors:
            mid = monitor["id"]
            beats = heartbeats.get(mid, [])
            latest = beats[-1] if beats else None
            results[mid] = {
                "id": mid,
                "name": monitor.get("name", f"Monitor {mid}"),
                "url": monitor.get("url", ""),
                "type": monitor.get("type", ""),
                "active": monitor.get("active", True),
                "status": latest["status"] if latest else None,
                "status_msg": latest.get("msg", "") if latest else "",
                "last_check": latest.get("time", "") if latest else "",
            }
        return results
    except Exception as e:
        log.error("Kuma API error: %s", e)
        return {}
    finally:
        try:
            api.disconnect()
        except Exception:
            pass

# ─── Claude CLI ────────────────────────────────────────────────────────

DIAGNOSE_PROMPT_TMPL = """SERVICE ALERT: "{name}" is DOWN
URL: {url}
Monitor type: {type}
Error: {status_msg}
Last check: {last_check}
Diagnose this outage. Check Docker container status, logs, and system resources.
Identify the root cause and propose a specific fix."""

DIAGNOSE_SYSTEM = """You are a server ops agent diagnosing a service outage on a Netcup RS 8000 running 40+ Docker services behind Traefik.
RULES:
1. Run diagnostic commands: docker ps, docker logs, docker inspect, docker stats, df, free, curl
2. Do NOT run any destructive or modifying commands (no restart, stop, rm, compose, edit)
3. ONLY diagnose; do not fix anything
End your response with EXACTLY this format (each on its own line):
CONTAINER: <container_name or UNKNOWN>
DIAGNOSIS: <one-line summary of the problem>
PROPOSED_FIX: <exact commands to run, separated by semicolons>
RISK: <LOW|MEDIUM|HIGH>"""

FIX_SYSTEM = """You are a server ops agent executing an APPROVED fix on a Netcup RS 8000.
The server owner has explicitly approved this fix via email.
Execute the fix, verify the service recovers, and report results.
End your response with:
RESULT: <SUCCESS|PARTIAL|FAILED>
SUMMARY: <what happened>"""


def run_claude(prompt: str, system: str, budget: str, timeout: int = 300) -> str:
    cmd = [
        "docker", "exec", "-w", CLAUDE_WORKDIR, CLAUDE_CONTAINER,
        "claude", "-p", prompt,
        "--output-format", "text",
        "--max-budget-usd", budget,
        "--permission-mode", "auto-accept",
        "--append-system-prompt", system,
    ]
    log.info("Running Claude CLI (budget=$%s)", budget)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        if result.returncode != 0:
            log.error("Claude error (rc=%d): %s", result.returncode, result.stderr[:500])
            return f"[Claude error: exit code {result.returncode}]\n{result.stderr[:500]}"
        return result.stdout.strip() or "[Empty response]"
    except subprocess.TimeoutExpired:
        return f"[Claude timed out after {timeout}s]"
    except Exception as e:
        return f"[Error: {e}]"


def parse_diagnosis(response: str) -> dict:
    result = {
        "container": "UNKNOWN",
        "diagnosis": "",
        "proposed_fix": "",
        "risk": "UNKNOWN",
        "full_response": response,
    }
    for line in response.split("\n"):
        line = line.strip()
        if line.startswith("CONTAINER:"):
            result["container"] = line.split(":", 1)[1].strip()
        elif line.startswith("DIAGNOSIS:"):
            result["diagnosis"] = line.split(":", 1)[1].strip()
        elif line.startswith("PROPOSED_FIX:"):
            result["proposed_fix"] = line.split(":", 1)[1].strip()
        elif line.startswith("RISK:"):
            result["risk"] = line.split(":", 1)[1].strip()
    return result
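
# For illustration, a hypothetical footer from a diagnosis response and what
# parse_diagnosis() extracts from it:
#   CONTAINER: uptime-kuma
#   DIAGNOSIS: container exited after an OOM kill
#   PROPOSED_FIX: docker start uptime-kuma
#   RISK: LOW
# -> {"container": "uptime-kuma", "diagnosis": "container exited after an OOM kill",
#     "proposed_fix": "docker start uptime-kuma", "risk": "LOW", "full_response": ...}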

# ─── Email ─────────────────────────────────────────────────────────────

def send_email(to: str, subject: str, body: str, message_id: str | None = None,
               in_reply_to: str | None = None, references: str | None = None) -> str | None:
    msg = MIMEMultipart("alternative")
    msg["From"] = f"Kuma Alert Agent <{SMTP_FROM}>"
    msg["To"] = to
    msg["Subject"] = subject
    if message_id:
        msg["Message-ID"] = message_id
    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
    if references:
        msg["References"] = references
    msg.attach(MIMEText(body, "plain"))
    try:
        server = smtplib.SMTP(SMTP_HOST, SMTP_PORT)
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.sendmail(SMTP_USER, [to], msg.as_string())
        server.quit()
        log.info("Email sent to %s: %s", to, subject)
        return message_id
    except Exception as e:
        log.error("SMTP error: %s", e)
        return None


def extract_plain_text(msg: email.message.Message) -> str:
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                payload = part.get_payload(decode=True)
                if payload:
                    return payload.decode(part.get_content_charset() or "utf-8", errors="replace")
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            return payload.decode(msg.get_content_charset() or "utf-8", errors="replace")
    return ""


def send_alert_email(monitor_info: dict, diagnosis: dict, incident_id: str) -> str | None:
    subject = f"[KUMA] {monitor_info['name']} is DOWN (ID: {incident_id})"
    message_id = f"<kuma-{incident_id}@jeffemmett.com>"
    body = f"""SERVICE DOWN: {monitor_info['name']}
URL: {monitor_info.get('url', 'N/A')}
Down since: {monitor_info.get('last_check', 'N/A')}
Error: {monitor_info.get('status_msg', 'N/A')}
{'=' * 55}
DIAGNOSIS
{'=' * 55}
Container: {diagnosis.get('container', 'UNKNOWN')}
Issue: {diagnosis.get('diagnosis', 'See analysis below')}
Risk: {diagnosis.get('risk', 'UNKNOWN')}
{'=' * 55}
PROPOSED FIX
{'=' * 55}
{diagnosis.get('proposed_fix', 'No fix proposed')}
{'=' * 55}
FULL ANALYSIS
{'=' * 55}
{diagnosis.get('full_response', 'N/A')[:3000]}
{'=' * 55}
Reply APPROVE to execute the proposed fix.
Reply REJECT to dismiss this alert.
Reply with custom instructions to override the fix.
--
Kuma Alert Agent (claude@jeffemmett.com)
"""
    return send_email(ALERT_TO, subject, body, message_id=message_id)


def send_recovery_email(monitor_name: str, incident_id: str, was_fixed: bool) -> None:
    how = "after automated fix" if was_fixed else "on its own (no fix needed)"
    subject = f"[KUMA] {monitor_name} is BACK UP (ID: {incident_id})"
    body = f"""SERVICE RECOVERED: {monitor_name}
Recovered {how}.
Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}
--
Kuma Alert Agent (claude@jeffemmett.com)
"""
    send_email(ALERT_TO, subject, body)


def send_fix_result(incident: dict, result: str) -> None:
    iid = incident["id"]
    alert_msg = f"<kuma-{iid}@jeffemmett.com>"
    subject = f"Re: [KUMA] {incident['monitor_name']} is DOWN (ID: {iid})"
    body = f"""FIX EXECUTED: {incident['monitor_name']}
{'=' * 55}
RESULT
{'=' * 55}
{result[:5000]}
{'=' * 55}
Monitoring for recovery. If the issue persists, a new alert will follow.
--
Kuma Alert Agent (claude@jeffemmett.com)
"""
    send_email(ALERT_TO, subject, body, in_reply_to=alert_msg, references=alert_msg)

# ─── Approval Detection ───────────────────────────────────────────────

def check_for_approvals(incidents: dict) -> list[dict]:
    """Search IMAP for replies to pending alert emails."""
    pending = {k: v for k, v in incidents.items() if v.get("status") == "alerted"}
    if not pending:
        return []
    approvals = []
    try:
        mail = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
        mail.login(IMAP_USER, IMAP_PASS)
        mail.select("INBOX")
        since = (datetime.now() - timedelta(days=7)).strftime("%d-%b-%Y")
        for approver in ALLOWED_APPROVERS:
            status, data = mail.search(None, f'(FROM "{approver}" SUBJECT "[KUMA]" SINCE {since})')
            if status != "OK" or not data[0]:
                continue
            for msg_num in data[0].split():
                status, msg_data = mail.fetch(msg_num, "(BODY.PEEK[])")
                if status != "OK":
                    continue
                msg = email.message_from_bytes(msg_data[0][1])
                reply_id = msg.get("Message-ID", "")
                if reply_id in _processed_replies:
                    continue
                in_reply_to = msg.get("In-Reply-To", "")
                refs = msg.get("References", "")
                for mid, inc in pending.items():
                    expected = f"<kuma-{inc['id']}@jeffemmett.com>"
                    if expected in in_reply_to or expected in refs:
                        body = extract_plain_text(msg)
                        approvals.append({
                            "incident_id": inc["id"],
                            "monitor_id": mid,
                            "body": body,
                            "sender": approver,
                            "reply_id": reply_id,
                        })
                        _processed_replies.add(reply_id)
                        break
        mail.logout()
    except Exception as e:
        log.error("IMAP approval check error: %s", e)
    return approvals


def parse_approval(body: str) -> tuple[str, str]:
    """Returns (action, custom_instructions). action: approve|reject|custom"""
    lines = [l for l in body.split("\n")
             if not l.strip().startswith(">") and not l.strip().startswith("On ")]
    clean = "\n".join(lines).strip()
    first_line = ""
    for line in clean.split("\n"):
        line = line.strip()
        if line and not line.startswith("--"):
            first_line = line
            break
    fl = first_line.lower()
    if any(kw in fl for kw in ["approve", "yes", "go ahead", "fix it", "do it", "proceed"]):
        return "approve", ""
    if any(kw in fl for kw in ["reject", "no", "dismiss", "ignore", "skip"]):
        return "reject", ""
    return "custom", clean

# ─── Incident Lifecycle ───────────────────────────────────────────────

def create_incident(monitor_info: dict, incidents: dict) -> None:
    iid = str(uuid.uuid4())[:8]
    mid = str(monitor_info["id"])
    log.info("NEW INCIDENT %s: '%s' is DOWN", iid, monitor_info["name"])
    # Diagnose
    prompt = DIAGNOSE_PROMPT_TMPL.format(**monitor_info)
    raw = run_claude(prompt, DIAGNOSE_SYSTEM, MAX_BUDGET_DIAGNOSE, timeout=180)
    diag = parse_diagnosis(raw)
    log.info("Diagnosis: %s | Fix: %s | Risk: %s",
             diag["diagnosis"][:80], diag["proposed_fix"][:80], diag["risk"])
    # Alert
    msg_id = send_alert_email(monitor_info, diag, iid)
    incidents[mid] = {
        "id": iid,
        "monitor_id": mid,
        "monitor_name": monitor_info["name"],
        "monitor_url": monitor_info.get("url", ""),
        "status": "alerted",
        "container": diag.get("container", "UNKNOWN"),
        "diagnosis": diag.get("diagnosis", ""),
        "proposed_fix": diag.get("proposed_fix", ""),
        "risk": diag.get("risk", "UNKNOWN"),
        "full_diagnosis": diag.get("full_response", "")[:5000],
        "alert_msg_id": msg_id,
        "created": datetime.now(timezone.utc).isoformat(),
        "updated": datetime.now(timezone.utc).isoformat(),
    }
    save_json(INCIDENTS_FILE, incidents)
    audit({"action": "incident_created", "id": iid, "monitor": monitor_info["name"]})


def handle_approval(approval: dict, incidents: dict) -> None:
    mid = approval["monitor_id"]
    incident = incidents.get(mid)
    if not incident:
        return
    action, custom = parse_approval(approval["body"])
    log.info("APPROVAL for %s: action=%s", incident["id"], action)
    if action == "approve":
        incident["status"] = "executing"
        incident["updated"] = datetime.now(timezone.utc).isoformat()
        save_json(INCIDENTS_FILE, incidents)
        result = run_claude(
            f'Execute this APPROVED fix for "{incident["monitor_name"]}":\n\n'
            f'{incident["proposed_fix"]}\n\n'
            f'Container: {incident.get("container", "unknown")}\n'
            f'Original diagnosis: {incident["diagnosis"]}',
            FIX_SYSTEM, MAX_BUDGET_FIX, timeout=300,
        )
        send_fix_result(incident, result)
        incident["status"] = "fix_executed"
        incident["fix_result"] = result[:5000]
    elif action == "reject":
        incident["status"] = "rejected"
        log.info("Incident %s rejected", incident["id"])
    elif action == "custom":
        incident["status"] = "executing"
        incident["proposed_fix"] = custom
        incident["updated"] = datetime.now(timezone.utc).isoformat()
        save_json(INCIDENTS_FILE, incidents)
        result = run_claude(
            f'Execute these CUSTOM instructions for "{incident["monitor_name"]}":\n\n'
            f'{custom}\n\n'
            f'Original diagnosis: {incident["diagnosis"]}',
            FIX_SYSTEM, MAX_BUDGET_FIX, timeout=300,
        )
        send_fix_result(incident, result)
        incident["status"] = "fix_executed"
        incident["fix_result"] = result[:5000]
    incident["updated"] = datetime.now(timezone.utc).isoformat()
    save_json(INCIDENTS_FILE, incidents)
    audit({"action": f"incident_{action}", "id": incident["id"],
           "monitor": incident["monitor_name"], "by": approval["sender"]})


def handle_recovery(mid: str, monitor_info: dict, incidents: dict) -> None:
    incident = incidents.get(mid)
    if not incident:
        return
    was_fixed = incident.get("status") in ("fix_executed", "executing")
    log.info("RECOVERED: '%s' (incident %s)", monitor_info["name"], incident["id"])
    send_recovery_email(monitor_info["name"], incident["id"], was_fixed)
    audit({"action": "resolved", "id": incident["id"],
           "monitor": monitor_info["name"], "was_fixed": was_fixed})
    del incidents[mid]
    save_json(INCIDENTS_FILE, incidents)
    _down_counts.pop(int(mid), None)

# ─── Main Loop ─────────────────────────────────────────────────────────

def main() -> None:
    log.info("Kuma Alert Agent starting")
    log.info("Kuma: %s | Alerts to: %s | Approvers: %s",
             KUMA_URL, ALERT_TO, ALLOWED_APPROVERS)
    log.info("Check interval: %ds | Alert after: %d consecutive downs",
             CHECK_INTERVAL, ALERT_THRESHOLD)
    incidents = load_json(INCIDENTS_FILE)
    state = load_json(STATE_FILE)
    _processed_replies.update(state.get("processed_replies", []))
    cycle = 0
    while True:
        try:
            # ── Poll monitors ──
            monitors = get_monitor_statuses()
            if monitors:
                for mid_int, info in monitors.items():
                    if not info.get("active", True):
                        continue
                    mid = str(mid_int)
                    status = info.get("status")
                    if status == 0:  # DOWN
                        _down_counts[mid_int] = _down_counts.get(mid_int, 0) + 1
                        if mid not in incidents and _down_counts[mid_int] >= ALERT_THRESHOLD:
                            create_incident(info, incidents)
                    elif status == 1:  # UP
                        _down_counts.pop(mid_int, None)
                        if mid in incidents:
                            handle_recovery(mid, info, incidents)
            else:
                log.warning("No monitor data from Kuma (API unreachable?)")
            # ── Check approvals ──
            approvals = check_for_approvals(incidents)
            for approval in approvals:
                handle_approval(approval, incidents)
            # ── Maintenance (every ~60 cycles) ──
            cycle += 1
            if cycle % 60 == 0:
                now = datetime.now(timezone.utc)
                stale = [mid for mid, inc in incidents.items()
                         if (now - datetime.fromisoformat(inc["updated"])).total_seconds() > 86400
                         and inc["status"] in ("rejected", "fix_executed")]
                for mid in stale:
                    log.info("Cleaning stale incident %s", incidents[mid]["id"])
                    del incidents[mid]
                if stale:
                    save_json(INCIDENTS_FILE, incidents)
                if len(_processed_replies) > 1000:
                    _processed_replies.clear()
                save_json(STATE_FILE, {"processed_replies": list(_processed_replies)[-500:]})
        except Exception as e:
            log.error("Loop error: %s", e, exc_info=True)
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()

docker-compose.yml (new file, +30)

@@ -0,0 +1,30 @@
services:
  kuma-alert-agent:
    build: .
    container_name: kuma-alert-agent
    restart: always
    env_file: .env
    security_opt:
      - no-new-privileges:true
    cap_drop:
      - ALL
    volumes:
      # Docker socket for exec into claude-dev
      - /var/run/docker.sock:/var/run/docker.sock:ro
      # Persistent data (incidents, audit, state)
      - agent-data:/data
    networks:
      # Reach uptime-kuma container on port 3001
      - traefik-public
      # Reach mail server (mail.rmail.online)
      - mailcow-network

volumes:
  agent-data:

networks:
  traefik-public:
    external: true
  mailcow-network:
    external: true
    name: mailcowdockerized_mailcow-network

requirements.txt (new file, +1)

@@ -0,0 +1 @@
uptime-kuma-api>=1.2.0
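
To deploy (standard Compose workflow, assumed rather than documented in this commit): copy .env.example to .env, fill in real credentials, then run docker compose up -d --build. Incidents, the audit trail, and processed-reply state persist across restarts in the agent-data volume.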