/** * Schedule module — persistent cron-based job scheduling. * * Replaces system-level crontabs with an in-process scheduler. * Jobs are stored in Automerge (survives restarts), evaluated on * a 60-second tick loop, and can execute emails, webhooks, * calendar events, broadcasts, or backlog briefings. * * All persistence uses Automerge documents via SyncServer. */ import { Hono } from "hono"; import * as Automerge from "@automerge/automerge"; import { createTransport, type Transporter } from "nodemailer"; import { CronExpressionParser } from "cron-parser"; import { renderShell } from "../../server/shell"; import { getModuleInfoList } from "../../shared/module"; import type { RSpaceModule } from "../../shared/module"; import { renderLanding } from "./landing"; import type { SyncServer } from "../../server/local-first/sync-server"; import { scheduleSchema, scheduleDocId, MAX_LOG_ENTRIES, MAX_REMINDERS, } from "./schemas"; import type { ScheduleDoc, ScheduleJob, ExecutionLogEntry, ActionType, Reminder, } from "./schemas"; import { calendarDocId } from "../rcal/schemas"; import type { CalendarDoc, ScheduledItemMetadata } from "../rcal/schemas"; let _syncServer: SyncServer | null = null; const routes = new Hono(); // ── SMTP transport (lazy init) ── let _smtpTransport: Transporter | null = null; function getSmtpTransport(): Transporter | null { if (_smtpTransport) return _smtpTransport; if (!process.env.SMTP_PASS) return null; _smtpTransport = createTransport({ host: process.env.SMTP_HOST || "mail.rmail.online", port: Number(process.env.SMTP_PORT) || 587, secure: Number(process.env.SMTP_PORT) === 465, auth: { user: process.env.SMTP_USER || "noreply@rmail.online", pass: process.env.SMTP_PASS, }, tls: { rejectUnauthorized: false }, }); return _smtpTransport; } // ── Local-first helpers ── function ensureDoc(space: string): ScheduleDoc { const docId = scheduleDocId(space); let doc = _syncServer!.getDoc(docId); if (!doc) { doc = Automerge.change( Automerge.init(), "init schedule", (d) => { const init = scheduleSchema.init(); d.meta = init.meta; d.meta.spaceSlug = space; d.jobs = {}; d.reminders = {}; d.log = []; }, ); _syncServer!.setDoc(docId, doc); } return doc; } // ── Cron helpers ── function computeNextRun(cronExpression: string, timezone: string): number | null { try { const interval = CronExpressionParser.parse(cronExpression, { currentDate: new Date(), tz: timezone, }); return interval.next().toDate().getTime(); } catch { return null; } } function cronToHuman(expr: string): string { const parts = expr.split(/\s+/); if (parts.length !== 5) return expr; const [min, hour, dom, mon, dow] = parts; const dowNames: Record = { "0": "Sun", "1": "Mon", "2": "Tue", "3": "Wed", "4": "Thu", "5": "Fri", "6": "Sat", "7": "Sun", "1-5": "weekdays", "0,6": "weekends", }; if (min === "0" && hour !== "*" && dom === "*" && mon === "*" && dow === "*") return `Daily at ${hour}:00`; if (min === "0" && hour !== "*" && dom === "*" && mon === "*" && dow === "1-5") return `Weekdays at ${hour}:00`; if (min === "0" && hour !== "*" && dom === "*" && mon === "*" && dow !== "*") return `${dowNames[dow] || dow} at ${hour}:00`; if (min === "0" && hour !== "*" && dom !== "*" && mon === "*" && dow === "*") return `Monthly on day ${dom} at ${hour}:00`; if (min === "*" && hour === "*" && dom === "*" && mon === "*" && dow === "*") return "Every minute"; if (min.startsWith("*/")) return `Every ${min.slice(2)} minutes`; return expr; } // ── Template helpers ── function renderTemplate(template: string, vars: Record): string { let result = template; for (const [key, value] of Object.entries(vars)) { result = result.replaceAll(`{{${key}}}`, value); } return result; } // ── Action executors ── async function executeEmail( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; const config = job.actionConfig as { to?: string; subject?: string; bodyTemplate?: string; }; if (!config.to) return { success: false, message: "No recipient (to) configured" }; const vars = { date: new Date().toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }), jobName: job.name, timestamp: new Date().toISOString(), }; const subject = renderTemplate(config.subject || `[rSchedule] ${job.name}`, vars); const html = renderTemplate(config.bodyTemplate || `

