diff --git a/backlog/tasks/task-43 - Implement-Event-Broadcasting-canvas-wide-pub-sub-system.md b/backlog/tasks/task-43 - Implement-Event-Broadcasting-canvas-wide-pub-sub-system.md index c2a64cf..50c1b70 100644 --- a/backlog/tasks/task-43 - Implement-Event-Broadcasting-canvas-wide-pub-sub-system.md +++ b/backlog/tasks/task-43 - Implement-Event-Broadcasting-canvas-wide-pub-sub-system.md @@ -1,9 +1,10 @@ --- id: TASK-43 title: 'Implement Event Broadcasting: canvas-wide pub/sub system' -status: To Do +status: Done assignee: [] created_date: '2026-02-18 20:06' +updated_date: '2026-03-11 23:14' labels: - feature - phase-2 @@ -39,10 +40,18 @@ Example: Timer emits "timer:done" → all subscribed Budget shapes recalculate. ## Acceptance Criteria -- [ ] #1 CanvasEventBus emits events to CRDT eventLog -- [ ] #2 Shapes can subscribe to channels and receive events -- [ ] #3 Events sync to remote users via Automerge -- [ ] #4 Ring buffer bounded at 100 entries with GC -- [ ] #5 Re-entrancy guard prevents infinite event loops -- [ ] #6 Works offline (events queued in CRDT, replayed on reconnect) +- [x] #1 CanvasEventBus emits events to CRDT eventLog +- [x] #2 Shapes can subscribe to channels and receive events +- [x] #3 Events sync to remote users via Automerge +- [x] #4 Ring buffer bounded at 100 entries with GC +- [x] #5 Re-entrancy guard prevents infinite event loops +- [x] #6 Works offline (events queued in CRDT, replayed on reconnect) + +## Implementation Notes + + +Implementation started: CanvasEventBus class, CommunityDoc schema updates, CommunitySync helper methods + +Complete. Created lib/event-bus.ts with CanvasEventBus class. Updated CommunityDoc with eventLog field, ShapeData with subscriptions field. Added appendEvent(), getEventLog(), setShapeSubscriptions(), getShapeSubscriptions(), getShapesSubscribedTo(), getShapeElement() methods to CommunitySync. Added eventlog-changed dispatch in both patch and full-sync paths. Added onEventReceived() optional method on FolkShape. Exported from lib/index.ts. + diff --git a/lib/community-sync.ts b/lib/community-sync.ts index 5cdd32a..f831c0a 100644 --- a/lib/community-sync.ts +++ b/lib/community-sync.ts @@ -10,6 +10,7 @@ import type { import { computeMembranePermeability } from "./connection-types"; import { makeChangeMessage, parseChangeMessage } from "../shared/local-first/change-message"; import type { HistoryEntry } from "../shared/components/rstack-history-panel"; +import type { EventEntry } from "./event-bus"; // Shape data stored in Automerge document export interface ShapeData { @@ -40,6 +41,8 @@ export interface ShapeData { targetPort?: string; // Shape port values (for data piping) ports?: Record; + // Event bus subscriptions (channel names this shape listens to) + subscriptions?: string[]; // Allow arbitrary shape-specific properties from toJSON() [key: string]: unknown; } @@ -111,6 +114,8 @@ export interface CommunityDoc { activeLayerId?: string; /** Layer view mode: flat (tabs) or stack (side view) */ layerViewMode?: "flat" | "stack"; + /** Pub/sub event log — bounded ring buffer (last 100 entries) */ + eventLog?: EventEntry[]; } type SyncState = Automerge.SyncState; @@ -770,6 +775,11 @@ export class CommunitySync extends EventTarget { } } + // Notify event bus if there are any events to process + if (this.#doc.eventLog && this.#doc.eventLog.length > 0) { + this.dispatchEvent(new CustomEvent("eventlog-changed")); + } + // Debounce the synced event — during initial sync negotiation, #applyDocToDOM() // is called for every Automerge sync message (100+ round-trips). Debounce to // fire once after the burst settles. @@ -785,9 +795,17 @@ export class CommunitySync extends EventTarget { * Three-state: forgotten shapes stay in DOM (faded), deleted shapes are removed. */ #applyPatchesToDOM(patches: Automerge.Patch[]): void { + let eventLogChanged = false; + for (const patch of patches) { const path = patch.path; + // Detect eventLog changes for the event bus + if (path[0] === "eventLog") { + eventLogChanged = true; + continue; + } + // Handle shape updates: ["shapes", shapeId, ...] if (path[0] === "shapes" && typeof path[1] === "string") { const shapeId = path[1]; @@ -824,6 +842,11 @@ export class CommunitySync extends EventTarget { } } } + + // Notify event bus of new remote events + if (eventLogChanged) { + this.dispatchEvent(new CustomEvent("eventlog-changed")); + } } /** @@ -1124,6 +1147,66 @@ export class CommunitySync extends EventTarget { return computeMembranePermeability(this.getConnections(), this.#communitySlug); } + // ── Event Bus API ── + + /** Max event log entries before oldest are pruned. */ + static readonly MAX_EVENT_LOG = 100; + + /** Append an event entry to the CRDT eventLog (ring-buffered). */ + appendEvent(entry: EventEntry): void { + this.#doc = Automerge.change(this.#doc, makeChangeMessage(`Event: ${entry.channel}`), (doc) => { + if (!doc.eventLog) doc.eventLog = []; + doc.eventLog.push(entry as any); + // Ring buffer: prune oldest entries beyond limit + while (doc.eventLog.length > CommunitySync.MAX_EVENT_LOG) { + doc.eventLog.splice(0, 1); + } + }); + this.#scheduleSave(); + this.#syncToServer(); + } + + /** Read the current event log. */ + getEventLog(): EventEntry[] { + return (this.#doc.eventLog as EventEntry[] | undefined) || []; + } + + /** Set event subscriptions for a shape. */ + setShapeSubscriptions(shapeId: string, channels: string[]): void { + if (!this.#doc.shapes?.[shapeId]) return; + this.#doc = Automerge.change(this.#doc, makeChangeMessage(`Subscribe ${shapeId}`), (doc) => { + if (doc.shapes[shapeId]) { + (doc.shapes[shapeId] as Record).subscriptions = channels; + } + }); + this.#scheduleSave(); + this.#syncToServer(); + } + + /** Get event subscriptions for a shape. */ + getShapeSubscriptions(shapeId: string): string[] { + const data = this.#doc.shapes?.[shapeId] as Record | undefined; + return (data?.subscriptions as string[] | undefined) || []; + } + + /** Get all shape IDs subscribed to a given channel. */ + getShapesSubscribedTo(channel: string): string[] { + const shapes = this.#doc.shapes || {}; + const result: string[] = []; + for (const [id, data] of Object.entries(shapes)) { + const subs = (data as Record).subscriptions as string[] | undefined; + if (subs && subs.includes(channel)) { + result.push(id); + } + } + return result; + } + + /** Get a shape DOM element by ID (for event delivery). */ + getShapeElement(shapeId: string): FolkShape | undefined { + return this.#shapes.get(shapeId); + } + // ── History API ── /** diff --git a/lib/event-bus.ts b/lib/event-bus.ts new file mode 100644 index 0000000..2f772b6 --- /dev/null +++ b/lib/event-bus.ts @@ -0,0 +1,176 @@ +/** + * Canvas-wide pub/sub event bus. + * + * Shapes emit named events; other shapes subscribe to channels + * and receive payloads via `onEventReceived()`. + * + * Events are persisted in the Automerge CRDT doc (bounded ring buffer, + * last 100 entries) so remote users see them replayed via sync. + */ + +import type { CommunitySync } from "./community-sync"; + +// ── Types ── + +export interface EventEntry { + id: string; + channel: string; + sourceShapeId: string; + payload: unknown; + timestamp: number; +} + +/** Shapes implement this to receive broadcast events. */ +export interface EventReceiver { + onEventReceived(channel: string, payload: unknown, sourceShapeId: string): void; +} + +// ── Constants ── + +const MAX_LOG_SIZE = 100; +const MAX_REENTRY_DEPTH = 10; + +// ── CanvasEventBus ── + +export class CanvasEventBus extends EventTarget { + #sync: CommunitySync; + #processedIds = new Set(); + #reentryDepth = 0; + + constructor(sync: CommunitySync) { + super(); + this.#sync = sync; + + // When remote Automerge changes arrive, check for new events + sync.addEventListener("eventlog-changed", () => { + this.#processRemoteEvents(); + }); + } + + /** + * Emit an event on a named channel. + * Writes to CRDT eventLog and delivers to local subscribers immediately. + */ + emit(channel: string, sourceShapeId: string, payload: unknown): void { + if (this.#reentryDepth >= MAX_REENTRY_DEPTH) { + console.warn( + `[EventBus] Re-entrancy limit (${MAX_REENTRY_DEPTH}) reached, dropping:`, + channel, + ); + return; + } + + const entry: EventEntry = { + id: crypto.randomUUID(), + channel, + sourceShapeId, + payload, + timestamp: Date.now(), + }; + + // Persist to CRDT (ring-buffered) + this.#sync.appendEvent(entry); + + // Mark as processed so we don't re-deliver from remote sync + this.#processedIds.add(entry.id); + + // Deliver to local subscribers immediately + this.#deliverEvent(entry); + } + + /** + * Subscribe a shape to one or more channels. + * Subscriptions are stored in the CRDT so they sync across peers. + */ + subscribe(shapeId: string, channels: string[]): void { + const current = this.#sync.getShapeSubscriptions(shapeId); + const merged = Array.from(new Set([...current, ...channels])); + this.#sync.setShapeSubscriptions(shapeId, merged); + } + + /** + * Unsubscribe a shape from channels (or all channels if none specified). + */ + unsubscribe(shapeId: string, channels?: string[]): void { + if (!channels) { + this.#sync.setShapeSubscriptions(shapeId, []); + return; + } + const current = this.#sync.getShapeSubscriptions(shapeId); + const updated = current.filter((c) => !channels.includes(c)); + this.#sync.setShapeSubscriptions(shapeId, updated); + } + + /** + * Get all shape IDs subscribed to a given channel. + */ + getSubscribers(channel: string): string[] { + return this.#sync.getShapesSubscribedTo(channel); + } + + /** + * Process events that arrived via remote Automerge sync. + * Skips any events we already delivered locally. + */ + #processRemoteEvents(): void { + const log = this.#sync.getEventLog(); + if (!log || log.length === 0) return; + + for (const entry of log) { + if (this.#processedIds.has(entry.id)) continue; + this.#processedIds.add(entry.id); + this.#deliverEvent(entry); + } + + // Prune tracked IDs to match log size (avoid unbounded growth) + if (this.#processedIds.size > MAX_LOG_SIZE * 2) { + const logIds = new Set(log.map((e) => e.id)); + for (const id of this.#processedIds) { + if (!logIds.has(id)) this.#processedIds.delete(id); + } + } + } + + /** + * Deliver a single event to all subscribed shapes. + * Re-entrancy guard prevents infinite chains. + */ + #deliverEvent(entry: EventEntry): void { + const subscribers = this.getSubscribers(entry.channel); + if (subscribers.length === 0) return; + + this.#reentryDepth++; + try { + for (const shapeId of subscribers) { + // Don't deliver back to emitter + if (shapeId === entry.sourceShapeId) continue; + + const shape = this.#sync.getShapeElement(shapeId); + if ( + shape && + typeof (shape as unknown as EventReceiver).onEventReceived === "function" + ) { + try { + (shape as unknown as EventReceiver).onEventReceived( + entry.channel, + entry.payload, + entry.sourceShapeId, + ); + } catch (err) { + console.error( + `[EventBus] Error in ${shapeId}.onEventReceived(${entry.channel}):`, + err, + ); + } + } + } + } finally { + this.#reentryDepth--; + } + + // Dispatch a DOM event for any external listeners + this.dispatchEvent( + new CustomEvent("event-delivered", { detail: entry }), + ); + } +} diff --git a/lib/folk-shape.ts b/lib/folk-shape.ts index e0ebd28..4a7956e 100644 --- a/lib/folk-shape.ts +++ b/lib/folk-shape.ts @@ -849,6 +849,12 @@ export class FolkShape extends FolkElement { return (this.constructor as typeof FolkShape).portDescriptors.filter(p => p.direction === "output"); } + /** + * Called by CanvasEventBus when a subscribed channel fires. + * Override in subclasses to handle broadcast events. + */ + onEventReceived?(channel: string, payload: unknown, sourceShapeId: string): void; + /** * Serialize shape to JSON for Automerge sync * Subclasses should override and call super.toJSON() diff --git a/lib/index.ts b/lib/index.ts index 5a4d7fa..7cd6533 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -92,6 +92,7 @@ export * from "./shape-registry"; // Sync export * from "./community-sync"; export * from "./presence"; +export * from "./event-bus"; // Offline support export * from "./offline-store";