/**
 * Notification Service — Core notification dispatch + WebSocket delivery.
 *
 * Modules call `notify()` to persist a notification to PostgreSQL and
 * attempt real-time delivery via WebSocket. The WS registry tracks
 * which user DIDs have active connections. Web Push and email delivery
 * are attempted in the background after the notification is stored.
 */

import type { ServerWebSocket } from "bun";
import webpush from "web-push";
import {
  createNotification,
  getUnreadCount,
  markNotificationDelivered,
  listSpaceMembers,
  getUserPushSubscriptions,
  deletePushSubscriptionByEndpoint,
  updatePushSubscriptionLastUsed,
  getNotificationPreferences,
  getProfileEmailsByDids,
  type StoredNotification,
} from "../src/encryptid/db";

// ── VAPID setup ──
const VAPID_PUBLIC_KEY = process.env.VAPID_PUBLIC_KEY;
const VAPID_PRIVATE_KEY = process.env.VAPID_PRIVATE_KEY;
const VAPID_SUBJECT = process.env.VAPID_SUBJECT || "mailto:admin@rspace.online";

if (VAPID_PUBLIC_KEY && VAPID_PRIVATE_KEY) {
  webpush.setVapidDetails(VAPID_SUBJECT, VAPID_PUBLIC_KEY, VAPID_PRIVATE_KEY);
  console.log("[push] VAPID configured");
} else {
  console.warn("[push] VAPID keys not set — Web Push disabled");
}

// ── SMTP setup (email delivery) ──
const SMTP_HOST = process.env.SMTP_HOST || "mail.rmail.online";
const SMTP_PORT = Number(process.env.SMTP_PORT) || 587;
const SMTP_USER = process.env.SMTP_USER || "";
const SMTP_PASS = process.env.SMTP_PASS || "";

// Cached transport; typed loosely because nodemailer is loaded lazily below.
let _smtpTransport: any = null;

/**
 * Lazily create (and cache) the SMTP transport.
 *
 * @returns the cached transport, or `null` when `SMTP_PASS` is unset or the
 *          transport could not be created.
 */
async function getSmtpTransport() {
  if (_smtpTransport) return _smtpTransport;
  if (!SMTP_PASS) return null;
  try {
    const nodemailer = await import("nodemailer");
    // Accept both the default-export and the named-export module shape.
    const createTransport =
      (nodemailer as any).default?.createTransport || nodemailer.createTransport;
    _smtpTransport = createTransport({
      host: SMTP_HOST,
      port: SMTP_PORT,
      secure: SMTP_PORT === 465,
      auth: SMTP_USER ? { user: SMTP_USER, pass: SMTP_PASS } : undefined,
      // NOTE(review): certificate verification is disabled here, which allows
      // a man-in-the-middle on the SMTP connection. Confirm whether the mail
      // host really needs this before keeping it.
      tls: { rejectUnauthorized: false },
    });
    console.log("[email] SMTP transport configured");
    return _smtpTransport;
  } catch (e) {
    console.warn("[email] Failed to create SMTP transport:", e);
    return null;
  }
}

// ============================================================================
// TYPES
// ============================================================================

export type NotificationCategory = 'space' | 'module' | 'system' | 'social';

export type NotificationEventType =
  // Space
  | 'access_request'
  | 'access_approved'
  | 'access_denied'
  | 'member_joined'
  | 'member_left'
  | 'role_changed'
  | 'nest_request'
  | 'nest_created'
  | 'space_invite'
  // Module
  | 'inbox_new_mail'
  | 'inbox_approval_needed'
  | 'choices_result'
  | 'notes_shared'
  | 'canvas_mention'
  // System
  | 'guardian_invite'
  | 'guardian_accepted'
  | 'recovery_initiated'
  | 'recovery_approved'
  | 'device_linked'
  | 'security_alert'
  // Social
  | 'mention'
  | 'ping_user'
  // Delegation
  | 'delegation_received'
  | 'delegation_revoked'
  | 'delegation_expired'
  // Commitment (rTime)
  | 'commitment_requested'
  | 'commitment_accepted'
  | 'commitment_declined';

/** Input accepted by `notify()`; the fields are forwarded to `createNotification`. */
export interface NotifyOptions {
  userDid: string;
  category: NotificationCategory;
  eventType: NotificationEventType;
  title: string;
  body?: string;
  spaceSlug?: string;
  moduleId?: string;
  actionUrl?: string;
  actorDid?: string;
  actorUsername?: string;
  metadata?: Record<string, unknown>;
  expiresAt?: Date;
}

