/** * Canvas-wide pub/sub event bus. * * Shapes emit named events; other shapes subscribe to channels * and receive payloads via `onEventReceived()`. * * Events are persisted in the Automerge CRDT doc (bounded ring buffer, * last 100 entries) so remote users see them replayed via sync. */ import type { CommunitySync } from "./community-sync"; // ── Types ── export interface EventEntry { id: string; channel: string; sourceShapeId: string; payload: unknown; timestamp: number; } /** Shapes implement this to receive broadcast events. */ export interface EventReceiver { onEventReceived(channel: string, payload: unknown, sourceShapeId: string): void; } // ── Constants ── const MAX_LOG_SIZE = 100; const MAX_REENTRY_DEPTH = 10; // ── CanvasEventBus ── export class CanvasEventBus extends EventTarget { #sync: CommunitySync; #processedIds = new Set(); #reentryDepth = 0; constructor(sync: CommunitySync) { super(); this.#sync = sync; // When remote Automerge changes arrive, check for new events sync.addEventListener("eventlog-changed", () => { this.#processRemoteEvents(); }); } /** * Emit an event on a named channel. * Writes to CRDT eventLog and delivers to local subscribers immediately. */ emit(channel: string, sourceShapeId: string, payload: unknown): void { if (this.#reentryDepth >= MAX_REENTRY_DEPTH) { console.warn( `[EventBus] Re-entrancy limit (${MAX_REENTRY_DEPTH}) reached, dropping:`, channel, ); return; } const entry: EventEntry = { id: crypto.randomUUID(), channel, sourceShapeId, payload, timestamp: Date.now(), }; // Persist to CRDT (ring-buffered) this.#sync.appendEvent(entry); // Mark as processed so we don't re-deliver from remote sync this.#processedIds.add(entry.id); // Deliver to local subscribers immediately this.#deliverEvent(entry); } /** * Subscribe a shape to one or more channels. * Subscriptions are stored in the CRDT so they sync across peers. */ subscribe(shapeId: string, channels: string[]): void { const current = this.#sync.getShapeSubscriptions(shapeId); const merged = Array.from(new Set([...current, ...channels])); this.#sync.setShapeSubscriptions(shapeId, merged); } /** * Unsubscribe a shape from channels (or all channels if none specified). */ unsubscribe(shapeId: string, channels?: string[]): void { if (!channels) { this.#sync.setShapeSubscriptions(shapeId, []); return; } const current = this.#sync.getShapeSubscriptions(shapeId); const updated = current.filter((c) => !channels.includes(c)); this.#sync.setShapeSubscriptions(shapeId, updated); } /** * Get all shape IDs subscribed to a given channel. */ getSubscribers(channel: string): string[] { return this.#sync.getShapesSubscribedTo(channel); } /** * Process events that arrived via remote Automerge sync. * Skips any events we already delivered locally. */ #processRemoteEvents(): void { const log = this.#sync.getEventLog(); if (!log || log.length === 0) return; for (const entry of log) { if (this.#processedIds.has(entry.id)) continue; this.#processedIds.add(entry.id); this.#deliverEvent(entry); } // Prune tracked IDs to match log size (avoid unbounded growth) if (this.#processedIds.size > MAX_LOG_SIZE * 2) { const logIds = new Set(log.map((e) => e.id)); for (const id of this.#processedIds) { if (!logIds.has(id)) this.#processedIds.delete(id); } } } /** * Deliver a single event to all subscribed shapes. * Re-entrancy guard prevents infinite chains. */ #deliverEvent(entry: EventEntry): void { const subscribers = this.getSubscribers(entry.channel); if (subscribers.length === 0) return; this.#reentryDepth++; try { for (const shapeId of subscribers) { // Don't deliver back to emitter if (shapeId === entry.sourceShapeId) continue; const shape = this.#sync.getShapeElement(shapeId); if ( shape && typeof (shape as unknown as EventReceiver).onEventReceived === "function" ) { try { (shape as unknown as EventReceiver).onEventReceived( entry.channel, entry.payload, entry.sourceShapeId, ); } catch (err) { console.error( `[EventBus] Error in ${shapeId}.onEventReceived(${entry.channel}):`, err, ); } } } } finally { this.#reentryDepth--; } // Dispatch a DOM event for any external listeners this.dispatchEvent( new CustomEvent("event-delivered", { detail: entry }), ); } }