/** * Schedule module — persistent cron-based job scheduling. * * Replaces system-level crontabs with an in-process scheduler. * Jobs are stored in Automerge (survives restarts), evaluated on * a 60-second tick loop, and can execute emails, webhooks, * calendar events, broadcasts, or backlog briefings. * * All persistence uses Automerge documents via SyncServer. */ import { Hono } from "hono"; import * as Automerge from "@automerge/automerge"; import { createTransport, type Transporter } from "nodemailer"; import { CronExpressionParser } from "cron-parser"; import { renderShell } from "../../server/shell"; import { getModuleInfoList } from "../../shared/module"; import type { RSpaceModule } from "../../shared/module"; import { renderLanding } from "./landing"; import type { SyncServer } from "../../server/local-first/sync-server"; import { scheduleSchema, scheduleDocId, MAX_LOG_ENTRIES, MAX_REMINDERS, } from "./schemas"; import type { ScheduleDoc, ScheduleJob, ExecutionLogEntry, ActionType, Reminder, Workflow, WorkflowNode, WorkflowEdge, } from "./schemas"; import { NODE_CATALOG } from "./schemas"; import { calendarDocId } from "../rcal/schemas"; import type { CalendarDoc, ScheduledItemMetadata } from "../rcal/schemas"; let _syncServer: SyncServer | null = null; const routes = new Hono(); // ── SMTP transport (lazy init) ── let _smtpTransport: Transporter | null = null; function getSmtpTransport(): Transporter | null { if (_smtpTransport) return _smtpTransport; if (!process.env.SMTP_PASS) return null; _smtpTransport = createTransport({ host: process.env.SMTP_HOST || "mail.rmail.online", port: Number(process.env.SMTP_PORT) || 587, secure: Number(process.env.SMTP_PORT) === 465, auth: { user: process.env.SMTP_USER || "noreply@rmail.online", pass: process.env.SMTP_PASS, }, tls: { rejectUnauthorized: false }, }); return _smtpTransport; } // ── Local-first helpers ── function ensureDoc(space: string): ScheduleDoc { const docId = scheduleDocId(space); let doc = _syncServer!.getDoc(docId); if (!doc) { doc = Automerge.change( Automerge.init(), "init schedule", (d) => { const init = scheduleSchema.init(); d.meta = init.meta; d.meta.spaceSlug = space; d.jobs = {}; d.reminders = {}; d.workflows = {}; d.log = []; }, ); _syncServer!.setDoc(docId, doc); } return doc; } // ── Cron helpers ── function computeNextRun(cronExpression: string, timezone: string): number | null { try { const interval = CronExpressionParser.parse(cronExpression, { currentDate: new Date(), tz: timezone, }); return interval.next().toDate().getTime(); } catch { return null; } } function cronToHuman(expr: string): string { const parts = expr.split(/\s+/); if (parts.length !== 5) return expr; const [min, hour, dom, mon, dow] = parts; const dowNames: Record = { "0": "Sun", "1": "Mon", "2": "Tue", "3": "Wed", "4": "Thu", "5": "Fri", "6": "Sat", "7": "Sun", "1-5": "weekdays", "0,6": "weekends", }; if (min === "0" && hour !== "*" && dom === "*" && mon === "*" && dow === "*") return `Daily at ${hour}:00`; if (min === "0" && hour !== "*" && dom === "*" && mon === "*" && dow === "1-5") return `Weekdays at ${hour}:00`; if (min === "0" && hour !== "*" && dom === "*" && mon === "*" && dow !== "*") return `${dowNames[dow] || dow} at ${hour}:00`; if (min === "0" && hour !== "*" && dom !== "*" && mon === "*" && dow === "*") return `Monthly on day ${dom} at ${hour}:00`; if (min === "*" && hour === "*" && dom === "*" && mon === "*" && dow === "*") return "Every minute"; if (min.startsWith("*/")) return `Every ${min.slice(2)} minutes`; return expr; } // ── Template helpers ── function renderTemplate(template: string, vars: Record): string { let result = template; for (const [key, value] of Object.entries(vars)) { result = result.replaceAll(`{{${key}}}`, value); } return result; } // ── Action executors ── async function executeEmail( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; const config = job.actionConfig as { to?: string; subject?: string; bodyTemplate?: string; }; if (!config.to) return { success: false, message: "No recipient (to) configured" }; const vars = { date: new Date().toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }), jobName: job.name, timestamp: new Date().toISOString(), }; const subject = renderTemplate(config.subject || `[rSchedule] ${job.name}`, vars); const html = renderTemplate(config.bodyTemplate || `

Scheduled job ${job.name} executed at ${vars.date}.