// ============================================================================
// WS CONNECTION REGISTRY
// ============================================================================

// userDid → set of sockets currently registered for that user.
const userConnections = new Map<string, Set<ServerWebSocket<unknown>>>();

/** Add `ws` to the set of sockets registered for `userDid`. */
export function registerUserConnection(userDid: string, ws: ServerWebSocket<unknown>): void {
  let conns = userConnections.get(userDid);
  if (!conns) {
    conns = new Set();
    userConnections.set(userDid, conns);
  }
  conns.add(ws);
}

/** Remove `ws` from the registry, dropping the user's entry once it is empty. */
export function unregisterUserConnection(userDid: string, ws: ServerWebSocket<unknown>): void {
  const conns = userConnections.get(userDid);
  if (!conns) return;
  conns.delete(ws);
  if (conns.size === 0) userConnections.delete(userDid);
}

// ============================================================================
// CORE DISPATCH
// ============================================================================

/** Best-effort text for a caught value of unknown type. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Persist a notification, then attempt delivery over every channel.
 *
 * Only the database write can reject this promise. WebSocket delivery is
 * awaited but its failures are logged rather than thrown, because the
 * notification is already stored at that point. Web Push and email run in
 * the background and are not awaited.
 *
 * @param opts notification content and target user
 * @returns the stored notification as returned by `createNotification`
 */
export async function notify(opts: NotifyOptions): Promise<StoredNotification> {
  const id = crypto.randomUUID();

  // 1. Persist to DB
  const stored = await createNotification({
    id,
    userDid: opts.userDid,
    category: opts.category,
    eventType: opts.eventType,
    title: opts.title,
    body: opts.body,
    spaceSlug: opts.spaceSlug,
    moduleId: opts.moduleId,
    actionUrl: opts.actionUrl,
    actorDid: opts.actorDid,
    actorUsername: opts.actorUsername,
    metadata: opts.metadata,
    expiresAt: opts.expiresAt,
  });

  // 2. Attempt WS delivery (awaited, but never fails the call)
  try {
    await deliverViaWebSocket(stored, opts.userDid);
  } catch (err: unknown) {
    console.warn(`[notify] WebSocket delivery failed for ${opts.userDid}:`, describeError(err));
  }

  // 3. Attempt Web Push delivery (non-blocking)
  if (VAPID_PUBLIC_KEY && VAPID_PRIVATE_KEY) {
    sendWebPush(stored, opts).catch((err: unknown) => {
      console.warn(`[push] Delivery failed for ${opts.userDid}:`, describeError(err));
    });
  }

  // 4. Attempt email delivery (non-blocking)
  sendEmailNotification(stored, opts).catch((err: unknown) => {
    console.warn(`[email] Delivery failed for ${opts.userDid}:`, describeError(err));
  });

  return stored;
}

/**
 * Send the notification to every open socket registered for `userDid` and
 * mark it delivered via 'ws' if at least one send succeeded.
 */
async function deliverViaWebSocket(stored: StoredNotification, userDid: string): Promise<void> {
  const conns = userConnections.get(userDid);
  if (!conns || conns.size === 0) return;

  const unreadCount = await getUnreadCount(userDid);
  const payload = JSON.stringify({
    type: "notification",
    notification: {
      id: stored.id,
      category: stored.category,
      eventType: stored.eventType,
      title: stored.title,
      body: stored.body,
      spaceSlug: stored.spaceSlug,
      actorUsername: stored.actorUsername,
      actionUrl: stored.actionUrl,
      metadata: stored.metadata,
      createdAt: stored.createdAt,
    },
    unreadCount,
  });

  let delivered = false;
  for (const ws of conns) {
    try {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(payload);
        delivered = true;
      }
    } catch {
      // Connection may have closed between check and send
    }
  }

  if (delivered) {
    await markNotificationDelivered(stored.id, 'ws');
  }
}

/**
 * Send Web Push to all of a user's subscriptions.
 *
 * Skipped when the user's preferences have push disabled or the user has no
 * subscriptions. Subscriptions rejected with 404/410 are deleted; any other
 * send failure is logged. The notification is marked delivered via 'push'
 * when at least one send succeeded.
 */
