rmesh-reticulum/app/websocket_manager.py

64 lines
1.5 KiB
Python

"""
WebSocket manager — broadcasts real-time events to connected clients.
"""
import asyncio
import json
import logging
from typing import Set
from fastapi import WebSocket
logger = logging.getLogger("rmesh.websocket")
_clients: Set[WebSocket] = set()
_loop: asyncio.AbstractEventLoop | None = None
def set_event_loop(loop: asyncio.AbstractEventLoop):
global _loop
_loop = loop
async def connect(websocket: WebSocket):
await websocket.accept()
_clients.add(websocket)
logger.info("WebSocket client connected (%d total)", len(_clients))
def disconnect(websocket: WebSocket):
_clients.discard(websocket)
logger.info("WebSocket client disconnected (%d remaining)", len(_clients))
async def broadcast(data: dict):
"""Send event to all connected WebSocket clients."""
if not _clients:
return
message = json.dumps(data)
dead = set()
for client in _clients:
try:
await client.send_text(message)
except Exception:
dead.add(client)
for client in dead:
_clients.discard(client)
def on_event(event: dict):
"""Synchronous callback that schedules broadcast on the event loop.
Use this as the event_listener for reticulum_bridge and lxmf_bridge."""
if _loop is None or not _clients:
return
try:
asyncio.run_coroutine_threadsafe(broadcast(event), _loop)
except Exception:
pass
def client_count() -> int:
return len(_clients)