`, vars); await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: config.to, subject, html, }); return { success: true, message: `Email sent to ${config.to}` }; } async function executeWebhook( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const config = job.actionConfig as { url?: string; method?: string; headers?: Record; bodyTemplate?: string; }; if (!config.url) return { success: false, message: "No webhook URL configured" }; const vars = { date: new Date().toISOString(), jobName: job.name, timestamp: new Date().toISOString(), }; const method = (config.method || "POST").toUpperCase(); const headers: Record = { "Content-Type": "application/json", ...config.headers, }; const body = method !== "GET" ? renderTemplate(config.bodyTemplate || JSON.stringify({ job: job.name, timestamp: vars.date }), vars) : undefined; const res = await fetch(config.url, { method, headers, body }); if (!res.ok) return { success: false, message: `Webhook ${res.status}: ${await res.text().catch(() => "")}` }; return { success: true, message: `Webhook ${method} ${config.url} → ${res.status}` }; } async function executeCalendarEvent( job: ScheduleJob, space: string, ): Promise<{ success: boolean; message: string }> { if (!_syncServer) return { success: false, message: "SyncServer not available" }; const config = job.actionConfig as { title?: string; duration?: number; sourceId?: string; }; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return { success: false, message: `Calendar doc not found for space ${space}` }; const eventId = crypto.randomUUID(); const now = Date.now(); const durationMs = (config.duration || 60) * 60 * 1000; _syncServer.changeDoc(calDocId, `rSchedule: create event for ${job.name}`, (d) => { d.events[eventId] = { id: eventId, title: config.title || job.name, description: `Auto-created by rSchedule job: ${job.name}`, startTime: now, endTime: now + durationMs, allDay: false, timezone: job.timezone || "UTC", rrule: null, status: null, visibility: null, sourceId: config.sourceId || null, sourceName: null, sourceType: null, sourceColor: null, locationId: null, locationName: null, coordinates: null, locationGranularity: null, locationLat: null, locationLng: null, isVirtual: false, virtualUrl: null, virtualPlatform: null, rToolSource: "rSchedule", rToolEntityId: job.id, attendees: [], attendeeCount: 0, metadata: null, createdAt: now, updatedAt: now, }; }); return { success: true, message: `Calendar event '${config.title || job.name}' created (${eventId})` }; } async function executeBroadcast( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const config = job.actionConfig as { channel?: string; message?: string; }; // Broadcast via SyncServer's WebSocket connections is not directly accessible // from module code. For now, log the intent. Future: expose ws broadcast on SyncServer. const msg = config.message || `Scheduled broadcast from ${job.name}`; console.log(`[Schedule] Broadcast (${config.channel || "default"}): ${msg}`); return { success: true, message: `Broadcast sent: ${msg}` }; } async function executeBacklogBriefing( job: ScheduleJob, ): Promise<{ success: boolean; message: string }> { const config = job.actionConfig as { mode?: "morning" | "weekly" | "monthly"; scanPaths?: string[]; to?: string; }; const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; if (!config.to) return { success: false, message: "No recipient (to) configured" }; const mode = config.mode || "morning"; const scanPaths = config.scanPaths || ["/data/communities/*/backlog/tasks/"]; const now = new Date(); const dateStr = now.toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }); // Scan for backlog task files const { readdir, readFile, stat } = await import("node:fs/promises"); const { join, basename } = await import("node:path"); const { Glob } = await import("bun"); interface TaskInfo { file: string; title: string; priority: string; status: string; updatedAt: Date | null; staleDays: number; } const tasks: TaskInfo[] = []; for (const pattern of scanPaths) { try { const glob = new Glob(pattern.endsWith("/") ? pattern + "*.md" : pattern); for await (const filePath of glob.scan()) { try { const content = await readFile(filePath, "utf-8"); const fstat = await stat(filePath); // Parse YAML frontmatter const fmMatch = content.match(/^---\n([\s\S]*?)\n---/); let title = basename(filePath, ".md").replace(/-/g, " "); let priority = "medium"; let status = "open"; if (fmMatch) { const fm = fmMatch[1]; const titleMatch = fm.match(/^title:\s*(.+)$/m); const prioMatch = fm.match(/^priority:\s*(.+)$/m); const statusMatch = fm.match(/^status:\s*(.+)$/m); if (titleMatch) title = titleMatch[1].replace(/^["']|["']$/g, ""); if (prioMatch) priority = prioMatch[1].trim().toLowerCase(); if (statusMatch) status = statusMatch[1].trim().toLowerCase(); } const staleDays = Math.floor( (now.getTime() - fstat.mtime.getTime()) / (1000 * 60 * 60 * 24), ); tasks.push({ file: filePath, title, priority, status, updatedAt: fstat.mtime, staleDays }); } catch { // Skip unreadable files } } } catch { // Glob pattern didn't match or dir doesn't exist } } // Filter and sort based on mode let filtered = tasks.filter((t) => t.status !== "done" && t.status !== "closed"); let subject: string; let heading: string; switch (mode) { case "morning": // High/urgent priority + recently updated filtered = filtered .filter((t) => t.priority === "high" || t.priority === "urgent" || t.staleDays < 3) .sort((a, b) => { const priOrder: Record = { urgent: 0, high: 1, medium: 2, low: 3 }; return (priOrder[a.priority] ?? 2) - (priOrder[b.priority] ?? 2); }); subject = `Morning Briefing — ${dateStr}`; heading = "Good morning! Here's your task briefing:"; break; case "weekly": // All open tasks sorted by priority then staleness filtered.sort((a, b) => { const priOrder: Record = { urgent: 0, high: 1, medium: 2, low: 3 }; const pDiff = (priOrder[a.priority] ?? 2) - (priOrder[b.priority] ?? 2); return pDiff !== 0 ? pDiff : b.staleDays - a.staleDays; }); subject = `Weekly Backlog Review — ${dateStr}`; heading = "Weekly review of all open tasks:"; break; case "monthly": // Focus on stale items (> 14 days untouched) filtered = filtered .filter((t) => t.staleDays > 14) .sort((a, b) => b.staleDays - a.staleDays); subject = `Monthly Backlog Audit — ${dateStr}`; heading = "Monthly audit — these tasks haven't been touched in 14+ days:"; break; } // Build HTML email const taskRows = filtered.length > 0 ? filtered .slice(0, 50) .map((t) => { const prioColor: Record = { urgent: "#ef4444", high: "#f97316", medium: "#f59e0b", low: "#6b7280", }; return ` ${t.priority} ${t.title} ${t.status} ${t.staleDays}d ago `; }) .join("\n") : `No tasks match this filter.`; const html = `

${heading}

${dateStr} • ${filtered.length} task${filtered.length !== 1 ? "s" : ""}

${taskRows}
Priority Task Status Last Update

Sent by rSchedule • Manage Schedules

`; await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: config.to, subject: `[rSchedule] ${subject}`, html, }); return { success: true, message: `${mode} briefing sent to ${config.to} (${filtered.length} tasks)` }; } async function executeCalendarReminder( job: ScheduleJob, space: string, ): Promise<{ success: boolean; message: string }> { if (!_syncServer) return { success: false, message: "SyncServer not available" }; const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; const config = job.actionConfig as { to?: string }; if (!config.to) return { success: false, message: "No recipient (to) configured" }; // Load the calendar doc for this space const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return { success: false, message: `Calendar doc not found for space ${space}` }; // Find scheduled items due today that haven't been reminded yet const now = new Date(); const todayStart = new Date(now.getFullYear(), now.getMonth(), now.getDate()).getTime(); const todayEnd = todayStart + 86400000; const dueItems = Object.values(calDoc.events).filter((ev) => { const meta = ev.metadata as ScheduledItemMetadata | null; return meta?.isScheduledItem === true && !meta.reminderSent && ev.startTime >= todayStart && ev.startTime < todayEnd; }); if (dueItems.length === 0) return { success: true, message: "No scheduled items due today" }; // Render email with all due items const dateStr = now.toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }); const itemRows = dueItems.map((ev) => { const meta = ev.metadata as ScheduledItemMetadata; const preview = meta.itemPreview; const prov = meta.provenance; const thumbHtml = preview.thumbnailUrl ? `thumbnail` : ""; const canvasLink = preview.canvasUrl ? `Open in Canvas` : ""; return `
${ev.title}
${preview.textPreview}
Source: ${prov.sourceType} in ${prov.sourceSpace} ${prov.rid ? ` • RID: ${prov.rid}` : ""}
${thumbHtml}
${canvasLink}
`; }).join("\n"); const html = `

Scheduled Knowledge Reminders

${dateStr} • ${dueItems.length} item${dueItems.length !== 1 ? "s" : ""}

${itemRows}

Sent by rSchedule • View Calendar