Scheduled job ${job.name} executed at ${vars.date}.

`, vars); await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: config.to, subject, html, }); return { success: true, message: `Email sent to ${config.to}` }; } async function executeWebhook( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const config = job.actionConfig as { url?: string; method?: string; headers?: Record; bodyTemplate?: string; }; if (!config.url) return { success: false, message: "No webhook URL configured" }; const vars = { date: new Date().toISOString(), jobName: job.name, timestamp: new Date().toISOString(), }; const method = (config.method || "POST").toUpperCase(); const headers: Record = { "Content-Type": "application/json", ...config.headers, }; const body = method !== "GET" ? renderTemplate(config.bodyTemplate || JSON.stringify({ job: job.name, timestamp: vars.date }), vars) : undefined; const res = await fetch(config.url, { method, headers, body }); if (!res.ok) return { success: false, message: `Webhook ${res.status}: ${await res.text().catch(() => "")}` }; return { success: true, message: `Webhook ${method} ${config.url} → ${res.status}` }; } async function executeCalendarEvent( job: ScheduleJob, space: string, ): Promise<{ success: boolean; message: string }> { if (!_syncServer) return { success: false, message: "SyncServer not available" }; const config = job.actionConfig as { title?: string; duration?: number; sourceId?: string; }; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return { success: false, message: `Calendar doc not found for space ${space}` }; const eventId = crypto.randomUUID(); const now = Date.now(); const durationMs = (config.duration || 60) * 60 * 1000; _syncServer.changeDoc(calDocId, `rSchedule: create event for ${job.name}`, (d) => { d.events[eventId] = { id: eventId, title: config.title || job.name, description: `Auto-created by rSchedule job: ${job.name}`, startTime: now, endTime: now + durationMs, allDay: false, timezone: job.timezone || "UTC", rrule: null, status: null, visibility: null, sourceId: config.sourceId || null, sourceName: null, sourceType: null, sourceColor: null, locationId: null, locationName: null, coordinates: null, locationGranularity: null, locationLat: null, locationLng: null, isVirtual: false, virtualUrl: null, virtualPlatform: null, rToolSource: "rSchedule", rToolEntityId: job.id, attendees: [], attendeeCount: 0, metadata: null, createdAt: now, updatedAt: now, }; }); return { success: true, message: `Calendar event '${config.title || job.name}' created (${eventId})` }; } async function executeBroadcast( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const config = job.actionConfig as { channel?: string; message?: string; }; // Broadcast via SyncServer's WebSocket connections is not directly accessible // from module code. For now, log the intent. Future: expose ws broadcast on SyncServer. const msg = config.message || `Scheduled broadcast from ${job.name}`; console.log(`[Schedule] Broadcast (${config.channel || "default"}): ${msg}`); return { success: true, message: `Broadcast sent: ${msg}` }; } async function executeBacklogBriefing( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const config = job.actionConfig as { mode?: "morning" | "weekly" | "monthly"; scanPaths?: string[]; to?: string; }; const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; if (!config.to) return { success: false, message: "No recipient (to) configured" }; const mode = config.mode || "morning"; const scanPaths = config.scanPaths || ["/data/communities/*/backlog/tasks/"]; const now = new Date(); const dateStr = now.toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }); // Scan for backlog task files const { readdir, readFile, stat } = await import("node:fs/promises"); const { join, basename } = await import("node:path"); const { Glob } = await import("bun"); interface TaskInfo { file: string; title: string; priority: string; status: string; updatedAt: Date | null; staleDays: number; } const tasks: TaskInfo[] = []; for (const pattern of scanPaths) { try { const glob = new Glob(pattern.endsWith("/") ? pattern + "*.md" : pattern); for await (const filePath of glob.scan()) { try { const content = await readFile(filePath, "utf-8"); const fstat = await stat(filePath); // Parse YAML frontmatter const fmMatch = content.match(/^---\n([\s\S]*?)\n---/); let title = basename(filePath, ".md").replace(/-/g, " "); let priority = "medium"; let status = "open"; if (fmMatch) { const fm = fmMatch[1]; const titleMatch = fm.match(/^title:\s*(.+)$/m); const prioMatch = fm.match(/^priority:\s*(.+)$/m); const statusMatch = fm.match(/^status:\s*(.+)$/m); if (titleMatch) title = titleMatch[1].replace(/^["']|["']$/g, ""); if (prioMatch) priority = prioMatch[1].trim().toLowerCase(); if (statusMatch) status = statusMatch[1].trim().toLowerCase(); } const staleDays = Math.floor( (now.getTime() - fstat.mtime.getTime()) / (1000 * 60 * 60 * 24), ); tasks.push({ file: filePath, title, priority, status, updatedAt: fstat.mtime, staleDays }); } catch { // Skip unreadable files } } } catch { // Glob pattern didn't match or dir doesn't exist } } // Filter and sort based on mode let filtered = tasks.filter((t) => t.status !== "done" && t.status !== "closed"); let subject: string; let heading: string; switch (mode) { case "morning": // High/urgent priority + recently updated filtered = filtered .filter((t) => t.priority === "high" || t.priority === "urgent" || t.staleDays < 3) .sort((a, b) => { const priOrder: Record = { urgent: 0, high: 1, medium: 2, low: 3 }; return (priOrder[a.priority] ?? 2) - (priOrder[b.priority] ?? 2); }); subject = `Morning Briefing — ${dateStr}`; heading = "Good morning! Here's your task briefing:"; break; case "weekly": // All open tasks sorted by priority then staleness filtered.sort((a, b) => { const priOrder: Record = { urgent: 0, high: 1, medium: 2, low: 3 }; const pDiff = (priOrder[a.priority] ?? 2) - (priOrder[b.priority] ?? 2); return pDiff !== 0 ? pDiff : b.staleDays - a.staleDays; }); subject = `Weekly Backlog Review — ${dateStr}`; heading = "Weekly review of all open tasks:"; break; case "monthly": // Focus on stale items (> 14 days untouched) filtered = filtered .filter((t) => t.staleDays > 14) .sort((a, b) => b.staleDays - a.staleDays); subject = `Monthly Backlog Audit — ${dateStr}`; heading = "Monthly audit — these tasks haven't been touched in 14+ days:"; break; } // Build HTML email const taskRows = filtered.length > 0 ? filtered .slice(0, 50) .map((t) => { const prioColor: Record = { urgent: "#ef4444", high: "#f97316", medium: "#f59e0b", low: "#6b7280", }; return ` ${t.priority} ${t.title} ${t.status} ${t.staleDays}d ago `; }) .join("\n") : `No tasks match this filter.`; const html = `

