/**
 * Canvas-wide pub/sub event bus.
 *
 * Shapes emit named events; other shapes subscribe to channels
 * and receive payloads via `onEventReceived()`.
 *
 * Events are persisted in the Automerge CRDT doc (bounded ring buffer,
 * last 100 entries) so remote users see them replayed via sync.
 *
 * Clock events are ephemeral — delivered via WebSocket but NOT persisted.
 */

import type { CommunitySync } from "./community-sync";

// ── Types ──

/** A single event as stored in the event log and handed to subscribers. */
export interface EventEntry {
  id: string;
  channel: string;
  sourceShapeId: string;
  payload: unknown;
  timestamp: number;
}

/** Shapes implement this to receive broadcast events. */
export interface EventReceiver {
  onEventReceived(channel: string, payload: unknown, sourceShapeId: string): void;
}

// ── Constants ──

const MAX_LOG_SIZE = 100;
const MAX_REENTRY_DEPTH = 10;
const LOCAL_CLOCK_INTERVAL = 60_000; // 60s fallback tick
/** Server clock is considered lost after 2.5 missed local tick intervals. */
const SERVER_CLOCK_TIMEOUT = LOCAL_CLOCK_INTERVAL * 2.5;

/**
 * Runtime check that a value exposes a callable `onEventReceived`.
 * Replaces unchecked double casts at the call site.
 */
function isEventReceiver(value: unknown): value is EventReceiver {
  return (
    value != null &&
    typeof (value as { onEventReceived?: unknown }).onEventReceived === "function"
  );
}

// ── CanvasEventBus ──

export class CanvasEventBus extends EventTarget {
  #sync: CommunitySync;
  /** IDs of events already delivered locally; used to de-duplicate sync replays. */
  #processedIds = new Set<string>();
  /** Current nesting depth of `#deliverEvent` calls (emit → receive → emit …). */
  #reentryDepth = 0;
  #localClockTimer: ReturnType<typeof setInterval> | null = null;
  #serverClockActive = false;
  #serverClockTimeout: ReturnType<typeof setTimeout> | null = null;