`; await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: config.to, subject: `[rSpace] ${dueItems.length} scheduled item${dueItems.length !== 1 ? "s" : ""} for ${dateStr}`, html, }); // Mark all sent items as reminded _syncServer.changeDoc(calDocId, `mark ${dueItems.length} reminders sent`, (d) => { for (const item of dueItems) { const ev = d.events[item.id]; if (!ev) continue; const meta = ev.metadata as ScheduledItemMetadata; meta.reminderSent = true; meta.reminderSentAt = Date.now(); ev.updatedAt = Date.now(); } }); return { success: true, message: `Calendar reminder sent to ${config.to} (${dueItems.length} items)` }; } // ── Unified executor ── async function executeJob( job: ScheduleJob, space: string, ): Promise<{ success: boolean; message: string }> { switch (job.actionType) { case "email": return executeEmail(job); case "webhook": return executeWebhook(job); case "calendar-event": return executeCalendarEvent(job, space); case "broadcast": return executeBroadcast(job); case "backlog-briefing": return executeBacklogBriefing(job); case "calendar-reminder": return executeCalendarReminder(job, space); default: return { success: false, message: `Unknown action type: ${job.actionType}` }; } } // ── Tick loop ── const TICK_INTERVAL = 60_000; function startTickLoop() { console.log("[Schedule] Tick loop started — checking every 60s"); const tick = async () => { if (!_syncServer) return; const now = Date.now(); // Iterate all known schedule docs // Convention: check the "demo" space and any spaces that have schedule docs const spaceSlugs = new Set(); spaceSlugs.add("demo"); // Also scan for any schedule docs already loaded const allDocs = _syncServer.listDocs(); for (const docId of allDocs) { const match = docId.match(/^(.+):schedule:jobs$/); if (match) spaceSlugs.add(match[1]); } for (const space of spaceSlugs) { try { const docId = scheduleDocId(space); const doc = _syncServer.getDoc(docId); if (!doc) continue; const dueJobs = Object.values(doc.jobs).filter( (j) => j.enabled && j.nextRunAt && j.nextRunAt <= now, ); for (const job of dueJobs) { const startMs = Date.now(); let result: { success: boolean; message: string }; try { result = await executeJob(job, space); } catch (e: any) { result = { success: false, message: e.message || String(e) }; } const durationMs = Date.now() - startMs; const logEntry: ExecutionLogEntry = { id: crypto.randomUUID(), jobId: job.id, status: result.success ? "success" : "error", message: result.message, durationMs, timestamp: Date.now(), }; console.log( `[Schedule] ${result.success ? "OK" : "ERR"} ${job.name} (${durationMs}ms): ${result.message}`, ); // Update job state + append log _syncServer.changeDoc(docId, `run job ${job.id}`, (d) => { const j = d.jobs[job.id]; if (!j) return; j.lastRunAt = Date.now(); j.lastRunStatus = result.success ? "success" : "error"; j.lastRunMessage = result.message; j.runCount = (j.runCount || 0) + 1; j.nextRunAt = computeNextRun(j.cronExpression, j.timezone) ?? null; // Append log entry, trim to max d.log.push(logEntry); while (d.log.length > MAX_LOG_ENTRIES) { d.log.splice(0, 1); } }); } // ── Process due reminders ── const dueReminders = Object.values(doc.reminders || {}).filter( (r) => !r.notified && !r.completed && r.remindAt <= now && r.notifyEmail, ); for (const reminder of dueReminders) { try { const result = await executeReminderEmail(reminder, space); console.log( `[Schedule] Reminder ${result.success ? "OK" : "ERR"} "${reminder.title}": ${result.message}`, ); _syncServer.changeDoc(docId, `notify reminder ${reminder.id}`, (d) => { const r = d.reminders[reminder.id]; if (!r) return; r.notified = true; r.updatedAt = Date.now(); // Handle recurring reminders if (r.cronExpression) { const nextRun = computeNextRun(r.cronExpression, r.timezone); if (nextRun) { r.remindAt = nextRun; r.notified = false; } } }); } catch (e) { console.error(`[Schedule] Reminder email error for "${reminder.title}":`, e); } } // ── Process due automation workflows ── const workflows = Object.values(doc.workflows || {}); for (const wf of workflows) { if (!wf.enabled) continue; const cronNodes = wf.nodes.filter(n => n.type === "trigger-cron"); for (const cronNode of cronNodes) { const expr = String(cronNode.config.cronExpression || ""); const tz = String(cronNode.config.timezone || "UTC"); if (!expr) continue; try { const interval = CronExpressionParser.parse(expr, { currentDate: new Date(now - TICK_INTERVAL), tz, }); const nextDate = interval.next().toDate(); if (nextDate.getTime() <= now) { console.log(`[Schedule] Running cron workflow "${wf.name}" for space ${space}`); const results = await executeWorkflow(wf, space); const allOk = results.every(r => r.status !== "error"); _syncServer.changeDoc(docId, `tick workflow ${wf.id}`, (d) => { const w = d.workflows[wf.id]; if (!w) return; w.lastRunAt = Date.now(); w.lastRunStatus = allOk ? "success" : "error"; w.runCount = (w.runCount || 0) + 1; w.updatedAt = Date.now(); }); } } catch { /* invalid cron — skip */ } } } } catch (e) { console.error(`[Schedule] Tick error for space ${space}:`, e); } } }; setTimeout(tick, 10_000); // First tick after 10s setInterval(tick, TICK_INTERVAL); } // ── Seed default jobs ── const SEED_JOBS: Omit[] = [ { id: "backlog-morning", name: "Morning Backlog Briefing", description: "Weekday morning digest of high-priority and recently-updated tasks.", enabled: true, cronExpression: "0 14 * * 1-5", timezone: "America/Vancouver", actionType: "backlog-briefing", actionConfig: { mode: "morning", to: "jeff@jeffemmett.com" }, createdBy: "system", }, { id: "backlog-weekly", name: "Weekly Backlog Review", description: "Friday afternoon review of all open tasks sorted by priority and staleness.", enabled: true, cronExpression: "0 22 * * 5", timezone: "America/Vancouver", actionType: "backlog-briefing", actionConfig: { mode: "weekly", to: "jeff@jeffemmett.com" }, createdBy: "system", }, { id: "backlog-monthly", name: "Monthly Backlog Audit", description: "First of the month audit of stale tasks (14+ days untouched).", enabled: true, cronExpression: "0 14 1 * *", timezone: "America/Vancouver", actionType: "backlog-briefing", actionConfig: { mode: "monthly", to: "jeff@jeffemmett.com" }, createdBy: "system", }, { id: "calendar-reminder-daily", name: "Daily Calendar Reminders", description: "Sends email reminders for knowledge items scheduled on today's date.", enabled: true, cronExpression: "0 14 * * *", timezone: "America/Vancouver", actionType: "calendar-reminder", actionConfig: { to: "jeff@jeffemmett.com" }, createdBy: "system", }, ]; function seedDefaultJobs(space: string) { const docId = scheduleDocId(space); const doc = ensureDoc(space); if (Object.keys(doc.jobs).length > 0) return; const now = Date.now(); _syncServer!.changeDoc(docId, "seed default jobs", (d) => { for (const seed of SEED_JOBS) { d.jobs[seed.id] = { ...seed, lastRunAt: null, lastRunStatus: null, lastRunMessage: "", nextRunAt: computeNextRun(seed.cronExpression, seed.timezone), runCount: 0, createdAt: now, updatedAt: now, }; } }); console.log(`[Schedule] Seeded ${SEED_JOBS.length} default jobs for space "${space}"`); } // ── API routes ── // GET / — serve schedule UI routes.get("/", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; return c.html( renderShell({ title: `${space} — Schedule | rSpace`, moduleId: "rschedule", spaceSlug: space, modules: getModuleInfoList(), theme: "dark", body: ``, scripts: ``, styles: ``, }), ); }); // GET /api/jobs — list all jobs routes.get("/api/jobs", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const doc = ensureDoc(dataSpace); const jobs = Object.values(doc.jobs).map((j) => ({ ...j, cronHuman: cronToHuman(j.cronExpression), })); jobs.sort((a, b) => a.name.localeCompare(b.name)); return c.json({ count: jobs.length, results: jobs }); }); // POST /api/jobs — create a new job routes.post("/api/jobs", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const body = await c.req.json(); const { name, description, cronExpression, timezone, actionType, actionConfig, enabled } = body; if (!name?.trim() || !cronExpression || !actionType) return c.json({ error: "name, cronExpression, and actionType required" }, 400); // Validate cron expression try { CronExpressionParser.parse(cronExpression); } catch { return c.json({ error: "Invalid cron expression" }, 400); } const docId = scheduleDocId(dataSpace); ensureDoc(dataSpace); const jobId = crypto.randomUUID(); const now = Date.now(); const tz = timezone || "UTC"; _syncServer!.changeDoc(docId, `create job ${jobId}`, (d) => { d.jobs[jobId] = { id: jobId, name: name.trim(), description: description || "", enabled: enabled !== false, cronExpression, timezone: tz, actionType, actionConfig: actionConfig || {}, lastRunAt: null, lastRunStatus: null, lastRunMessage: "", nextRunAt: computeNextRun(cronExpression, tz), runCount: 0, createdBy: "user", createdAt: now, updatedAt: now, }; }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.jobs[jobId], 201); }); // GET /api/jobs/:id routes.get("/api/jobs/:id", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const doc = ensureDoc(dataSpace); const job = doc.jobs[id]; if (!job) return c.json({ error: "Job not found" }, 404); return c.json({ ...job, cronHuman: cronToHuman(job.cronExpression) }); }); // PUT /api/jobs/:id — update a job routes.put("/api/jobs/:id", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.jobs[id]) return c.json({ error: "Job not found" }, 404); // Validate cron if provided if (body.cronExpression) { try { CronExpressionParser.parse(body.cronExpression); } catch { return c.json({ error: "Invalid cron expression" }, 400); } } _syncServer!.changeDoc(docId, `update job ${id}`, (d) => { const j = d.jobs[id]; if (!j) return; if (body.name !== undefined) j.name = body.name; if (body.description !== undefined) j.description = body.description; if (body.enabled !== undefined) j.enabled = body.enabled; if (body.cronExpression !== undefined) { j.cronExpression = body.cronExpression; j.nextRunAt = computeNextRun(body.cronExpression, body.timezone || j.timezone); } if (body.timezone !== undefined) { j.timezone = body.timezone; j.nextRunAt = computeNextRun(j.cronExpression, body.timezone); } if (body.actionType !== undefined) j.actionType = body.actionType; if (body.actionConfig !== undefined) j.actionConfig = body.actionConfig; j.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.jobs[id]); }); // DELETE /api/jobs/:id routes.delete("/api/jobs/:id", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.jobs[id]) return c.json({ error: "Job not found" }, 404); _syncServer!.changeDoc(docId, `delete job ${id}`, (d) => { delete d.jobs[id]; }); return c.json({ ok: true }); }); // POST /api/jobs/:id/run — manually trigger a job routes.post("/api/jobs/:id/run", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); const job = doc.jobs[id]; if (!job) return c.json({ error: "Job not found" }, 404); const startMs = Date.now(); let result: { success: boolean; message: string }; try { result = await executeJob(job, dataSpace); } catch (e: any) { result = { success: false, message: e.message || String(e) }; } const durationMs = Date.now() - startMs; const logEntry: ExecutionLogEntry = { id: crypto.randomUUID(), jobId: job.id, status: result.success ? "success" : "error", message: result.message, durationMs, timestamp: Date.now(), }; _syncServer!.changeDoc(docId, `manual run ${id}`, (d) => { const j = d.jobs[id]; if (j) { j.lastRunAt = Date.now(); j.lastRunStatus = result.success ? "success" : "error"; j.lastRunMessage = result.message; j.runCount = (j.runCount || 0) + 1; } d.log.push(logEntry); while (d.log.length > MAX_LOG_ENTRIES) { d.log.splice(0, 1); } }); return c.json({ ...result, durationMs }); }); // GET /api/log — execution log routes.get("/api/log", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const doc = ensureDoc(dataSpace); const log = [...doc.log].reverse(); // newest first return c.json({ count: log.length, results: log }); }); // GET /api/log/:jobId — execution log filtered by job routes.get("/api/log/:jobId", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const jobId = c.req.param("jobId"); const doc = ensureDoc(dataSpace); const log = doc.log.filter((e) => e.jobId === jobId).reverse(); return c.json({ count: log.length, results: log }); }); // ── Reminder helpers ── function ensureRemindersCalendarSource(space: string): string { const calDocId = calendarDocId(space); const calDoc = _syncServer!.getDoc(calDocId); if (!calDoc) return ""; // Check if "Reminders" source already exists const existing = Object.values(calDoc.sources).find( (s) => s.name === "Reminders" && s.sourceType === "rSchedule", ); if (existing) return existing.id; const sourceId = crypto.randomUUID(); const now = Date.now(); _syncServer!.changeDoc(calDocId, "create Reminders calendar source", (d) => { d.sources[sourceId] = { id: sourceId, name: "Reminders", sourceType: "rSchedule", url: null, color: "#f59e0b", isActive: true, isVisible: true, syncIntervalMinutes: null, lastSyncedAt: now, ownerId: null, createdAt: now, }; }); return sourceId; } function syncReminderToCalendar(reminder: Reminder, space: string): string | null { if (!_syncServer) return null; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return null; const sourceId = ensureRemindersCalendarSource(space); const eventId = crypto.randomUUID(); const now = Date.now(); const duration = reminder.allDay ? 86400000 : 3600000; _syncServer.changeDoc(calDocId, `sync reminder ${reminder.id} to calendar`, (d) => { d.events[eventId] = { id: eventId, title: reminder.title, description: reminder.description, startTime: reminder.remindAt, endTime: reminder.remindAt + duration, allDay: reminder.allDay, timezone: reminder.timezone || "UTC", rrule: null, status: null, visibility: null, sourceId, sourceName: "Reminders", sourceType: "rSchedule", sourceColor: reminder.sourceColor || "#f59e0b", locationId: null, locationName: null, coordinates: null, locationGranularity: null, locationLat: null, locationLng: null, isVirtual: false, virtualUrl: null, virtualPlatform: null, rToolSource: "rSchedule", rToolEntityId: reminder.id, attendees: [], attendeeCount: 0, metadata: null, createdAt: now, updatedAt: now, }; }); return eventId; } function deleteCalendarEvent(space: string, eventId: string) { if (!_syncServer) return; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc || !calDoc.events[eventId]) return; _syncServer.changeDoc(calDocId, `delete reminder calendar event ${eventId}`, (d) => { delete d.events[eventId]; }); } // ── Reminder email executor ── async function executeReminderEmail( reminder: Reminder, space: string, ): Promise<{ success: boolean; message: string }> { const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured (SMTP_PASS missing)" }; if (!reminder.notifyEmail) return { success: false, message: "No email address on reminder" }; const dateStr = new Date(reminder.remindAt).toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric", hour: "numeric", minute: "2-digit", }); const sourceInfo = reminder.sourceModule ? `