${heading}

${dateStr} • ${filtered.length} task${filtered.length !== 1 ? "s" : ""}

${taskRows}
Priority Task Status Last Update

Sent by rSchedule • Manage Schedules

`; await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: config.to, subject: `[rSchedule] ${subject}`, html, }); return { success: true, message: `${mode} briefing sent to ${config.to} (${filtered.length} tasks)` }; } async function executeCalendarReminder( job: ScheduleJob, space: string, ): Promise<{ success: boolean; message: string }> { if (!_syncServer) return { success: false, message: "SyncServer not available" }; const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; const config = job.actionConfig as { to?: string }; if (!config.to) return { success: false, message: "No recipient (to) configured" }; // Load the calendar doc for this space const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return { success: false, message: `Calendar doc not found for space ${space}` }; // Find scheduled items due today that haven't been reminded yet const now = new Date(); const todayStart = new Date(now.getFullYear(), now.getMonth(), now.getDate()).getTime(); const todayEnd = todayStart + 86400000; const dueItems = Object.values(calDoc.events).filter((ev) => { const meta = ev.metadata as ScheduledItemMetadata | null; return meta?.isScheduledItem === true && !meta.reminderSent && ev.startTime >= todayStart && ev.startTime < todayEnd; }); if (dueItems.length === 0) return { success: true, message: "No scheduled items due today" }; // Render email with all due items const dateStr = now.toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }); const itemRows = dueItems.map((ev) => { const meta = ev.metadata as ScheduledItemMetadata; const preview = meta.itemPreview; const prov = meta.provenance; const thumbHtml = preview.thumbnailUrl ? `thumbnail` : ""; const canvasLink = preview.canvasUrl ? `Open in Canvas` : ""; return `
${ev.title}
${preview.textPreview}
Source: ${prov.sourceType} in ${prov.sourceSpace} ${prov.rid ? ` • RID: ${prov.rid}` : ""}
${thumbHtml}
${canvasLink}
`; }).join("\n"); const html = `

Scheduled Knowledge Reminders

${dateStr} • ${dueItems.length} item${dueItems.length !== 1 ? "s" : ""}

${itemRows}

