// rspace-online/lib/event-bus.ts
//
// 261 lines
// 7.0 KiB
// TypeScript

/**
* Canvas-wide pub/sub event bus.
*
* Shapes emit named events; other shapes subscribe to channels
* and receive payloads via `onEventReceived()`.
*
* Events are persisted in the Automerge CRDT doc (bounded ring buffer,
* last 100 entries) so remote users see them replayed via sync.
*
* Clock events are ephemeral — delivered via WebSocket but NOT persisted.
*/
import type { CommunitySync } from "./community-sync";
// ── Types ──
/**
* A single event as it travels over the bus.
*
* The same shape is used for entries handed to `CommunitySync.appendEvent()`,
* entries read back from `CommunitySync.getEventLog()`, and the ephemeral
* clock events that are built locally and never written to the log.
*/
export interface EventEntry {
/**
* Unique identifier used for de-duplication. `emit()` generates it with
* `crypto.randomUUID()`; clock events use a `clock-` / `local-clock-`
* prefix followed by a millisecond timestamp.
*/
id: string;
/** Name of the channel the event was published on (e.g. `"clock:tick"`). */
channel: string;
/**
* ID of the shape that emitted the event. Clock events use the sentinels
* `"__system__"` (server clock) and `"__local__"` (local fallback clock).
* A subscriber whose ID equals this value is skipped during delivery.
*/
sourceShapeId: string;
/** Arbitrary event data; passed to receivers unchanged. */
payload: unknown;
/** Creation time in milliseconds since the Unix epoch (`Date.now()`). */
timestamp: number;
}
/**
* Shapes implement this to receive broadcast events.
*
* The bus looks up each subscribed shape's element and, if it exposes an
* `onEventReceived` function, calls it once per delivered event. Exceptions
* thrown by the handler are caught and logged by the bus, so one failing
* receiver does not stop delivery to the others. A shape is never called for
* events whose `sourceShapeId` equals its own ID.
*/
export interface EventReceiver {
/**
* @param channel - Channel the event was published on.
* @param payload - Event data exactly as supplied by the emitter.
* @param sourceShapeId - ID of the emitting shape, or `"__system__"` /
*   `"__local__"` for server / local-fallback clock ticks.
*/
onEventReceived(channel: string, payload: unknown, sourceShapeId: string): void;
}
// ── Constants ──
// Nominal capacity of the persisted event log. In this file it is only used
// to decide when the set of already-processed event IDs gets pruned (once it
// exceeds twice this value).
// NOTE(review): the ring buffer itself is presumably enforced inside
// CommunitySync.appendEvent — confirm both sides use the same size.
const MAX_LOG_SIZE = 100;
// Maximum nesting depth of event deliveries. `emit()` drops (and warns about)
// any event emitted while this many deliveries are already in progress, which
// breaks infinite emit → receive → emit chains.
const MAX_REENTRY_DEPTH = 10;
// Period of the local fallback clock in milliseconds. The server clock is
// considered lost after 2.5× this interval without a server tick.
const LOCAL_CLOCK_INTERVAL = 60_000; // 60s fallback tick
// ── CanvasEventBus ──
/**
 * Canvas-wide pub/sub bus connecting shapes through named channels.
 *
 * - `emit()` hands an event to the sync layer for persistence and delivers it
 *   to local subscribers immediately.
 * - Entries that show up in the synced event log are delivered once each;
 *   IDs already handled are tracked so nothing is delivered twice.
 * - Clock ticks arrive from the server as ephemeral events; a local timer
 *   produces fallback ticks whenever the server clock is not active.
 *
 * After every delivery that had at least one subscriber, an
 * `"event-delivered"` `CustomEvent` (detail: the `EventEntry`) is dispatched
 * on the bus itself for external listeners.
 *
 * Call `destroy()` when the bus is no longer needed; it stops all timers and
 * detaches the bus from the sync layer.
 */
export class CanvasEventBus extends EventTarget {
  readonly #sync: CommunitySync;

  /** IDs of events already delivered locally (used to skip replays). */
  #processedIds = new Set<string>();

  /** Number of `#deliverEvent` calls currently on the stack. */
  #reentryDepth = 0;

  #localClockTimer: ReturnType<typeof setInterval> | null = null;

  /** True while server clock ticks keep arriving; suppresses local ticks. */
  #serverClockActive = false;

  /** Watchdog that flips `#serverClockActive` back off when ticks stop. */
  #serverClockTimeout: ReturnType<typeof setTimeout> | null = null;