  // Listener references are kept so destroy() can detach them again.
  readonly #onEventLogChanged = (): void => {
    this.#processRemoteEvents();
  };

  readonly #onClockEvent = (e: Event): void => {
    const { channel, payload } = (
      e as CustomEvent<{ channel: string; payload: unknown }>
    ).detail;
    this.#handleServerClock(channel, payload);
  };

  /**
   * @param sync Sync layer that stores the event log and subscriptions and
   *   dispatches `eventlog-changed` / `clock-event` events.
   */
  constructor(sync: CommunitySync) {
    super();
    this.#sync = sync;

    // When remote Automerge changes arrive, check for new events
    sync.addEventListener("eventlog-changed", this.#onEventLogChanged);

    // Handle ephemeral clock events from server
    sync.addEventListener("clock-event", this.#onClockEvent);

    // Start local fallback clock (will be suppressed when server clock is active)
    this.#startLocalClock();
  }

  /**
   * Emit an event on a named channel.
   * Writes to CRDT eventLog and delivers to local subscribers immediately.
   *
   * The event is dropped (with a console warning) when the current delivery
   * nesting depth has reached `MAX_REENTRY_DEPTH`.
   *
   * @param channel Channel name to publish on.
   * @param sourceShapeId ID of the emitting shape; it will not receive its own event.
   * @param payload Arbitrary data handed to subscribers.
   */
  emit(channel: string, sourceShapeId: string, payload: unknown): void {
    if (this.#reentryDepth >= MAX_REENTRY_DEPTH) {
      console.warn(
        `[EventBus] Re-entrancy limit (${MAX_REENTRY_DEPTH}) reached, dropping:`,
        channel,
      );
      return;
    }

    const entry: EventEntry = {
      id: crypto.randomUUID(),
      channel,
      sourceShapeId,
      payload,
      timestamp: Date.now(),
    };

    // Persist to CRDT (ring-buffered)
    this.#sync.appendEvent(entry);

    // Mark as processed so we don't re-deliver from remote sync
    this.#processedIds.add(entry.id);
    // Local-only sessions never hit the remote path, so prune here as well.
    this.#pruneProcessedIds(entry.id);

    // Deliver to local subscribers immediately
    this.#deliverEvent(entry);
  }

  /**
   * Subscribe a shape to one or more channels.
   * Subscriptions are stored in the CRDT so they sync across peers.
   * Channels the shape is already subscribed to are not duplicated.
   */
  subscribe(shapeId: string, channels: string[]): void {
    const current = this.#sync.getShapeSubscriptions(shapeId);
    const merged = Array.from(new Set([...current, ...channels]));
    this.#sync.setShapeSubscriptions(shapeId, merged);
  }

  /**
   * Unsubscribe a shape from channels (or all channels if none specified).
   */
  unsubscribe(shapeId: string, channels?: string[]): void {
    if (!channels) {
      this.#sync.setShapeSubscriptions(shapeId, []);
      return;
    }
    const current = this.#sync.getShapeSubscriptions(shapeId);
    const updated = current.filter((c) => !channels.includes(c));
    this.#sync.setShapeSubscriptions(shapeId, updated);
  }

  /**
   * Get all shape IDs subscribed to a given channel.
   */
  getSubscribers(channel: string): string[] {
    return this.#sync.getShapesSubscribedTo(channel);
  }

  /**
   * Clean up timers and detach the listeners registered on the sync layer,
   * so a destroyed bus no longer reacts to remote or clock events.
   * Safe to call more than once.
   */
  destroy(): void {
    this.#sync.removeEventListener("eventlog-changed", this.#onEventLogChanged);
    this.#sync.removeEventListener("clock-event", this.#onClockEvent);

    this.#stopLocalClock();
    if (this.#serverClockTimeout) {
      clearTimeout(this.#serverClockTimeout);
      this.#serverClockTimeout = null;
    }
  }

  // ── Clock handling ──

  /**
   * Handle an ephemeral clock event from the server.
   * Resets the server-active flag so the local fallback stays suppressed.
   */
  #handleServerClock(channel: string, payload: unknown): void {
    this.#serverClockActive = true;

    // Reset server-alive timeout (2.5x the tick interval)
    if (this.#serverClockTimeout) clearTimeout(this.#serverClockTimeout);
    this.#serverClockTimeout = setTimeout(() => {
      this.#serverClockActive = false;
      console.log("[EventBus] Server clock lost, local fallback active");
    }, SERVER_CLOCK_TIMEOUT);

    // Deliver as ephemeral event (no sourceShapeId — it's the system)
    this.#deliverEvent({
      id: `clock-${Date.now()}`,
      channel,
      sourceShapeId: "__system__",
      payload,
      timestamp: Date.now(),
    });
  }

  /**
   * Local fallback clock — fires when server clock is not active.
   * Does nothing if the timer is already running.
   */
  #startLocalClock(): void {
    if (this.#localClockTimer) return;
    this.#localClockTimer = setInterval(() => {
      if (this.#serverClockActive) return; // Server is authoritative

      const now = new Date();
      this.#deliverEvent({
        id: `local-clock-${Date.now()}`,
        channel: "clock:tick",
        sourceShapeId: "__local__",
        payload: {
          timestamp: now.getTime(),
          isoString: now.toISOString(),
          hour: now.getUTCHours(),
          minute: now.getUTCMinutes(),
          second: now.getUTCSeconds(),
          dayOfWeek: now.getUTCDay(),
          tickCount: -1, // Local fallback marker
        },
        timestamp: now.getTime(),
      });
    }, LOCAL_CLOCK_INTERVAL);
  }

  /** Stop the local fallback clock if it is running. */
  #stopLocalClock(): void {
    if (this.#localClockTimer) {
      clearInterval(this.#localClockTimer);
      this.#localClockTimer = null;
    }
  }

  // ── Event processing ──

  /**
   * Process events that arrived via remote Automerge sync.
   * Skips any events we already delivered locally.
   */
  #processRemoteEvents(): void {
    const log = this.#sync.getEventLog();
    if (!log || log.length === 0) return;

    for (const entry of log) {
      if (this.#processedIds.has(entry.id)) continue;
      this.#processedIds.add(entry.id);
      this.#deliverEvent(entry);
    }

    this.#pruneProcessedIds();
  }

  /**
   * Prune tracked IDs to match the log (avoid unbounded growth).
   * Only runs once more than `MAX_LOG_SIZE * 2` IDs are tracked; IDs that
   * are no longer present in the event log are forgotten.
   *
   * @param keepId ID that must survive pruning even if it is not (yet)
   *   visible in the log, e.g. the event currently being emitted.
   */
  #pruneProcessedIds(keepId?: string): void {
    if (this.#processedIds.size <= MAX_LOG_SIZE * 2) return;

    const liveIds = new Set<string>();
    const log = this.#sync.getEventLog();
    if (log) {
      for (const logged of log) liveIds.add(logged.id);
    }
    if (keepId !== undefined) liveIds.add(keepId);

    // Deleting from a Set while iterating it is well-defined in JS.
    for (const id of this.#processedIds) {
      if (!liveIds.has(id)) this.#processedIds.delete(id);
    }
  }

  /**
   * Deliver a single event to all subscribed shapes.
   * Re-entrancy guard prevents infinite chains.
   *
   * Errors thrown by a receiver are logged and do not stop delivery to the
   * remaining subscribers. Note that when the channel has no subscribers
   * this returns early and no `event-delivered` DOM event is dispatched.
   */
  #deliverEvent(entry: EventEntry): void {
    const subscribers = this.getSubscribers(entry.channel);
    if (subscribers.length === 0) return;

    this.#reentryDepth++;
    try {
      for (const shapeId of subscribers) {
        // Don't deliver back to emitter (skip for system clock)
        if (shapeId === entry.sourceShapeId) continue;

        const shape: unknown = this.#sync.getShapeElement(shapeId);
        if (!isEventReceiver(shape)) continue;

        try {
          shape.onEventReceived(entry.channel, entry.payload, entry.sourceShapeId);
        } catch (err) {
          console.error(
            `[EventBus] Error in ${shapeId}.onEventReceived(${entry.channel}):`,
            err,
          );
        }
      }
    } finally {
      this.#reentryDepth--;
    }

    // Dispatch a DOM event for any external listeners
    this.dispatchEvent(new CustomEvent("event-delivered", { detail: entry }));
  }
}