Sent by rSchedule • View Calendar

`; await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: config.to, subject: `[rSpace] ${dueItems.length} scheduled item${dueItems.length !== 1 ? "s" : ""} for ${dateStr}`, html, }); // Mark all sent items as reminded _syncServer.changeDoc(calDocId, `mark ${dueItems.length} reminders sent`, (d) => { for (const item of dueItems) { const ev = d.events[item.id]; if (!ev) continue; const meta = ev.metadata as ScheduledItemMetadata; meta.reminderSent = true; meta.reminderSentAt = Date.now(); ev.updatedAt = Date.now(); } }); return { success: true, message: `Calendar reminder sent to ${config.to} (${dueItems.length} items)` }; } // ── Unified executor ── async function executeJob( job: ScheduleJob, space: string, ): Promise<{ success: boolean; message: string }> { switch (job.actionType) { case "email": return executeEmail(job); case "webhook": return executeWebhook(job); case "calendar-event": return executeCalendarEvent(job, space); case "broadcast": return executeBroadcast(job); case "backlog-briefing": return executeBacklogBriefing(job); case "calendar-reminder": return executeCalendarReminder(job, space); default: return { success: false, message: `Unknown action type: ${job.actionType}` }; } } // ── Tick loop ── const TICK_INTERVAL = 60_000; function startTickLoop() { console.log("[Schedule] Tick loop started — checking every 60s"); const tick = async () => { if (!_syncServer) return; const now = Date.now(); // Iterate all known schedule docs // Convention: check the "demo" space and any spaces that have schedule docs const spaceSlugs = new Set(); spaceSlugs.add("demo"); // Also scan for any schedule docs already loaded const allDocs = _syncServer.listDocs(); for (const docId of allDocs) { const match = docId.match(/^(.+):schedule:jobs$/); if (match) spaceSlugs.add(match[1]); } for (const space of spaceSlugs) { try { const docId = scheduleDocId(space); const doc = _syncServer.getDoc(docId); if (!doc) continue; const dueJobs = Object.values(doc.jobs).filter( (j) => j.enabled && j.nextRunAt && j.nextRunAt <= now, ); for (const job of dueJobs) { const startMs = Date.now(); let result: { success: boolean; message: string }; try { result = await executeJob(job, space); } catch (e: any) { result = { success: false, message: e.message || String(e) }; } const durationMs = Date.now() - startMs; const logEntry: ExecutionLogEntry = { id: crypto.randomUUID(), jobId: job.id, status: result.success ? "success" : "error", message: result.message, durationMs, timestamp: Date.now(), }; console.log( `[Schedule] ${result.success ? "OK" : "ERR"} ${job.name} (${durationMs}ms): ${result.message}`, ); // Update job state + append log _syncServer.changeDoc(docId, `run job ${job.id}`, (d) => { const j = d.jobs[job.id]; if (!j) return; j.lastRunAt = Date.now(); j.lastRunStatus = result.success ? "success" : "error"; j.lastRunMessage = result.message; j.runCount = (j.runCount || 0) + 1; j.nextRunAt = computeNextRun(j.cronExpression, j.timezone) ?? null; // Append log entry, trim to max d.log.push(logEntry); while (d.log.length > MAX_LOG_ENTRIES) { d.log.splice(0, 1); } }); } // ── Process due reminders ── const dueReminders = Object.values(doc.reminders || {}).filter( (r) => !r.notified && !r.completed && r.remindAt <= now && r.notifyEmail, ); for (const reminder of dueReminders) { try { const result = await executeReminderEmail(reminder, space); console.log( `[Schedule] Reminder ${result.success ? "OK" : "ERR"} "${reminder.title}": ${result.message}`, ); _syncServer.changeDoc(docId, `notify reminder ${reminder.id}`, (d) => { const r = d.reminders[reminder.id]; if (!r) return; r.notified = true; r.updatedAt = Date.now(); // Handle recurring reminders if (r.cronExpression) { const nextRun = computeNextRun(r.cronExpression, r.timezone); if (nextRun) { r.remindAt = nextRun; r.notified = false; } } }); } catch (e) { console.error(`[Schedule] Reminder email error for "${reminder.title}":`, e); } } } catch (e) { console.error(`[Schedule] Tick error for space ${space}:`, e); } } }; setTimeout(tick, 10_000); // First tick after 10s setInterval(tick, TICK_INTERVAL); } // ── Seed default jobs ── const SEED_JOBS: Omit[] = [ { id: "backlog-morning", name: "Morning Backlog Briefing", description: "Weekday morning digest of high-priority and recently-updated tasks.", enabled: true, cronExpression: "0 14 * * 1-5", timezone: "America/Vancouver", actionType: "backlog-briefing", actionConfig: { mode: "morning", to: "jeff@jeffemmett.com" }, createdBy: "system", }, { id: "backlog-weekly", name: "Weekly Backlog Review", description: "Friday afternoon review of all open tasks sorted by priority and staleness.", enabled: true, cronExpression: "0 22 * * 5", timezone: "America/Vancouver", actionType: "backlog-briefing", actionConfig: { mode: "weekly", to: "jeff@jeffemmett.com" }, createdBy: "system", }, { id: "backlog-monthly", name: "Monthly Backlog Audit", description: "First of the month audit of stale tasks (14+ days untouched).", enabled: true, cronExpression: "0 14 1 * *", timezone: "America/Vancouver", actionType: "backlog-briefing", actionConfig: { mode: "monthly", to: "jeff@jeffemmett.com" }, createdBy: "system", }, { id: "calendar-reminder-daily", name: "Daily Calendar Reminders", description: "Sends email reminders for knowledge items scheduled on today's date.", enabled: true, cronExpression: "0 14 * * *", timezone: "America/Vancouver", actionType: "calendar-reminder", actionConfig: { to: "jeff@jeffemmett.com" }, createdBy: "system", }, ]; function seedDefaultJobs(space: string) { const docId = scheduleDocId(space); const doc = ensureDoc(space); if (Object.keys(doc.jobs).length > 0) return; const now = Date.now(); _syncServer!.changeDoc(docId, "seed default jobs", (d) => { for (const seed of SEED_JOBS) { d.jobs[seed.id] = { ...seed, lastRunAt: null, lastRunStatus: null, lastRunMessage: "", nextRunAt: computeNextRun(seed.cronExpression, seed.timezone), runCount: 0, createdAt: now, updatedAt: now, }; } }); console.log(`[Schedule] Seeded ${SEED_JOBS.length} default jobs for space "${space}"`); } // ── API routes ── // GET / — serve schedule UI routes.get("/", (c) => { const space = c.req.param("space") || "demo"; return c.html( renderShell({ title: `${space} — Schedule | rSpace`, moduleId: "rschedule", spaceSlug: space, modules: getModuleInfoList(), theme: "dark", body: ``, scripts: ``, styles: ``, }), ); }); // GET /api/jobs — list all jobs routes.get("/api/jobs", (c) => { const space = c.req.param("space") || "demo"; const doc = ensureDoc(space); const jobs = Object.values(doc.jobs).map((j) => ({ ...j, cronHuman: cronToHuman(j.cronExpression), })); jobs.sort((a, b) => a.name.localeCompare(b.name)); return c.json({ count: jobs.length, results: jobs }); }); // POST /api/jobs — create a new job routes.post("/api/jobs", async (c) => { const space = c.req.param("space") || "demo"; const body = await c.req.json(); const { name, description, cronExpression, timezone, actionType, actionConfig, enabled } = body; if (!name?.trim() || !cronExpression || !actionType) return c.json({ error: "name, cronExpression, and actionType required" }, 400); // Validate cron expression try { CronExpressionParser.parse(cronExpression); } catch { return c.json({ error: "Invalid cron expression" }, 400); } const docId = scheduleDocId(space); ensureDoc(space); const jobId = crypto.randomUUID(); const now = Date.now(); const tz = timezone || "UTC"; _syncServer!.changeDoc(docId, `create job ${jobId}`, (d) => { d.jobs[jobId] = { id: jobId, name: name.trim(), description: description || "", enabled: enabled !== false, cronExpression, timezone: tz, actionType, actionConfig: actionConfig || {}, lastRunAt: null, lastRunStatus: null, lastRunMessage: "", nextRunAt: computeNextRun(cronExpression, tz), runCount: 0, createdBy: "user", createdAt: now, updatedAt: now, }; }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.jobs[jobId], 201); }); // GET /api/jobs/:id routes.get("/api/jobs/:id", (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const doc = ensureDoc(space); const job = doc.jobs[id]; if (!job) return c.json({ error: "Job not found" }, 404); return c.json({ ...job, cronHuman: cronToHuman(job.cronExpression) }); }); // PUT /api/jobs/:id — update a job routes.put("/api/jobs/:id", async (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(space); const doc = ensureDoc(space); if (!doc.jobs[id]) return c.json({ error: "Job not found" }, 404); // Validate cron if provided if (body.cronExpression) { try { CronExpressionParser.parse(body.cronExpression); } catch { return c.json({ error: "Invalid cron expression" }, 400); } } _syncServer!.changeDoc(docId, `update job ${id}`, (d) => { const j = d.jobs[id]; if (!j) return; if (body.name !== undefined) j.name = body.name; if (body.description !== undefined) j.description = body.description; if (body.enabled !== undefined) j.enabled = body.enabled; if (body.cronExpression !== undefined) { j.cronExpression = body.cronExpression; j.nextRunAt = computeNextRun(body.cronExpression, body.timezone || j.timezone); } if (body.timezone !== undefined) { j.timezone = body.timezone; j.nextRunAt = computeNextRun(j.cronExpression, body.timezone); } if (body.actionType !== undefined) j.actionType = body.actionType; if (body.actionConfig !== undefined) j.actionConfig = body.actionConfig; j.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.jobs[id]); }); // DELETE /api/jobs/:id routes.delete("/api/jobs/:id", (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const docId = scheduleDocId(space); const doc = ensureDoc(space); if (!doc.jobs[id]) return c.json({ error: "Job not found" }, 404); _syncServer!.changeDoc(docId, `delete job ${id}`, (d) => { delete d.jobs[id]; }); return c.json({ ok: true }); }); // POST /api/jobs/:id/run — manually trigger a job routes.post("/api/jobs/:id/run", async (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const docId = scheduleDocId(space); const doc = ensureDoc(space); const job = doc.jobs[id]; if (!job) return c.json({ error: "Job not found" }, 404); const startMs = Date.now(); let result: { success: boolean; message: string }; try { result = await executeJob(job, space); } catch (e: any) { result = { success: false, message: e.message || String(e) }; } const durationMs = Date.now() - startMs; const logEntry: ExecutionLogEntry = { id: crypto.randomUUID(), jobId: job.id, status: result.success ? "success" : "error", message: result.message, durationMs, timestamp: Date.now(), }; _syncServer!.changeDoc(docId, `manual run ${id}`, (d) => { const j = d.jobs[id]; if (j) { j.lastRunAt = Date.now(); j.lastRunStatus = result.success ? "success" : "error"; j.lastRunMessage = result.message; j.runCount = (j.runCount || 0) + 1; } d.log.push(logEntry); while (d.log.length > MAX_LOG_ENTRIES) { d.log.splice(0, 1); } }); return c.json({ ...result, durationMs }); }); // GET /api/log — execution log routes.get("/api/log", (c) => { const space = c.req.param("space") || "demo"; const doc = ensureDoc(space); const log = [...doc.log].reverse(); // newest first return c.json({ count: log.length, results: log }); }); // GET /api/log/:jobId — execution log filtered by job routes.get("/api/log/:jobId", (c) => { const space = c.req.param("space") || "demo"; const jobId = c.req.param("jobId"); const doc = ensureDoc(space); const log = doc.log.filter((e) => e.jobId === jobId).reverse(); return c.json({ count: log.length, results: log }); }); // ── Reminder helpers ── function ensureRemindersCalendarSource(space: string): string { const calDocId = calendarDocId(space); const calDoc = _syncServer!.getDoc(calDocId); if (!calDoc) return ""; // Check if "Reminders" source already exists const existing = Object.values(calDoc.sources).find( (s) => s.name === "Reminders" && s.sourceType === "rSchedule", ); if (existing) return existing.id; const sourceId = crypto.randomUUID(); const now = Date.now(); _syncServer!.changeDoc(calDocId, "create Reminders calendar source", (d) => { d.sources[sourceId] = { id: sourceId, name: "Reminders", sourceType: "rSchedule", url: null, color: "#f59e0b", isActive: true, isVisible: true, syncIntervalMinutes: null, lastSyncedAt: now, ownerId: null, createdAt: now, }; }); return sourceId; } function syncReminderToCalendar(reminder: Reminder, space: string): string | null { if (!_syncServer) return null; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return null; const sourceId = ensureRemindersCalendarSource(space); const eventId = crypto.randomUUID(); const now = Date.now(); const duration = reminder.allDay ? 86400000 : 3600000; _syncServer.changeDoc(calDocId, `sync reminder ${reminder.id} to calendar`, (d) => { d.events[eventId] = { id: eventId, title: reminder.title, description: reminder.description, startTime: reminder.remindAt, endTime: reminder.remindAt + duration, allDay: reminder.allDay, timezone: reminder.timezone || "UTC", rrule: null, status: null, visibility: null, sourceId, sourceName: "Reminders", sourceType: "rSchedule", sourceColor: reminder.sourceColor || "#f59e0b", locationId: null, locationName: null, coordinates: null, locationGranularity: null, locationLat: null, locationLng: null, isVirtual: false, virtualUrl: null, virtualPlatform: null, rToolSource: "rSchedule", rToolEntityId: reminder.id, attendees: [], attendeeCount: 0, metadata: null, createdAt: now, updatedAt: now, }; }); return eventId; } function deleteCalendarEvent(space: string, eventId: string) { if (!_syncServer) return; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc || !calDoc.events[eventId]) return; _syncServer.changeDoc(calDocId, `delete reminder calendar event ${eventId}`, (d) => { delete d.events[eventId]; }); } // ── Reminder email executor ── async function executeReminderEmail( reminder: Reminder, space: string, ): Promise<{ success: boolean; message: string }> { const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; if (!reminder.notifyEmail) return { success: false, message: "No email address on reminder" }; const dateStr = new Date(reminder.remindAt).toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric", hour: "numeric", minute: "2-digit", }); const sourceInfo = reminder.sourceModule ? `