  // The sync listeners are kept as fields so that destroy() can pass the very
  // same function references to removeEventListener().
  readonly #onEventLogChanged: EventListener = () => {
    this.#processRemoteEvents();
  };

  readonly #onClockEvent: EventListener = (e: Event) => {
    const { channel, payload } = (
      e as CustomEvent<{ channel: string; payload: unknown }>
    ).detail;
    this.#handleServerClock(channel, payload);
  };

  /**
   * @param sync - Sync layer providing the event log, shape subscriptions,
   *   shape elements, and the `"eventlog-changed"` / `"clock-event"` events.
   */
  constructor(sync: CommunitySync) {
    super();
    this.#sync = sync;

    // When remote Automerge changes arrive, check for new events
    sync.addEventListener("eventlog-changed", this.#onEventLogChanged);

    // Handle ephemeral clock events from server
    sync.addEventListener("clock-event", this.#onClockEvent);

    // Start local fallback clock (will be suppressed when server clock is active)
    this.#startLocalClock();
  }

  /**
   * Emit an event on a named channel.
   *
   * The entry is appended to the synced event log and delivered to local
   * subscribers immediately. If the nested delivery depth has reached
   * `MAX_REENTRY_DEPTH`, the event is dropped with a warning instead.
   *
   * @param channel - Channel to publish on.
   * @param sourceShapeId - ID of the emitting shape; that shape does not
   *   receive its own event.
   * @param payload - Arbitrary data forwarded to receivers.
   */
  emit(channel: string, sourceShapeId: string, payload: unknown): void {
    if (this.#reentryDepth >= MAX_REENTRY_DEPTH) {
      console.warn(
        `[EventBus] Re-entrancy limit (${MAX_REENTRY_DEPTH}) reached, dropping:`,
        channel,
      );
      return;
    }

    const entry: EventEntry = {
      id: crypto.randomUUID(),
      channel,
      sourceShapeId,
      payload,
      timestamp: Date.now(),
    };

    // Mark as processed BEFORE persisting: if appending to the log triggers
    // "eventlog-changed" synchronously, #processRemoteEvents() must already
    // see this ID, otherwise the event would be delivered twice.
    this.#processedIds.add(entry.id);

    // Persist to CRDT (ring-buffered)
    this.#sync.appendEvent(entry);

    // Deliver to local subscribers immediately
    this.#deliverEvent(entry);

    // Keep the ID set bounded even when no remote changes ever arrive.
    this.#pruneProcessedIds();
  }

  /**
   * Subscribe a shape to one or more channels.
   * Existing subscriptions are kept; duplicates are removed. The merged list
   * is stored through the sync layer.
   *
   * @param shapeId - ID of the subscribing shape.
   * @param channels - Channels to add.
   */
  subscribe(shapeId: string, channels: string[]): void {
    const current = this.#sync.getShapeSubscriptions(shapeId);
    const merged = Array.from(new Set([...current, ...channels]));
    this.#sync.setShapeSubscriptions(shapeId, merged);
  }

  /**
   * Unsubscribe a shape from channels (or all channels if none specified).
   *
   * @param shapeId - ID of the shape.
   * @param channels - Channels to remove; omit to clear every subscription.
   */
  unsubscribe(shapeId: string, channels?: string[]): void {
    if (!channels) {
      this.#sync.setShapeSubscriptions(shapeId, []);
      return;
    }
    const current = this.#sync.getShapeSubscriptions(shapeId);
    const updated = current.filter((c) => !channels.includes(c));
    this.#sync.setShapeSubscriptions(shapeId, updated);
  }

  /**
   * Get all shape IDs subscribed to a given channel.
   *
   * @param channel - Channel name.
   * @returns IDs reported by the sync layer for that channel.
   */
  getSubscribers(channel: string): string[] {
    return this.#sync.getShapesSubscribedTo(channel);
  }

  /**
   * Release everything the bus holds on to: detaches the listeners that the
   * constructor registered on the sync layer and stops both clock timers.
   * Safe to call more than once.
   */
  destroy(): void {
    // Without this the sync layer keeps the bus alive and keeps invoking it
    // (including re-arming the server-clock watchdog) after destruction.
    this.#sync.removeEventListener("eventlog-changed", this.#onEventLogChanged);
    this.#sync.removeEventListener("clock-event", this.#onClockEvent);

    this.#stopLocalClock();
    if (this.#serverClockTimeout) {
      clearTimeout(this.#serverClockTimeout);
      this.#serverClockTimeout = null;
    }
  }

  // ── Clock handling ──

  /**
   * Handle an ephemeral clock event from the server.
   * Marks the server clock as active (suppressing the local fallback),
   * restarts the watchdog, and delivers the tick without persisting it.
   */
  #handleServerClock(channel: string, payload: unknown): void {
    this.#serverClockActive = true;

    // Reset server-alive timeout (2.5x the tick interval)
    if (this.#serverClockTimeout) clearTimeout(this.#serverClockTimeout);
    this.#serverClockTimeout = setTimeout(() => {
      this.#serverClockActive = false;
      console.log("[EventBus] Server clock lost, local fallback active");
    }, LOCAL_CLOCK_INTERVAL * 2.5);

    // Single clock read so the id and the timestamp always agree.
    const now = Date.now();

    // Deliver as ephemeral event (no sourceShapeId — it's the system)
    this.#deliverEvent({
      id: `clock-${now}`,
      channel,
      sourceShapeId: "__system__",
      payload,
      timestamp: now,
    });
  }

  /**
   * Local fallback clock — fires every `LOCAL_CLOCK_INTERVAL` ms and delivers
   * a `"clock:tick"` event unless the server clock is currently active.
   * Does nothing if the timer is already running.
   */
  #startLocalClock(): void {
    if (this.#localClockTimer) return;
    this.#localClockTimer = setInterval(() => {
      if (this.#serverClockActive) return; // Server is authoritative

      const now = new Date();
      this.#deliverEvent({
        id: `local-clock-${Date.now()}`,
        channel: "clock:tick",
        sourceShapeId: "__local__",
        payload: {
          timestamp: now.getTime(),
          isoString: now.toISOString(),
          hour: now.getUTCHours(),
          minute: now.getUTCMinutes(),
          second: now.getUTCSeconds(),
          dayOfWeek: now.getUTCDay(),
          tickCount: -1, // Local fallback marker
        },
        timestamp: now.getTime(),
      });
    }, LOCAL_CLOCK_INTERVAL);
  }

  /** Stop the local fallback clock if it is running. */
  #stopLocalClock(): void {
    if (this.#localClockTimer) {
      clearInterval(this.#localClockTimer);
      this.#localClockTimer = null;
    }
  }

  // ── Event processing ──

  /**
   * Process events that arrived via remote Automerge sync.
   * Walks the current event log and delivers every entry whose ID has not
   * been handled yet.
   */
  #processRemoteEvents(): void {
    const log = this.#sync.getEventLog();
    if (!log || log.length === 0) return;

    for (const entry of log) {
      if (this.#processedIds.has(entry.id)) continue;
      this.#processedIds.add(entry.id);
      this.#deliverEvent(entry);
    }

    this.#pruneProcessedIds();
  }

  /**
   * Bound the size of `#processedIds`: once it holds more than twice
   * `MAX_LOG_SIZE` IDs, forget every ID that is no longer present in the
   * event log. Skipped while the log is missing or empty so that IDs are
   * not discarded before the log can be consulted.
   */
  #pruneProcessedIds(): void {
    if (this.#processedIds.size <= MAX_LOG_SIZE * 2) return;

    const log = this.#sync.getEventLog();
    if (!log || log.length === 0) return;

    const logIds = new Set(log.map((e) => e.id));
    // Deleting from a Set while iterating over it is well-defined in JS.
    for (const id of this.#processedIds) {
      if (!logIds.has(id)) this.#processedIds.delete(id);
    }
  }

  /**
   * Deliver a single event to all subscribed shapes.
   *
   * Shapes without an element or without an `onEventReceived` function are
   * skipped, as is the emitting shape itself. Receiver exceptions are logged
   * and do not interrupt delivery. The re-entrancy depth is raised for the
   * duration of the loop so nested `emit()` calls can be limited. When the
   * channel has no subscribers nothing happens, and no `"event-delivered"`
   * event is dispatched.
   */
  #deliverEvent(entry: EventEntry): void {
    const subscribers = this.getSubscribers(entry.channel);
    if (subscribers.length === 0) return;

    this.#reentryDepth++;
    try {
      for (const shapeId of subscribers) {
        // Don't deliver back to emitter (clock sentinels never match a shape)
        if (shapeId === entry.sourceShapeId) continue;

        const shape = this.#sync.getShapeElement(shapeId);
        if (
          shape &&
          typeof (shape as unknown as EventReceiver).onEventReceived === "function"
        ) {
          try {
            (shape as unknown as EventReceiver).onEventReceived(
              entry.channel,
              entry.payload,
              entry.sourceShapeId,
            );
          } catch (err) {
            console.error(
              `[EventBus] Error in ${shapeId}.onEventReceived(${entry.channel}):`,
              err,
            );
          }
        }
      }
    } finally {
      // Always restore the depth, even if subscriber lookup throws mid-loop.
      this.#reentryDepth--;
    }

    // Dispatch a DOM event for any external listeners
    this.dispatchEvent(
      new CustomEvent("event-delivered", { detail: entry }),
    );
  }
}