Source: ${reminder.sourceLabel || reminder.sourceModule}

` : ""; const html = `

🔔 Reminder: ${reminder.title}

${dateStr}

${reminder.description ? `

${reminder.description}

` : ""} ${sourceInfo}

Sent by rSchedule • Manage Reminders

`; await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: reminder.notifyEmail, subject: `[Reminder] ${reminder.title}`, html, }); return { success: true, message: `Reminder email sent to ${reminder.notifyEmail}` }; } // ── Reminder API routes ── // GET /api/reminders — list reminders routes.get("/api/reminders", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const doc = ensureDoc(dataSpace); let reminders = Object.values(doc.reminders); // Query filters const upcoming = c.req.query("upcoming"); const completed = c.req.query("completed"); if (completed === "false") { reminders = reminders.filter((r) => !r.completed); } else if (completed === "true") { reminders = reminders.filter((r) => r.completed); } if (upcoming) { const days = parseInt(upcoming) || 7; const now = Date.now(); const cutoff = now + days * 86400000; reminders = reminders.filter((r) => r.remindAt >= now && r.remindAt <= cutoff); } reminders.sort((a, b) => a.remindAt - b.remindAt); return c.json({ count: reminders.length, results: reminders }); }); // POST /api/reminders — create a reminder routes.post("/api/reminders", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const body = await c.req.json(); const { title, description, remindAt, allDay, timezone, notifyEmail, syncToCalendar, cronExpression } = body; if (!title?.trim() || !remindAt) return c.json({ error: "title and remindAt required" }, 400); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (Object.keys(doc.reminders).length >= MAX_REMINDERS) return c.json({ error: `Maximum ${MAX_REMINDERS} reminders reached` }, 400); const reminderId = crypto.randomUUID(); const now = Date.now(); const reminder: Reminder = { id: reminderId, title: title.trim(), description: description || "", remindAt: typeof remindAt === "number" ? remindAt : new Date(remindAt).getTime(), allDay: allDay || false, timezone: timezone || "UTC", notifyEmail: notifyEmail || null, notified: false, completed: false, sourceModule: body.sourceModule || null, sourceEntityId: body.sourceEntityId || null, sourceLabel: body.sourceLabel || null, sourceColor: body.sourceColor || null, cronExpression: cronExpression || null, calendarEventId: null, createdBy: "user", createdAt: now, updatedAt: now, }; // Sync to calendar if requested if (syncToCalendar) { const eventId = syncReminderToCalendar(reminder, dataSpace); if (eventId) reminder.calendarEventId = eventId; } _syncServer!.changeDoc(docId, `create reminder ${reminderId}`, (d) => { d.reminders[reminderId] = reminder; }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.reminders[reminderId], 201); }); // GET /api/reminders/:id — get single reminder routes.get("/api/reminders/:id", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const doc = ensureDoc(dataSpace); const reminder = doc.reminders[id]; if (!reminder) return c.json({ error: "Reminder not found" }, 404); return c.json(reminder); }); // PUT /api/reminders/:id — update a reminder routes.put("/api/reminders/:id", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.reminders[id]) return c.json({ error: "Reminder not found" }, 404); _syncServer!.changeDoc(docId, `update reminder ${id}`, (d) => { const r = d.reminders[id]; if (!r) return; if (body.title !== undefined) r.title = body.title; if (body.description !== undefined) r.description = body.description; if (body.remindAt !== undefined) r.remindAt = typeof body.remindAt === "number" ? body.remindAt : new Date(body.remindAt).getTime(); if (body.allDay !== undefined) r.allDay = body.allDay; if (body.timezone !== undefined) r.timezone = body.timezone; if (body.notifyEmail !== undefined) r.notifyEmail = body.notifyEmail; if (body.cronExpression !== undefined) r.cronExpression = body.cronExpression; r.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.reminders[id]); }); // DELETE /api/reminders/:id — delete (cascades to calendar) routes.delete("/api/reminders/:id", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); const reminder = doc.reminders[id]; if (!reminder) return c.json({ error: "Reminder not found" }, 404); // Cascade: delete linked calendar event if (reminder.calendarEventId) { deleteCalendarEvent(dataSpace, reminder.calendarEventId); } _syncServer!.changeDoc(docId, `delete reminder ${id}`, (d) => { delete d.reminders[id]; }); return c.json({ ok: true }); }); // POST /api/reminders/:id/complete — mark completed routes.post("/api/reminders/:id/complete", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.reminders[id]) return c.json({ error: "Reminder not found" }, 404); _syncServer!.changeDoc(docId, `complete reminder ${id}`, (d) => { const r = d.reminders[id]; if (!r) return; r.completed = true; r.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.reminders[id]); }); // POST /api/reminders/:id/snooze — reschedule to a new date routes.post("/api/reminders/:id/snooze", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.reminders[id]) return c.json({ error: "Reminder not found" }, 404); const newRemindAt = body.remindAt ? (typeof body.remindAt === "number" ? body.remindAt : new Date(body.remindAt).getTime()) : Date.now() + (body.hours || 24) * 3600000; _syncServer!.changeDoc(docId, `snooze reminder ${id}`, (d) => { const r = d.reminders[id]; if (!r) return; r.remindAt = newRemindAt; r.notified = false; r.updatedAt = Date.now(); }); // Update linked calendar event if exists const updated = _syncServer!.getDoc(docId)!; const reminder = updated.reminders[id]; if (reminder?.calendarEventId) { const calDocId = calendarDocId(dataSpace); const duration = reminder.allDay ? 86400000 : 3600000; _syncServer!.changeDoc(calDocId, `update reminder event time`, (d) => { const ev = d.events[reminder.calendarEventId!]; if (ev) { ev.startTime = newRemindAt; ev.endTime = newRemindAt + duration; ev.updatedAt = Date.now(); } }); } return c.json(updated.reminders[id]); }); // ── Automation canvas page route ── routes.get("/reminders", (c) => { const space = c.req.param("space") || "demo"; return c.html( renderShell({ title: `${space} — Automations | rSpace`, moduleId: "rschedule", spaceSlug: space, modules: getModuleInfoList(), theme: "dark", body: ``, scripts: ``, styles: ``, }), ); }); // ── Workflow CRUD API ── routes.get("/api/workflows", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const doc = ensureDoc(dataSpace); // Ensure workflows field exists on older docs const workflows = Object.values(doc.workflows || {}); workflows.sort((a, b) => a.name.localeCompare(b.name)); return c.json({ count: workflows.length, results: workflows }); }); routes.post("/api/workflows", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const body = await c.req.json(); const docId = scheduleDocId(dataSpace); ensureDoc(dataSpace); const wfId = crypto.randomUUID(); const now = Date.now(); const workflow: Workflow = { id: wfId, name: body.name || "New Workflow", enabled: body.enabled !== false, nodes: body.nodes || [], edges: body.edges || [], lastRunAt: null, lastRunStatus: null, runCount: 0, createdAt: now, updatedAt: now, }; _syncServer!.changeDoc(docId, `create workflow ${wfId}`, (d) => { if (!d.workflows) d.workflows = {} as any; (d.workflows as any)[wfId] = workflow; }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.workflows[wfId], 201); }); routes.get("/api/workflows/:id", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const doc = ensureDoc(dataSpace); const wf = doc.workflows?.[id]; if (!wf) return c.json({ error: "Workflow not found" }, 404); return c.json(wf); }); routes.put("/api/workflows/:id", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const body = await c.req.json(); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.workflows?.[id]) return c.json({ error: "Workflow not found" }, 404); _syncServer!.changeDoc(docId, `update workflow ${id}`, (d) => { const wf = d.workflows[id]; if (!wf) return; if (body.name !== undefined) wf.name = body.name; if (body.enabled !== undefined) wf.enabled = body.enabled; if (body.nodes !== undefined) { // Replace the nodes array while (wf.nodes.length > 0) wf.nodes.splice(0, 1); for (const n of body.nodes) wf.nodes.push(n); } if (body.edges !== undefined) { while (wf.edges.length > 0) wf.edges.splice(0, 1); for (const e of body.edges) wf.edges.push(e); } wf.updatedAt = Date.now(); }); const updated = _syncServer!.getDoc(docId)!; return c.json(updated.workflows[id]); }); routes.delete("/api/workflows/:id", (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); if (!doc.workflows?.[id]) return c.json({ error: "Workflow not found" }, 404); _syncServer!.changeDoc(docId, `delete workflow ${id}`, (d) => { delete d.workflows[id]; }); return c.json({ ok: true }); }); // ── Workflow execution engine ── interface NodeResult { nodeId: string; status: "success" | "error" | "skipped"; message: string; durationMs: number; outputData?: unknown; } function topologicalSort(nodes: WorkflowNode[], edges: WorkflowEdge[]): WorkflowNode[] { const adj = new Map(); const inDegree = new Map(); for (const n of nodes) { adj.set(n.id, []); inDegree.set(n.id, 0); } for (const e of edges) { adj.get(e.fromNode)?.push(e.toNode); inDegree.set(e.toNode, (inDegree.get(e.toNode) || 0) + 1); } const queue: string[] = []; for (const [id, deg] of inDegree) { if (deg === 0) queue.push(id); } const sorted: string[] = []; while (queue.length > 0) { const id = queue.shift()!; sorted.push(id); for (const neighbor of adj.get(id) || []) { const newDeg = (inDegree.get(neighbor) || 1) - 1; inDegree.set(neighbor, newDeg); if (newDeg === 0) queue.push(neighbor); } } const nodeMap = new Map(nodes.map(n => [n.id, n])); return sorted.map(id => nodeMap.get(id)!).filter(Boolean); } function haversineKm(lat1: number, lng1: number, lat2: number, lng2: number): number { const R = 6371; const dLat = (lat2 - lat1) * Math.PI / 180; const dLng = (lng2 - lng1) * Math.PI / 180; const a = Math.sin(dLat / 2) ** 2 + Math.cos(lat1 * Math.PI / 180) * Math.cos(lat2 * Math.PI / 180) * Math.sin(dLng / 2) ** 2; return R * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); } async function executeWorkflowNode( node: WorkflowNode, inputData: unknown, space: string, ): Promise<{ success: boolean; message: string; outputData?: unknown }> { const cfg = node.config; switch (node.type) { // ── Triggers ── case "trigger-cron": return { success: true, message: "Cron triggered", outputData: { timestamp: Date.now() } }; case "trigger-data-change": return { success: true, message: `Watching ${cfg.module || "any"} module`, outputData: inputData || {} }; case "trigger-webhook": return { success: true, message: "Webhook received", outputData: inputData || {} }; case "trigger-manual": return { success: true, message: "Manual trigger fired", outputData: { timestamp: Date.now() } }; case "trigger-proximity": { const data = inputData as { lat?: number; lng?: number } | undefined; if (!data?.lat || !data?.lng) return { success: true, message: "No location data", outputData: { distance: null } }; const dist = haversineKm(data.lat, data.lng, Number(cfg.lat) || 0, Number(cfg.lng) || 0); const inRange = dist <= (Number(cfg.radiusKm) || 1); return { success: true, message: `Distance: ${dist.toFixed(2)}km (${inRange ? "in range" : "out of range"})`, outputData: { distance: dist, inRange } }; } // ── Conditions ── case "condition-compare": { const val = String(inputData ?? ""); const cmp = String(cfg.compareValue ?? ""); let result = false; switch (cfg.operator) { case "equals": result = val === cmp; break; case "not-equals": result = val !== cmp; break; case "greater-than": result = Number(val) > Number(cmp); break; case "less-than": result = Number(val) < Number(cmp); break; case "contains": result = val.includes(cmp); break; } return { success: true, message: `Compare: ${result}`, outputData: result }; } case "condition-geofence": { const coords = inputData as { lat?: number; lng?: number } | undefined; if (!coords?.lat || !coords?.lng) return { success: true, message: "No coords", outputData: false }; const dist = haversineKm(coords.lat, coords.lng, Number(cfg.centerLat) || 0, Number(cfg.centerLng) || 0); const inside = dist <= (Number(cfg.radiusKm) || 5); return { success: true, message: `Geofence: ${inside ? "inside" : "outside"} (${dist.toFixed(2)}km)`, outputData: inside }; } case "condition-time-window": { const now = new Date(); const hour = now.getHours(); const day = now.getDay(); const startH = Number(cfg.startHour) || 0; const endH = Number(cfg.endHour) || 23; const days = String(cfg.days || "0,1,2,3,4,5,6").split(",").map(Number); const inWindow = hour >= startH && hour < endH && days.includes(day); return { success: true, message: `Time window: ${inWindow ? "in" : "outside"}`, outputData: inWindow }; } case "condition-data-filter": { const data = inputData as Record | undefined; const field = String(cfg.field || ""); const val = data?.[field]; let match = false; switch (cfg.operator) { case "equals": match = String(val) === String(cfg.value); break; case "not-equals": match = String(val) !== String(cfg.value); break; case "contains": match = String(val ?? "").includes(String(cfg.value ?? "")); break; case "exists": match = val !== undefined && val !== null; break; } return { success: true, message: `Filter: ${match ? "match" : "no match"}`, outputData: match ? data : null }; } // ── Actions ── case "action-send-email": { const transport = getSmtpTransport(); if (!transport) return { success: false, message: "SMTP not configured" }; if (!cfg.to) return { success: false, message: "No recipient" }; const vars: Record = { date: new Date().toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }), timestamp: new Date().toISOString(), ...(typeof inputData === "object" && inputData !== null ? Object.fromEntries(Object.entries(inputData as Record).map(([k, v]) => [k, String(v)])) : {}), }; const subject = renderTemplate(String(cfg.subject || "Automation Notification"), vars); const html = renderTemplate(String(cfg.bodyTemplate || `

