feat: canvas event broadcasting pub/sub system (TASK-43)

Add CanvasEventBus with emit/subscribe/unsubscribe for cross-shape
communication. Events persisted in Automerge CRDT eventLog (ring buffer
100 entries), synced via WebSocket, with re-entrancy guard at depth 10.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-03-11 16:14:30 -07:00
parent c049d7e8df
commit efb5810b31
5 changed files with 282 additions and 7 deletions

View File

@ -1,9 +1,10 @@
---
id: TASK-43
title: 'Implement Event Broadcasting: canvas-wide pub/sub system'
status: To Do
status: Done
assignee: []
created_date: '2026-02-18 20:06'
updated_date: '2026-03-11 23:14'
labels:
- feature
- phase-2
@ -39,10 +40,18 @@ Example: Timer emits "timer:done" → all subscribed Budget shapes recalculate.
## Acceptance Criteria
<!-- AC:BEGIN -->
- [ ] #1 CanvasEventBus emits events to CRDT eventLog
- [ ] #2 Shapes can subscribe to channels and receive events
- [ ] #3 Events sync to remote users via Automerge
- [ ] #4 Ring buffer bounded at 100 entries with GC
- [ ] #5 Re-entrancy guard prevents infinite event loops
- [ ] #6 Works offline (events queued in CRDT, replayed on reconnect)
- [x] #1 CanvasEventBus emits events to CRDT eventLog
- [x] #2 Shapes can subscribe to channels and receive events
- [x] #3 Events sync to remote users via Automerge
- [x] #4 Ring buffer bounded at 100 entries with GC
- [x] #5 Re-entrancy guard prevents infinite event loops
- [x] #6 Works offline (events queued in CRDT, replayed on reconnect)
<!-- AC:END -->
## Implementation Notes
<!-- SECTION:NOTES:BEGIN -->
Implementation started: CanvasEventBus class, CommunityDoc schema updates, CommunitySync helper methods
Complete. Created lib/event-bus.ts with CanvasEventBus class. Updated CommunityDoc with eventLog field, ShapeData with subscriptions field. Added appendEvent(), getEventLog(), setShapeSubscriptions(), getShapeSubscriptions(), getShapesSubscribedTo(), getShapeElement() methods to CommunitySync. Added eventlog-changed dispatch in both patch and full-sync paths. Added onEventReceived() optional method on FolkShape. Exported from lib/index.ts.
<!-- SECTION:NOTES:END -->

View File

