""" WebSocket manager — broadcasts real-time events to connected clients. """ import asyncio import json import logging from typing import Set from fastapi import WebSocket logger = logging.getLogger("rmesh.websocket") _clients: Set[WebSocket] = set() _loop: asyncio.AbstractEventLoop | None = None def set_event_loop(loop: asyncio.AbstractEventLoop): global _loop _loop = loop async def connect(websocket: WebSocket): await websocket.accept() _clients.add(websocket) logger.info("WebSocket client connected (%d total)", len(_clients)) def disconnect(websocket: WebSocket): _clients.discard(websocket) logger.info("WebSocket client disconnected (%d remaining)", len(_clients)) async def broadcast(data: dict): """Send event to all connected WebSocket clients.""" if not _clients: return message = json.dumps(data) dead = set() for client in _clients: try: await client.send_text(message) except Exception: dead.add(client) for client in dead: _clients.discard(client) def on_event(event: dict): """Synchronous callback that schedules broadcast on the event loop. Use this as the event_listener for reticulum_bridge and lxmf_bridge.""" if _loop is None or not _clients: return try: asyncio.run_coroutine_threadsafe(broadcast(event), _loop) except Exception: pass def client_count() -> int: return len(_clients)