Source: ${reminder.sourceLabel || reminder.sourceModule}

` : ""; const html = `

🔔 Reminder: ${reminder.title}

${dateStr}

${reminder.description ? `

${reminder.description}

` : ""} ${sourceInfo}

Sent by rSchedule • Manage Reminders

`; await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: reminder.notifyEmail, subject: `[Reminder] ${reminder.title}`, html, }); return { success: true, message: `Reminder email sent to ${reminder.notifyEmail}` }; } // ── Reminder API routes ── // GET /api/reminders — list reminders routes.get("/api/reminders", (c) => { const space = c.req.param("space") || "demo"; const doc = ensureDoc(space); let reminders = Object.values(doc.reminders); // Query filters const upcoming = c.req.query("upcoming"); const completed = c.req.query("completed"); if (completed === "false") { reminders = reminders.filter((r) => !r.completed); } else if (completed === "true") { reminders = reminders.filter((r) => r.completed); } if (upcoming) { const days = parseInt(upcoming) || 7; const now = Date.now(); const cutoff = now + days * 86400000; reminders = reminders.filter((r) => r.remindAt >= now && r.remindAt <= cutoff); } reminders.sort((a, b) => a.remindAt - b.remindAt); return c.json({ count: reminders.length, results: reminders }); }); // POST /api/reminders — create a reminder routes.post("/api/reminders", async (c) => { const space = c.req.param("space") || "demo"; const body = await c.req.json(); const { title, description, remindAt, allDay, timezone, notifyEmail, syncToCalendar, cronExpression } = body; if (!title?.trim() || !remindAt) return c.json({ error: "title and remindAt required" }, 400); const docId = scheduleDocId(space); const doc = ensureDoc(space); if (Object.keys(doc.reminders).length >= MAX_REMINDERS) return c.json({ error: `Maximum ${MAX_REMINDERS} reminders reached` }, 400); const reminderId = crypto.randomUUID(); const now = Date.now(); const reminder: Reminder = { id: reminderId, title: title.trim(), description: description || "", remindAt: typeof remindAt === "number" ? remindAt : new Date(remindAt).getTime(), allDay: allDay || false, timezone: timezone || "UTC", notifyEmail: notifyEmail || null, notified: false, completed: false, sourceModule: body.sourceModule || null, sourceEntityId: body.sourceEntityId || null, sourceLabel: body.sourceLabel || null, sourceColor: body.sourceColor || null, cronExpression: cronExpression || null, calendarEventId: null, createdBy: "user", createdAt: now, updatedAt: now, }; // Sync to calendar if requested if (syncToCalendar) { const eventId = syncReminderToCalendar(reminder, space); if (eventId) reminder.calendarEventId = eventId; } _syncServer!.changeDoc(docId, `create reminder ${reminderId}`, (d) => { d.reminders[reminderId] = reminder; }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.reminders[reminderId], 201); }); // GET /api/reminders/:id — get single reminder routes.get("/api/reminders/:id", (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const doc = ensureDoc(space); const reminder = doc.reminders[id]; if (!reminder) return c.json({ error: "Reminder not found" }, 404); return c.json(reminder); }); // PUT /api/reminders/:id — update a reminder routes.put("/api/reminders/:id", async (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(space); const doc = ensureDoc(space); if (!doc.reminders[id]) return c.json({ error: "Reminder not found" }, 404); _syncServer!.changeDoc(docId, `update reminder ${id}`, (d) => { const r = d.reminders[id]; if (!r) return; if (body.title !== undefined) r.title = body.title; if (body.description !== undefined) r.description = body.description; if (body.remindAt !== undefined) r.remindAt = typeof body.remindAt === "number" ? body.remindAt : new Date(body.remindAt).getTime(); if (body.allDay !== undefined) r.allDay = body.allDay; if (body.timezone !== undefined) r.timezone = body.timezone; if (body.notifyEmail !== undefined) r.notifyEmail = body.notifyEmail; if (body.cronExpression !== undefined) r.cronExpression = body.cronExpression; r.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.reminders[id]); }); // DELETE /api/reminders/:id — delete (cascades to calendar) routes.delete("/api/reminders/:id", (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const docId = scheduleDocId(space); const doc = ensureDoc(space); const reminder = doc.reminders[id]; if (!reminder) return c.json({ error: "Reminder not found" }, 404); // Cascade: delete linked calendar event if (reminder.calendarEventId) { deleteCalendarEvent(space, reminder.calendarEventId); } _syncServer!.changeDoc(docId, `delete reminder ${id}`, (d) => { delete d.reminders[id]; }); return c.json({ ok: true }); }); // POST /api/reminders/:id/complete — mark completed routes.post("/api/reminders/:id/complete", (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const docId = scheduleDocId(space); const doc = ensureDoc(space); if (!doc.reminders[id]) return c.json({ error: "Reminder not found" }, 404); _syncServer!.changeDoc(docId, `complete reminder ${id}`, (d) => { const r = d.reminders[id]; if (!r) return; r.completed = true; r.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.reminders[id]); }); // POST /api/reminders/:id/snooze — reschedule to a new date routes.post("/api/reminders/:id/snooze", async (c) => { const space = c.req.param("space") || "demo"; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(space); const doc = ensureDoc(space); if (!doc.reminders[id]) return c.json({ error: "Reminder not found" }, 404); const newRemindAt = body.remindAt ? (typeof body.remindAt === "number" ? body.remindAt : new Date(body.remindAt).getTime()) : Date.now() + (body.hours || 24) * 3600000; _syncServer!.changeDoc(docId, `snooze reminder ${id}`, (d) => { const r = d.reminders[id]; if (!r) return; r.remindAt = newRemindAt; r.notified = false; r.updatedAt = Date.now(); }); // Update linked calendar event if exists const updated = _syncServer!.getDoc(docId)!; const reminder = updated.reminders[id]; if (reminder?.calendarEventId) { const calDocId = calendarDocId(space); const duration = reminder.allDay ? 86400000 : 3600000; _syncServer!.changeDoc(calDocId, `update reminder event time`, (d) => { const ev = d.events[reminder.calendarEventId!]; if (ev) { ev.startTime = newRemindAt; ev.endTime = newRemindAt + duration; ev.updatedAt = Date.now(); } }); } return c.json(updated.reminders[id]); }); // ── Module export ── export const scheduleModule: RSpaceModule = { id: "rschedule", name: "rSchedule", icon: "⏱", description: "Persistent cron-based job scheduling with email, webhooks, and backlog briefings", scoping: { defaultScope: "global", userConfigurable: false }, docSchemas: [ { pattern: "{space}:schedule:jobs", description: "Scheduled jobs and execution log", init: scheduleSchema.init, }, ], routes, landingPage: renderLanding, seedTemplate: seedDefaultJobs, async onInit(ctx) { _syncServer = ctx.syncServer; seedDefaultJobs("demo"); startTickLoop(); }, feeds: [ { id: "executions", name: "Executions", kind: "data", description: "Job execution events with status, timing, and output", }, ], acceptsFeeds: ["data", "governance"], outputPaths: [ { path: "jobs", name: "Jobs", icon: "⏱", description: "Scheduled jobs and their configurations" }, { path: "reminders", name: "Reminders", icon: "🔔", description: "Scheduled reminders with email notifications" }, { path: "log", name: "Execution Log", icon: "📋", description: "History of job executions" }, ], };