@ -10,6 +10,7 @@ import type {
import { computeMembranePermeability } from "./connection-types";
import { makeChangeMessage, parseChangeMessage } from "../shared/local-first/change-message";
import type { HistoryEntry } from "../shared/components/rstack-history-panel";
import type { EventEntry } from "./event-bus";
// Shape data stored in Automerge document
export interface ShapeData {
@ -40,6 +41,8 @@ export interface ShapeData {
targetPort?: string;
// Shape port values (for data piping)
ports?: Record<string, unknown>;
// Event bus subscriptions (channel names this shape listens to)
subscriptions?: string[];
// Allow arbitrary shape-specific properties from toJSON()
[key: string]: unknown;
}
@ -111,6 +114,8 @@ export interface CommunityDoc {
activeLayerId?: string;
/** Layer view mode: flat (tabs) or stack (side view) */
layerViewMode?: "flat" | "stack";
/** Pub/sub event log — bounded ring buffer (last 100 entries) */
eventLog?: EventEntry[];
}
type SyncState = Automerge.SyncState;
@ -770,6 +775,11 @@ export class CommunitySync extends EventTarget {
}
}
// Notify event bus if there are any events to process
if (this.#doc.eventLog && this.#doc.eventLog.length > 0) {
this.dispatchEvent(new CustomEvent("eventlog-changed"));
}
// Debounce the synced event — during initial sync negotiation, #applyDocToDOM()
// is called for every Automerge sync message (100+ round-trips). Debounce to
// fire once after the burst settles.
@ -785,9 +795,17 @@ export class CommunitySync extends EventTarget {
* Three-state: forgotten shapes stay in DOM (faded), deleted shapes are removed.
*/
#applyPatchesToDOM(patches: Automerge.Patch[]): void {
let eventLogChanged = false;
for (const patch of patches) {
const path = patch.path;
// Detect eventLog changes for the event bus
if (path[0] === "eventLog") {
eventLogChanged = true;
continue;
}
// Handle shape updates: ["shapes", shapeId, ...]
if (path[0] === "shapes" && typeof path[1] === "string") {
const shapeId = path[1];
@ -824,6 +842,11 @@ export class CommunitySync extends EventTarget {
}
}
}
// Notify event bus of new remote events
if (eventLogChanged) {
this.dispatchEvent(new CustomEvent("eventlog-changed"));
}
}
/**
@ -1124,6 +1147,66 @@ export class CommunitySync extends EventTarget {
return computeMembranePermeability(this.getConnections(), this.#communitySlug);
}
// ── Event Bus API ──
/** Max event log entries before oldest are pruned. */
static readonly MAX_EVENT_LOG = 100;
/** Append an event entry to the CRDT eventLog (ring-buffered). */
appendEvent(entry: EventEntry): void {
this.#doc = Automerge.change(this.#doc, makeChangeMessage(`Event: ${entry.channel}`), (doc) => {
if (!doc.eventLog) doc.eventLog = [];
doc.eventLog.push(entry as any);
// Ring buffer: prune oldest entries beyond limit
while (doc.eventLog.length > CommunitySync.MAX_EVENT_LOG) {
doc.eventLog.splice(0, 1);
}
});
this.#scheduleSave();
this.#syncToServer();
}
/** Read the current event log. */
getEventLog(): EventEntry[] {
return (this.#doc.eventLog as EventEntry[] | undefined) || [];
}
/** Set event subscriptions for a shape. */
setShapeSubscriptions(shapeId: string, channels: string[]): void {
if (!this.#doc.shapes?.[shapeId]) return;
this.#doc = Automerge.change(this.#doc, makeChangeMessage(`Subscribe ${shapeId}`), (doc) => {
if (doc.shapes[shapeId]) {
(doc.shapes[shapeId] as Record<string, unknown>).subscriptions = channels;
}
});
this.#scheduleSave();
this.#syncToServer();
}
/** Get event subscriptions for a shape. */
getShapeSubscriptions(shapeId: string): string[] {
const data = this.#doc.shapes?.[shapeId] as Record<string, unknown> | undefined;
return (data?.subscriptions as string[] | undefined) || [];
}
/** Get all shape IDs subscribed to a given channel. */
getShapesSubscribedTo(channel: string): string[] {
const shapes = this.#doc.shapes || {};
const result: string[] = [];
for (const [id, data] of Object.entries(shapes)) {
const subs = (data as Record<string, unknown>).subscriptions as string[] | undefined;
if (subs && subs.includes(channel)) {
result.push(id);
}
}
return result;
}
/** Get a shape DOM element by ID (for event delivery). */
getShapeElement(shapeId: string): FolkShape | undefined {
return this.#shapes.get(shapeId);
}
// ── History API ──
/**

176
lib/event-bus.ts Normal file
View File

@ -0,0 +1,176 @@
/**
* Canvas-wide pub/sub event bus.
*
* Shapes emit named events; other shapes subscribe to channels
* and receive payloads via `onEventReceived()`.
*
* Events are persisted in the Automerge CRDT doc (bounded ring buffer,
* last 100 entries) so remote users see them replayed via sync.
*/
import type { CommunitySync } from "./community-sync";
// ── Types ──
export interface EventEntry {
id: string;
channel: string;
sourceShapeId: string;
payload: unknown;
timestamp: number;
}
/** Shapes implement this to receive broadcast events. */
export interface EventReceiver {
onEventReceived(channel: string, payload: unknown, sourceShapeId: string): void;
}
// ── Constants ──
const MAX_LOG_SIZE = 100;
const MAX_REENTRY_DEPTH = 10;
// ── CanvasEventBus ──
export class CanvasEventBus extends EventTarget {
#sync: CommunitySync;
#processedIds = new Set<string>();
#reentryDepth = 0;
constructor(sync: CommunitySync) {
super();
this.#sync = sync;
// When remote Automerge changes arrive, check for new events
sync.addEventListener("eventlog-changed", () => {
this.#processRemoteEvents();
});
}
/**
* Emit an event on a named channel.
* Writes to CRDT eventLog and delivers to local subscribers immediately.
*/
emit(channel: string, sourceShapeId: string, payload: unknown): void {
if (this.#reentryDepth >= MAX_REENTRY_DEPTH) {
console.warn(
`[EventBus] Re-entrancy limit (${MAX_REENTRY_DEPTH}) reached, dropping:`,
channel,
);
return;
}
const entry: EventEntry = {
id: crypto.randomUUID(),
channel,
sourceShapeId,
payload,
timestamp: Date.now(),
};
// Persist to CRDT (ring-buffered)
this.#sync.appendEvent(entry);
// Mark as processed so we don't re-deliver from remote sync
this.#processedIds.add(entry.id);
// Deliver to local subscribers immediately
this.#deliverEvent(entry);
}
/**
* Subscribe a shape to one or more channels.
* Subscriptions are stored in the CRDT so they sync across peers.
*/
subscribe(shapeId: string, channels: string[]): void {
const current = this.#sync.getShapeSubscriptions(shapeId);
const merged = Array.from(new Set([...current, ...channels]));
this.#sync.setShapeSubscriptions(shapeId, merged);
}
/**
* Unsubscribe a shape from channels (or all channels if none specified).
*/
unsubscribe(shapeId: string, channels?: string[]): void {
if (!channels) {
this.#sync.setShapeSubscriptions(shapeId, []);
return;
}
const current = this.#sync.getShapeSubscriptions(shapeId);
const updated = current.filter((c) => !channels.includes(c));
this.#sync.setShapeSubscriptions(shapeId, updated);
}
/**
* Get all shape IDs subscribed to a given channel.
*/
getSubscribers(channel: string): string[] {
return this.#sync.getShapesSubscribedTo(channel);
}
/**
* Process events that arrived via remote Automerge sync.
* Skips any events we already delivered locally.
*/
#processRemoteEvents(): void {
const log = this.#sync.getEventLog();
if (!log || log.length === 0) return;
for (const entry of log) {
if (this.#processedIds.has(entry.id)) continue;
this.#processedIds.add(entry.id);
this.#deliverEvent(entry);
}
// Prune tracked IDs to match log size (avoid unbounded growth)
if (this.#processedIds.size > MAX_LOG_SIZE * 2) {
const logIds = new Set(log.map((e) => e.id));
for (const id of this.#processedIds) {
if (!logIds.has(id)) this.#processedIds.delete(id);
}
}
}
/**
* Deliver a single event to all subscribed shapes.
* Re-entrancy guard prevents infinite chains.
*/
#deliverEvent(entry: EventEntry): void {
const subscribers = this.getSubscribers(entry.channel);
if (subscribers.length === 0) return;
this.#reentryDepth++;
try {
for (const shapeId of subscribers) {
// Don't deliver back to emitter
if (shapeId === entry.sourceShapeId) continue;
const shape = this.#sync.getShapeElement(shapeId);
if (
shape &&
typeof (shape as unknown as EventReceiver).onEventReceived === "function"
) {
try {
(shape as unknown as EventReceiver).onEventReceived(
entry.channel,
entry.payload,
entry.sourceShapeId,
);
} catch (err) {
console.error(
`[EventBus] Error in ${shapeId}.onEventReceived(${entry.channel}):`,
err,
);
}
}
}
} finally {
this.#reentryDepth--;
}
// Dispatch a DOM event for any external listeners
this.dispatchEvent(
new CustomEvent("event-delivered", { detail: entry }),
);
}
}

View File

@ -849,6 +849,12 @@ export class FolkShape extends FolkElement {
return (this.constructor as typeof FolkShape).portDescriptors.filter(p => p.direction === "output");
}
/**
* Called by CanvasEventBus when a subscribed channel fires.
* Override in subclasses to handle broadcast events.
*/
onEventReceived?(channel: string, payload: unknown, sourceShapeId: string): void;
/**
* Serialize shape to JSON for Automerge sync
* Subclasses should override and call super.toJSON()

View File

@ -92,6 +92,7 @@ export * from "./shape-registry";
// Sync
export * from "./community-sync";
export * from "./presence";
export * from "./event-bus";
// Offline support
export * from "./offline-store";