Automation executed at ${vars.date}.

`), vars); await transport.sendMail({ from: process.env.SMTP_FROM || "rSchedule ", to: String(cfg.to), subject, html, }); return { success: true, message: `Email sent to ${cfg.to}` }; } case "action-post-webhook": { if (!cfg.url) return { success: false, message: "No URL configured" }; const method = String(cfg.method || "POST").toUpperCase(); const vars: Record = { timestamp: new Date().toISOString(), ...(typeof inputData === "object" && inputData !== null ? Object.fromEntries(Object.entries(inputData as Record).map(([k, v]) => [k, String(v)])) : {}), }; const body = renderTemplate(String(cfg.bodyTemplate || JSON.stringify({ timestamp: vars.timestamp })), vars); const res = await fetch(String(cfg.url), { method, headers: { "Content-Type": "application/json" }, body: method !== "GET" ? body : undefined, }); if (!res.ok) return { success: false, message: `Webhook ${res.status}` }; return { success: true, message: `Webhook ${method} ${cfg.url} -> ${res.status}`, outputData: await res.json().catch(() => null) }; } case "action-create-event": { if (!_syncServer) return { success: false, message: "SyncServer unavailable" }; const calDocId = calendarDocId(space); const calDoc = _syncServer.getDoc(calDocId); if (!calDoc) return { success: false, message: "Calendar doc not found" }; const eventId = crypto.randomUUID(); const now = Date.now(); const durationMs = (Number(cfg.durationMinutes) || 60) * 60 * 1000; _syncServer.changeDoc(calDocId, `automation: create event`, (d) => { d.events[eventId] = { id: eventId, title: String(cfg.title || "Automation Event"), description: "Created by rSchedule automation", startTime: now, endTime: now + durationMs, allDay: false, timezone: "UTC", rrule: null, status: null, visibility: null, sourceId: null, sourceName: null, sourceType: null, sourceColor: null, locationId: null, locationName: null, coordinates: null, locationGranularity: null, locationLat: null, locationLng: null, isVirtual: false, virtualUrl: null, virtualPlatform: null, rToolSource: "rSchedule", rToolEntityId: node.id, attendees: [], attendeeCount: 0, metadata: null, createdAt: now, updatedAt: now, }; }); return { success: true, message: `Event created: ${cfg.title || "Automation Event"}`, outputData: { eventId } }; } case "action-create-task": return { success: true, message: `Task "${cfg.title || "New task"}" queued`, outputData: { taskTitle: cfg.title } }; case "action-send-notification": console.log(`[Automation] Notification: ${cfg.title || "Notification"} — ${cfg.message || ""}`); return { success: true, message: `Notification: ${cfg.title}` }; case "action-update-data": return { success: true, message: `Data update queued for ${cfg.module || "unknown"}` }; default: return { success: false, message: `Unknown node type: ${node.type}` }; } } async function executeWorkflow( workflow: Workflow, space: string, triggerData?: unknown, ): Promise { const sorted = topologicalSort(workflow.nodes, workflow.edges); const results: NodeResult[] = []; const nodeOutputs = new Map(); for (const node of sorted) { const startMs = Date.now(); // Gather input data from upstream edges let inputData: unknown = triggerData; const incomingEdges = workflow.edges.filter(e => e.toNode === node.id); if (incomingEdges.length > 0) { // For conditions that output booleans: if the upstream condition result is false // and this node connects via the "true" port, skip it const upstreamNode = workflow.nodes.find(n => n.id === incomingEdges[0].fromNode); const upstreamOutput = nodeOutputs.get(incomingEdges[0].fromNode); if (upstreamNode?.type.startsWith("condition-")) { const port = incomingEdges[0].fromPort; // Boolean result from condition if (port === "true" && upstreamOutput === false) { results.push({ nodeId: node.id, status: "skipped", message: "Condition false, skipping true branch", durationMs: 0 }); continue; } if (port === "false" && upstreamOutput === true) { results.push({ nodeId: node.id, status: "skipped", message: "Condition true, skipping false branch", durationMs: 0 }); continue; } if (port === "inside" && upstreamOutput === false) { results.push({ nodeId: node.id, status: "skipped", message: "Outside geofence, skipping inside branch", durationMs: 0 }); continue; } if (port === "outside" && upstreamOutput === true) { results.push({ nodeId: node.id, status: "skipped", message: "Inside geofence, skipping outside branch", durationMs: 0 }); continue; } if (port === "in-window" && upstreamOutput === false) { results.push({ nodeId: node.id, status: "skipped", message: "Outside time window", durationMs: 0 }); continue; } if (port === "match" && upstreamOutput === null) { results.push({ nodeId: node.id, status: "skipped", message: "Data filter: no match", durationMs: 0 }); continue; } if (port === "no-match" && upstreamOutput !== null) { results.push({ nodeId: node.id, status: "skipped", message: "Data filter: matched", durationMs: 0 }); continue; } } // Use upstream output as input if (upstreamOutput !== undefined) inputData = upstreamOutput; } try { const result = await executeWorkflowNode(node, inputData, space); const durationMs = Date.now() - startMs; nodeOutputs.set(node.id, result.outputData); results.push({ nodeId: node.id, status: result.success ? "success" : "error", message: result.message, durationMs, outputData: result.outputData, }); } catch (e: any) { results.push({ nodeId: node.id, status: "error", message: e.message || String(e), durationMs: Date.now() - startMs, }); } } return results; } // POST /api/workflows/:id/run — manual execute routes.post("/api/workflows/:id/run", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const id = c.req.param("id"); const docId = scheduleDocId(dataSpace); const doc = ensureDoc(dataSpace); const wf = doc.workflows?.[id]; if (!wf) return c.json({ error: "Workflow not found" }, 404); const results = await executeWorkflow(wf, dataSpace); const allOk = results.every(r => r.status !== "error"); _syncServer!.changeDoc(docId, `run workflow ${id}`, (d) => { const w = d.workflows[id]; if (!w) return; w.lastRunAt = Date.now(); w.lastRunStatus = allOk ? "success" : "error"; w.runCount = (w.runCount || 0) + 1; w.updatedAt = Date.now(); }); return c.json({ success: allOk, results }); }); // POST /api/workflows/webhook/:hookId — external webhook trigger routes.post("/api/workflows/webhook/:hookId", async (c) => { const space = c.req.param("space") || "demo"; const dataSpace = (c.get("effectiveSpace" as any) as string) || space; const hookId = c.req.param("hookId"); const doc = ensureDoc(dataSpace); let payload: unknown = {}; try { payload = await c.req.json(); } catch { /* empty payload */ } // Find workflows with a trigger-webhook node matching this hookId const matches: Workflow[] = []; for (const wf of Object.values(doc.workflows || {})) { if (!wf.enabled) continue; for (const node of wf.nodes) { if (node.type === "trigger-webhook" && node.config.hookId === hookId) { matches.push(wf); break; } } } if (matches.length === 0) return c.json({ error: "No matching workflow" }, 404); const allResults: { workflowId: string; results: NodeResult[] }[] = []; for (const wf of matches) { const results = await executeWorkflow(wf, dataSpace, payload); allResults.push({ workflowId: wf.id, results }); } return c.json({ triggered: matches.length, results: allResults }); }); // ── Demo workflow seeds ── function seedDemoWorkflows(space: string) { const docId = scheduleDocId(space); const doc = ensureDoc(space); if (Object.keys(doc.workflows || {}).length > 0) return; const now = Date.now(); const demo1Id = "demo-arriving-home"; const demo1: Workflow = { id: demo1Id, name: "Arriving Home Notification", enabled: false, nodes: [ { id: "n1", type: "trigger-proximity", label: "Location Proximity", position: { x: 50, y: 100 }, config: { lat: "49.2827", lng: "-123.1207", radiusKm: "1" } }, { id: "n2", type: "condition-geofence", label: "Geofence Check", position: { x: 350, y: 100 }, config: { centerLat: "49.2827", centerLng: "-123.1207", radiusKm: "2" } }, { id: "n3", type: "action-send-email", label: "Notify Family", position: { x: 650, y: 80 }, config: { to: "family@example.com", subject: "Almost home!", bodyTemplate: "

I'll be home in about {{distance}}km.

" } }, ], edges: [ { id: "e1", fromNode: "n1", fromPort: "trigger", toNode: "n2", toPort: "trigger" }, { id: "e2", fromNode: "n1", fromPort: "distance", toNode: "n2", toPort: "coords" }, { id: "e3", fromNode: "n2", fromPort: "inside", toNode: "n3", toPort: "trigger" }, ], lastRunAt: null, lastRunStatus: null, runCount: 0, createdAt: now, updatedAt: now, }; const demo2Id = "demo-signoff-pipeline"; const demo2: Workflow = { id: demo2Id, name: "Document Sign-off Pipeline", enabled: false, nodes: [ { id: "n1", type: "trigger-data-change", label: "Watch Sign-offs", position: { x: 50, y: 100 }, config: { module: "rnotes", field: "status" } }, { id: "n2", type: "condition-compare", label: "Status = Signed", position: { x: 350, y: 100 }, config: { operator: "equals", compareValue: "signed" } }, { id: "n3", type: "action-create-event", label: "Schedule Review", position: { x: 650, y: 60 }, config: { title: "Document Review Meeting", durationMinutes: "30" } }, { id: "n4", type: "action-send-notification", label: "Notify Comms", position: { x: 650, y: 180 }, config: { title: "Sign-off Complete", message: "Document has been signed off", level: "success" } }, ], edges: [ { id: "e1", fromNode: "n1", fromPort: "trigger", toNode: "n2", toPort: "trigger" }, { id: "e2", fromNode: "n1", fromPort: "data", toNode: "n2", toPort: "value" }, { id: "e3", fromNode: "n2", fromPort: "true", toNode: "n3", toPort: "trigger" }, { id: "e4", fromNode: "n2", fromPort: "true", toNode: "n4", toPort: "trigger" }, ], lastRunAt: null, lastRunStatus: null, runCount: 0, createdAt: now, updatedAt: now, }; _syncServer!.changeDoc(docId, "seed demo workflows", (d) => { if (!d.workflows) d.workflows = {} as any; (d.workflows as any)[demo1Id] = demo1; (d.workflows as any)[demo2Id] = demo2; }); console.log(`[Schedule] Seeded 2 demo workflows for space "${space}"`); } // ── Module export ── export const scheduleModule: RSpaceModule = { id: "rschedule", name: "rSchedule", icon: "⏱", description: "Persistent cron-based job scheduling with email, webhooks, and backlog briefings", scoping: { defaultScope: "global", userConfigurable: false }, docSchemas: [ { pattern: "{space}:schedule:jobs", description: "Scheduled jobs and execution log", init: scheduleSchema.init, }, ], routes, landingPage: renderLanding, seedTemplate: seedDefaultJobs, async onInit(ctx) { _syncServer = ctx.syncServer; seedDefaultJobs("demo"); seedDemoWorkflows("demo"); startTickLoop(); }, feeds: [ { id: "executions", name: "Executions", kind: "data", description: "Job execution events with status, timing, and output", }, ], acceptsFeeds: ["data", "governance"], outputPaths: [ { path: "jobs", name: "Jobs", icon: "⏱", description: "Scheduled jobs and their configurations" }, { path: "reminders", name: "Reminders", icon: "🔔", description: "Scheduled reminders with email notifications" }, { path: "workflows", name: "Automations", icon: "🔀", description: "Visual automation workflows with triggers, conditions, and actions" }, { path: "log", name: "Execution Log", icon: "📋", description: "History of job executions" }, ], };