async function sendWebPush(stored: StoredNotification, opts: NotifyOptions): Promise<void> {
  // Check if user has push enabled
  const prefs = await getNotificationPreferences(opts.userDid);
  if (prefs && !prefs.pushEnabled) return;

  const subs = await getUserPushSubscriptions(opts.userDid);
  if (subs.length === 0) return;

  const pushPayload = JSON.stringify({
    title: stored.title,
    body: stored.body || "",
    icon: "/icons/icon-192.png",
    badge: "/icons/icon-192.png",
    tag: `${stored.category}-${stored.eventType}`,
    data: {
      url: stored.actionUrl || "/",
      notificationId: stored.id,
    },
  });

  let anyDelivered = false;

  await Promise.allSettled(
    subs.map(async (sub) => {
      try {
        await webpush.sendNotification(
          {
            endpoint: sub.endpoint,
            keys: { p256dh: sub.keyP256dh, auth: sub.keyAuth },
          },
          pushPayload,
        );
        anyDelivered = true;
        // Bookkeeping only; a failure here must not affect delivery status.
        updatePushSubscriptionLastUsed(sub.id).catch(() => {});
      } catch (err: unknown) {
        const statusCode = (err as { statusCode?: unknown } | null | undefined)?.statusCode;
        // 404/410 = subscription expired, clean up
        if (statusCode === 404 || statusCode === 410) {
          await deletePushSubscriptionByEndpoint(sub.endpoint);
        } else {
          console.warn(`[push] Send failed for ${opts.userDid}:`, describeError(err));
        }
      }
    }),
  );

  if (anyDelivered) {
    await markNotificationDelivered(stored.id, 'push');
  }
}

/**
 * Send email notification from {space}-agent@rspace.online.
 *
 * Skipped when SMTP is not configured, when the user's preferences have
 * email disabled, or when no email address is found for the user. Send
 * failures are logged and not rethrown.
 */
async function sendEmailNotification(stored: StoredNotification, opts: NotifyOptions): Promise<void> {
  // Without a password getSmtpTransport() always yields null, so skip the
  // database lookups below entirely.
  if (!SMTP_PASS) return;

  // Check if user has email enabled
  const prefs = await getNotificationPreferences(opts.userDid);
  if (prefs && !prefs.emailEnabled) return;

  // Look up user's email
  const emailMap = await getProfileEmailsByDids([opts.userDid]);
  const userEmail = emailMap.get(opts.userDid);
  if (!userEmail) return;

  const transport = await getSmtpTransport();
  if (!transport) return;

  const space = opts.spaceSlug || "rspace";
  const fromAddr = `${space} agent <${space}-agent@rspace.online>`;
  const actionLink = opts.actionUrl
    ? `https://${space}.rspace.online${opts.actionUrl}`
    : `https://${space}.rspace.online`;

  // Minimal structural markup; every interpolated value is HTML-escaped.
  const bodyHtml = stored.body ? `<p>${escapeHtml(stored.body)}</p>` : "";
  const html = `
<div>
  <h2>${escapeHtml(stored.title)}</h2>
  ${bodyHtml}
  <p><a href="${escapeHtml(actionLink)}">View in rSpace</a></p>
  <p>Sent by ${escapeHtml(space)}.rspace.online</p>
</div>
`;

  try {
    await transport.sendMail({
      from: fromAddr,
      to: userEmail,
      subject: stored.title,
      html,
      replyTo: `${space}-agent@rspace.online`,
    });
    await markNotificationDelivered(stored.id, 'email');
    console.log(`[email] Sent "${stored.title}" to ${opts.userDid}`);
  } catch (err: unknown) {
    console.error(`[email] Failed to send to ${opts.userDid}:`, describeError(err));
  }
}

/**
 * Escape the characters that are significant in HTML text and in
 * double-quoted attribute values. `&` is replaced first so the entities
 * produced by the later replacements are not escaped a second time.
 */
function escapeHtml(s: string): string {
  return s
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;");
}

// ============================================================================
// CONVENIENCE: NOTIFY SPACE ADMINS/MODS
// ============================================================================

/**
 * Send the same notification to every member of `spaceSlug` whose role is
 * 'admin' or 'moderator'.
 *
 * @param spaceSlug space whose member list is queried
 * @param opts notification content; `userDid` is filled in per recipient
 */
export async function notifySpaceAdmins(
  spaceSlug: string,
  opts: Omit<NotifyOptions, 'userDid'>,
): Promise<void> {
  const members = await listSpaceMembers(spaceSlug);
  const targets = members.filter(m => m.role === 'admin' || m.role === 'moderator');

  await Promise.all(
    targets.map(m => notify({ ...opts, userDid: m.userDID })),
  );
}