diff --git a/app/config.py b/app/config.py index 03f4a5e..f5a207b 100644 --- a/app/config.py +++ b/app/config.py @@ -4,12 +4,23 @@ BRIDGE_API_KEY = os.environ.get("BRIDGE_API_KEY", "") RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum") LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf") LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json") +LXMF_ATTACHMENTS_DIR = os.path.join(LXMF_STORAGE_DIR, "attachments") LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4")) # MeshCore companion node connection MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true" -MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") # TCP host (WiFi companion) +MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000")) -MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") # Serial port (USB companion) +MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore") MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json") + +# Feature flags +AUTO_RESEND_ON_ANNOUNCE = os.environ.get("AUTO_RESEND_ON_ANNOUNCE", "true").lower() == "true" +LOCAL_PROPAGATION_NODE = os.environ.get("LOCAL_PROPAGATION_NODE", "true").lower() == "true" +PREFERRED_PROPAGATION_NODE = os.environ.get("PREFERRED_PROPAGATION_NODE", "") +PROPAGATION_SYNC_INTERVAL = int(os.environ.get("PROPAGATION_SYNC_INTERVAL", "300")) + +# rChats bridge +RCHATS_BRIDGE_ENABLED = os.environ.get("RCHATS_BRIDGE_ENABLED", "false").lower() == "true" +RCHATS_INTERNAL_URL = os.environ.get("RCHATS_INTERNAL_URL", "http://rchats:3000") diff --git a/app/lxmf_bridge.py b/app/lxmf_bridge.py index ac916c5..52960c8 100644 --- a/app/lxmf_bridge.py +++ b/app/lxmf_bridge.py @@ -1,7 +1,9 @@ """ -LXMF bridge — message send/receive via the LXMF protocol on Reticulum. 
+LXMF bridge — rich message handling, propagation node management, +auto-resend on announce, file/image/audio attachments. """ +import base64 import json import time import uuid @@ -12,7 +14,11 @@ import os import RNS import LXMF -from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE +from .config import ( + LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE, LXMF_ATTACHMENTS_DIR, + AUTO_RESEND_ON_ANNOUNCE, LOCAL_PROPAGATION_NODE, + PREFERRED_PROPAGATION_NODE, PROPAGATION_SYNC_INTERVAL, +) logger = logging.getLogger("rmesh.lxmf") @@ -21,6 +27,9 @@ _local_delivery_destination: RNS.Destination | None = None _identity: RNS.Identity | None = None _messages: list[dict] = [] _lock = threading.Lock() +_event_listeners: list = [] # WebSocket broadcast callbacks +_propagation_nodes: dict[str, dict] = {} +_last_propagation_sync: float = 0 def init(identity: RNS.Identity): @@ -30,16 +39,30 @@ def init(identity: RNS.Identity): _identity = identity os.makedirs(LXMF_STORAGE_DIR, exist_ok=True) + os.makedirs(LXMF_ATTACHMENTS_DIR, exist_ok=True) _router = LXMF.LXMRouter( identity=_identity, storagepath=LXMF_STORAGE_DIR, ) - # Enable propagation node - _router.enable_propagation() + # Increase transfer limit for attachments (10 MB) + _router.delivery_per_transfer_limit = 10000 - # Register local delivery destination for receiving messages + # Enable local propagation node + if LOCAL_PROPAGATION_NODE: + _router.enable_propagation() + logger.info("Local LXMF propagation node enabled") + + # Set preferred propagation node + if PREFERRED_PROPAGATION_NODE: + try: + _router.set_outbound_propagation_node(bytes.fromhex(PREFERRED_PROPAGATION_NODE)) + logger.info("Set preferred propagation node: %s", PREFERRED_PROPAGATION_NODE[:16]) + except Exception as e: + logger.warning("Failed to set propagation node: %s", e) + + # Register local delivery _local_delivery_destination = _router.register_delivery_identity( _identity, display_name="rMesh Bridge", @@ -47,14 +70,152 @@ def init(identity: RNS.Identity): 
_router.register_delivery_callback(_delivery_callback) - # Load persisted messages + # Register announce handlers for auto-resend and propagation tracking + RNS.Transport.register_announce_handler(_LxmfAnnounceHandler()) + RNS.Transport.register_announce_handler(_PropagationAnnounceHandler()) + _load_messages() + # Start propagation sync thread + if PROPAGATION_SYNC_INTERVAL > 0: + sync_thread = threading.Thread(target=_propagation_sync_loop, daemon=True) + sync_thread.start() + logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash) +def register_event_listener(callback): + """Register a callback for real-time events (WebSocket broadcast).""" + _event_listeners.append(callback) + + +def _emit_event(event_type: str, data: dict): + """Emit event to all registered listeners.""" + event = {"type": event_type, **data} + for listener in _event_listeners: + try: + listener(event) + except Exception: + pass + + +# ═══════════════════════════════════════════════════════════════════ +# LXMF Rich Field Parsing +# ═══════════════════════════════════════════════════════════════════ + +def _parse_lxmf_fields(lxmf_message) -> dict: + """Extract all LXMF fields from a message into a serializable dict.""" + fields = {} + + try: + lxmf_fields = lxmf_message.get_fields() + except Exception: + return fields + + # Image attachment + if LXMF.FIELD_IMAGE in lxmf_fields: + try: + image_data = lxmf_fields[LXMF.FIELD_IMAGE] + image_type = image_data[0] if len(image_data) > 0 else "unknown" + image_bytes = image_data[1] if len(image_data) > 1 else b"" + if image_bytes: + attachment_id = str(uuid.uuid4()) + ext = "jpg" if "jpeg" in str(image_type).lower() or "jpg" in str(image_type).lower() else "png" + filename = f"{attachment_id}.{ext}" + filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename) + with open(filepath, "wb") as f: + f.write(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)) + fields["image"] = { + "type": str(image_type), + "filename": 
filename, + "size": len(image_bytes), + "base64": base64.b64encode(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)).decode("ascii")[:200] + "...", + } + except Exception as e: + logger.warning("Failed to parse image field: %s", e) + + # File attachments + if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields: + try: + file_list = lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS] + attachments = [] + for file_data in file_list: + file_name = file_data[0] if len(file_data) > 0 else "unknown" + file_bytes = file_data[1] if len(file_data) > 1 else b"" + if file_bytes: + safe_name = "".join(c for c in str(file_name) if c.isalnum() or c in ".-_") + attachment_id = str(uuid.uuid4()) + stored_name = f"{attachment_id}_{safe_name}" + filepath = os.path.join(LXMF_ATTACHMENTS_DIR, stored_name) + with open(filepath, "wb") as f: + f.write(file_bytes if isinstance(file_bytes, bytes) else bytes(file_bytes)) + attachments.append({ + "name": str(file_name), + "stored_name": stored_name, + "size": len(file_bytes), + }) + if attachments: + fields["file_attachments"] = attachments + except Exception as e: + logger.warning("Failed to parse file attachments: %s", e) + + # Audio message (Codec2) + if LXMF.FIELD_AUDIO in lxmf_fields: + try: + audio_data = lxmf_fields[LXMF.FIELD_AUDIO] + audio_mode = audio_data[0] if len(audio_data) > 0 else 0 + audio_bytes = audio_data[1] if len(audio_data) > 1 else b"" + if audio_bytes: + attachment_id = str(uuid.uuid4()) + filename = f"{attachment_id}.c2" + filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename) + with open(filepath, "wb") as f: + f.write(audio_bytes if isinstance(audio_bytes, bytes) else bytes(audio_bytes)) + fields["audio"] = { + "codec": "codec2", + "mode": audio_mode, + "filename": filename, + "size": len(audio_bytes), + } + except Exception as e: + logger.warning("Failed to parse audio field: %s", e) + + # Icon appearance + if LXMF.FIELD_ICON_APPEARANCE in lxmf_fields: + try: + icon_data = 
lxmf_fields[LXMF.FIELD_ICON_APPEARANCE] + fields["icon"] = { + "name": str(icon_data[0]) if len(icon_data) > 0 else "", + "foreground": "#" + icon_data[1].hex() if len(icon_data) > 1 and hasattr(icon_data[1], "hex") else "", + "background": "#" + icon_data[2].hex() if len(icon_data) > 2 and hasattr(icon_data[2], "hex") else "", + } + except Exception as e: + logger.warning("Failed to parse icon field: %s", e) + + # Commands + if LXMF.FIELD_COMMANDS in lxmf_fields: + try: + fields["commands"] = [str(cmd) for cmd in lxmf_fields[LXMF.FIELD_COMMANDS]] + except Exception: + pass + + return fields + + +# ═══════════════════════════════════════════════════════════════════ +# Message Handling +# ═══════════════════════════════════════════════════════════════════ + def _delivery_callback(message): - """Handle incoming LXMF message.""" + """Handle incoming LXMF message with rich field extraction.""" + # Parse rich fields + fields = _parse_lxmf_fields(message) + + # Get signal quality if available + rssi = getattr(message, "rssi", None) + snr = getattr(message, "snr", None) + quality = getattr(message, "quality", None) + msg_record = { "id": str(uuid.uuid4()), "direction": "inbound", @@ -62,6 +223,10 @@ def _delivery_callback(message): "recipient_hash": _identity.hexhash if _identity else "", "title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""), "content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""), + "fields": fields, + "rssi": rssi, + "snr": snr, + "quality": quality, "status": "delivered", "timestamp": time.time(), } @@ -70,11 +235,19 @@ def _delivery_callback(message): _messages.append(msg_record) _save_messages() - logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16]) + logger.info("Received LXMF message from %s (fields: %s)", + msg_record["sender_hash"][:16], + list(fields.keys()) if fields else "none") + + _emit_event("lxmf.delivery", 
{"message": msg_record}) -def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict: - """Send an LXMF message to a destination hash.""" +def send_message(destination_hash_hex: str, content: str, title: str = "", + image_bytes: bytes = None, image_type: str = "image/jpeg", + file_attachments: list = None, + audio_bytes: bytes = None, audio_mode: int = 0, + try_propagation_on_fail: bool = True) -> dict: + """Send an LXMF message with optional attachments.""" if _router is None or _identity is None: return {"id": "", "status": "failed"} @@ -83,72 +256,82 @@ def send_message(destination_hash_hex: str, content: str, title: str = "") -> di except ValueError: return {"id": "", "status": "failed"} - # Look up identity for destination dest_identity = RNS.Identity.recall(dest_hash) - msg_id = str(uuid.uuid4()) if dest_identity is None: - # Request path first, the message may be deliverable later RNS.Transport.request_path(dest_hash) msg_record = { - "id": msg_id, - "direction": "outbound", - "sender_hash": _identity.hexhash, - "recipient_hash": destination_hash_hex, - "title": title, - "content": content, - "status": "pending", - "timestamp": time.time(), + "id": msg_id, "direction": "outbound", + "sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex, + "title": title, "content": content, "fields": {}, + "status": "pending", "timestamp": time.time(), } with _lock: _messages.append(msg_record) _save_messages() return msg_record - # Create destination for the recipient dest = RNS.Destination( dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery", ) lxm = LXMF.LXMessage( - dest, - _router.get_delivery_destination(), + dest, _router.get_delivery_destination(), content.encode("utf-8") if isinstance(content, str) else content, title=title.encode("utf-8") if isinstance(title, str) else title, desired_method=LXMF.LXMessage.DIRECT, ) + # Add rich fields + if image_bytes: + lxm.fields[LXMF.FIELD_IMAGE] = 
[image_type, image_bytes] + if file_attachments: + lxm.fields[LXMF.FIELD_FILE_ATTACHMENTS] = [ + [att["name"], att["bytes"]] for att in file_attachments + ] + if audio_bytes: + lxm.fields[LXMF.FIELD_AUDIO] = [audio_mode, audio_bytes] + + # Enable propagation fallback + lxm.try_propagation_on_fail = try_propagation_on_fail + msg_record = { - "id": msg_id, - "direction": "outbound", - "sender_hash": _identity.hexhash, - "recipient_hash": destination_hash_hex, - "title": title, - "content": content, - "status": "pending", - "timestamp": time.time(), + "id": msg_id, "direction": "outbound", + "sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex, + "title": title, "content": content, "fields": {}, + "status": "pending", "timestamp": time.time(), } - def _delivery_callback_outbound(message): + def _on_delivered(message): with _lock: for m in _messages: if m["id"] == msg_id: m["status"] = "delivered" break _save_messages() + _emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "delivered"}) + + def _on_failed(message): + # Try propagation node if direct failed + if getattr(message, "try_propagation_on_fail", False) and _router.outbound_propagation_node: + logger.info("Direct delivery failed, trying propagation node for %s", destination_hash_hex[:16]) + message.desired_method = LXMF.LXMessage.PROPAGATED + message.try_propagation_on_fail = False + _router.handle_outbound(message) + return - def _failed_callback(message): with _lock: for m in _messages: if m["id"] == msg_id: m["status"] = "failed" break _save_messages() + _emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "failed"}) - lxm.delivery_callback = _delivery_callback_outbound - lxm.failed_callback = _failed_callback + lxm.delivery_callback = _on_delivered + lxm.failed_callback = _on_failed _router.handle_outbound(lxm) @@ -156,18 +339,159 @@ def send_message(destination_hash_hex: str, content: str, title: str = "") -> di _messages.append(msg_record) _save_messages() 
+ _emit_event("lxmf.created", {"message": msg_record}) return msg_record +# ═══════════════════════════════════════════════════════════════════ +# Auto-Resend Failed Messages on Announce +# ═══════════════════════════════════════════════════════════════════ + +class _LxmfAnnounceHandler: + """Re-attempt failed messages when a peer announces.""" + aspect_filter = "lxmf.delivery" + + def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None): + if not AUTO_RESEND_ON_ANNOUNCE: + return + + dest_hex = destination_hash.hex() + resend_count = 0 + + with _lock: + for msg in _messages: + if (msg["status"] == "failed" and + msg["direction"] == "outbound" and + msg["recipient_hash"] == dest_hex): + msg["status"] = "pending" + resend_count += 1 + # Re-send in background + threading.Thread( + target=_resend_message, args=(msg,), daemon=True + ).start() + + if resend_count > 0: + _save_messages() + + if resend_count > 0: + logger.info("Auto-resending %d failed messages to %s", resend_count, dest_hex[:16]) + + +def _resend_message(msg_record: dict): + """Resend a previously failed message.""" + time.sleep(1) # Brief delay to let path establish + result = send_message( + msg_record["recipient_hash"], + msg_record["content"], + msg_record.get("title", ""), + ) + if result.get("status") == "pending": + # Remove the duplicate — original is being retried + with _lock: + _messages[:] = [m for m in _messages if m["id"] != result["id"]] + _save_messages() + + +# ═══════════════════════════════════════════════════════════════════ +# Propagation Node Management +# ═══════════════════════════════════════════════════════════════════ + +class _PropagationAnnounceHandler: + """Track LXMF propagation node announces.""" + aspect_filter = "lxmf.propagation" + + def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None): + dest_hex = destination_hash.hex() + node_info = { + "destination_hash": dest_hex, + 
"identity_hash": announced_identity.hexhash if announced_identity else "", + "last_heard": time.time(), + "enabled": True, + "per_transfer_limit": 0, + } + + # Parse propagation node app data + if app_data: + try: + import RNS.vendor.umsgpack as msgpack + data = msgpack.unpackb(app_data) + if isinstance(data, dict): + node_info["enabled"] = data.get("enabled", True) + node_info["per_transfer_limit"] = data.get("per_transfer_limit", 0) + except Exception: + pass + + _propagation_nodes[dest_hex] = node_info + _emit_event("propagation.announce", {"node": node_info}) + + +def get_propagation_nodes() -> list[dict]: + """Return list of known propagation nodes.""" + return list(_propagation_nodes.values()) + + +def set_preferred_propagation_node(destination_hash_hex: str) -> bool: + """Set the preferred outbound propagation node.""" + if _router is None: + return False + try: + if destination_hash_hex: + _router.set_outbound_propagation_node(bytes.fromhex(destination_hash_hex)) + else: + _router.outbound_propagation_node = None + return True + except Exception as e: + logger.warning("Failed to set propagation node: %s", e) + return False + + +def get_propagation_status() -> dict: + """Return propagation node status.""" + if _router is None: + return {"enabled": False, "outbound_node": None, "transfer_state": "idle"} + + outbound = _router.get_outbound_propagation_node() + return { + "local_enabled": LOCAL_PROPAGATION_NODE, + "outbound_node": outbound.hex() if outbound else None, + "transfer_state": str(getattr(_router, "propagation_transfer_state", "unknown")), + "known_nodes": len(_propagation_nodes), + "last_sync": _last_propagation_sync, + } + + +def sync_propagation_messages(): + """Request messages from the propagation node.""" + global _last_propagation_sync + if _router is None or _identity is None: + return + try: + _router.request_messages_from_propagation_node(_identity) + _last_propagation_sync = time.time() + logger.info("Syncing messages from propagation 
node") + except Exception as e: + logger.warning("Propagation sync failed: %s", e) + + +def _propagation_sync_loop(): + """Background thread to periodically sync propagation messages.""" + while True: + time.sleep(PROPAGATION_SYNC_INTERVAL) + if _router and _router.get_outbound_propagation_node(): + sync_propagation_messages() + + +# ═══════════════════════════════════════════════════════════════════ +# Query / Persistence +# ═══════════════════════════════════════════════════════════════════ + def get_messages(limit: int = 100, offset: int = 0) -> list[dict]: - """Return stored messages.""" with _lock: sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True) return sorted_msgs[offset:offset + limit] def get_message(msg_id: str) -> dict | None: - """Return a single message by ID.""" with _lock: for m in _messages: if m["id"] == msg_id: @@ -176,13 +500,19 @@ def get_message(msg_id: str) -> dict | None: def get_total_count() -> int: - """Return total message count.""" with _lock: return len(_messages) +def get_attachment_path(filename: str) -> str | None: + """Return full path to an attachment file if it exists.""" + filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename) + if os.path.exists(filepath): + return filepath + return None + + def _load_messages(): - """Load messages from persistent storage.""" global _messages if os.path.exists(LXMF_MESSAGES_FILE): try: @@ -195,7 +525,6 @@ def _load_messages(): def _save_messages(): - """Persist messages to storage (call with _lock held).""" try: os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True) with open(LXMF_MESSAGES_FILE, "w") as f: diff --git a/app/main.py b/app/main.py index 92aa6b5..c3749fd 100644 --- a/app/main.py +++ b/app/main.py @@ -1,18 +1,19 @@ """ -rMesh Bridge — FastAPI service exposing Reticulum (Internet backbone) -and MeshCore (LoRa mesh) as a unified REST API. 
+rMesh Bridge — Unified REST + WebSocket API for Reticulum (Internet backbone), +MeshCore (LoRa mesh), LXMF messaging, audio calls, and NomadNet browsing. """ +import asyncio import logging from contextlib import asynccontextmanager -from typing import Optional -from fastapi import FastAPI, HTTPException, Header +from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect +from fastapi.responses import FileResponse from pydantic import BaseModel -from typing import Annotated +from typing import Annotated, Optional -from . import reticulum_bridge, lxmf_bridge, meshcore_bridge -from .config import BRIDGE_API_KEY, MESHCORE_ENABLED +from . import reticulum_bridge, lxmf_bridge, meshcore_bridge, websocket_manager, rchats_bridge +from .config import BRIDGE_API_KEY, MESHCORE_ENABLED, RCHATS_BRIDGE_ENABLED from .models import ( StatusResponse, NodesResponse, TopologyResponse, MessageIn, MessageOut, MessagesResponse, @@ -25,9 +26,22 @@ logger = logging.getLogger("rmesh.api") @asynccontextmanager async def lifespan(app: FastAPI): - """Initialize Reticulum and MeshCore on startup.""" + """Initialize all subsystems on startup.""" logger.info("Starting rMesh bridge...") + # Set up WebSocket event loop + loop = asyncio.get_event_loop() + websocket_manager.set_event_loop(loop) + + # Register WebSocket broadcast as event listener for all subsystems + reticulum_bridge.register_event_listener(websocket_manager.on_event) + lxmf_bridge.register_event_listener(websocket_manager.on_event) + + # Register rChats bridge as event listener + if RCHATS_BRIDGE_ENABLED: + lxmf_bridge.register_event_listener(rchats_bridge.on_lxmf_event) + rchats_bridge.init() + # Reticulum (Internet backbone) reticulum_bridge.init() from .reticulum_bridge import _identity @@ -36,6 +50,7 @@ async def lifespan(app: FastAPI): else: logger.error("No Reticulum identity available for LXMF") reticulum_bridge.announce() + reticulum_bridge.announce_call_capability() # MeshCore (LoRa mesh) if 
MESHCORE_ENABLED: @@ -60,6 +75,25 @@ def verify_api_key(x_bridge_api_key: Annotated[str, Header()] = ""): raise HTTPException(status_code=401, detail="Invalid API key") +# ═══════════════════════════════════════════════════════════════════ +# WebSocket (real-time events) +# ═══════════════════════════════════════════════════════════════════ + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await websocket_manager.connect(websocket) + try: + while True: + data = await websocket.receive_text() + # Handle ping/pong + if data == "ping": + await websocket.send_text('{"type":"pong"}') + except WebSocketDisconnect: + websocket_manager.disconnect(websocket) + except Exception: + websocket_manager.disconnect(websocket) + + # ═══════════════════════════════════════════════════════════════════ # Health & Combined Status # ═══════════════════════════════════════════════════════════════════ @@ -72,15 +106,18 @@ async def health(): @app.get("/api/status") async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) - result = { + return { "reticulum": reticulum_bridge.get_status(), - "meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0}, + "meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else { + "connected": False, "device_info": {}, "contact_count": 0, "message_count": 0 + }, + "propagation": lxmf_bridge.get_propagation_status(), + "websocket_clients": websocket_manager.client_count(), } - return result # ═══════════════════════════════════════════════════════════════════ -# Reticulum Endpoints (Internet backbone) +# Reticulum Endpoints # ═══════════════════════════════════════════════════════════════════ @app.get("/api/reticulum/status", response_model=StatusResponse) @@ -89,14 +126,14 @@ async def reticulum_status(x_bridge_api_key: Annotated[str, Header()] = ""): return 
reticulum_bridge.get_status() -@app.get("/api/reticulum/nodes", response_model=NodesResponse) +@app.get("/api/reticulum/nodes") async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) node_list = reticulum_bridge.get_nodes() return {"nodes": node_list, "total": len(node_list)} -@app.get("/api/reticulum/topology", response_model=TopologyResponse) +@app.get("/api/reticulum/topology") async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) return reticulum_bridge.get_topology() @@ -114,7 +151,11 @@ async def reticulum_identity(x_bridge_api_key: Annotated[str, Header()] = ""): return reticulum_bridge.get_identity_info() -@app.get("/api/reticulum/messages", response_model=MessagesResponse) +# ═══════════════════════════════════════════════════════════════════ +# LXMF Messages (with rich fields) +# ═══════════════════════════════════════════════════════════════════ + +@app.get("/api/reticulum/messages") async def reticulum_messages( x_bridge_api_key: Annotated[str, Header()] = "", limit: int = 100, offset: int = 0, @@ -124,7 +165,7 @@ async def reticulum_messages( return {"messages": msgs, "total": lxmf_bridge.get_total_count()} -@app.post("/api/reticulum/messages", response_model=MessageOut) +@app.post("/api/reticulum/messages") async def reticulum_send_message( body: MessageIn, x_bridge_api_key: Annotated[str, Header()] = "", @@ -140,24 +181,120 @@ async def reticulum_send_message( return result -# Keep old /api/status path as alias for backwards compat -@app.get("/api/nodes", response_model=NodesResponse) -async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""): +@app.get("/api/reticulum/messages/{msg_id}") +async def reticulum_get_message(msg_id: str, x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) - node_list = reticulum_bridge.get_nodes() - return {"nodes": node_list, "total": len(node_list)} + msg = 
lxmf_bridge.get_message(msg_id) + if msg is None: + raise HTTPException(status_code=404, detail="Message not found") + return msg -@app.get("/api/topology", response_model=TopologyResponse) -async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""): +@app.get("/api/attachments/{filename}") +async def get_attachment(filename: str, x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) - return reticulum_bridge.get_topology() + # Sanitize filename + safe = "".join(c for c in filename if c.isalnum() or c in ".-_") + path = lxmf_bridge.get_attachment_path(safe) + if path is None: + raise HTTPException(status_code=404, detail="Attachment not found") + return FileResponse(path) -@app.get("/api/identity", response_model=IdentityResponse) -async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""): +# ═══════════════════════════════════════════════════════════════════ +# Propagation Node Management +# ═══════════════════════════════════════════════════════════════════ + +@app.get("/api/propagation/status") +async def propagation_status(x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) - return reticulum_bridge.get_identity_info() + return lxmf_bridge.get_propagation_status() + + +@app.get("/api/propagation/nodes") +async def propagation_nodes(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + nodes = lxmf_bridge.get_propagation_nodes() + return {"nodes": nodes, "total": len(nodes)} + + +class PropagationNodeSet(BaseModel): + destination_hash: str + + +@app.post("/api/propagation/preferred") +async def set_preferred_propagation(body: PropagationNodeSet, x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + ok = lxmf_bridge.set_preferred_propagation_node(body.destination_hash) + if not ok: + raise HTTPException(status_code=400, detail="Failed to set propagation node") + return {"status": "ok", "destination_hash": 
body.destination_hash} + + +@app.post("/api/propagation/sync") +async def sync_propagation(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + lxmf_bridge.sync_propagation_messages() + return {"status": "syncing"} + + +# ═══════════════════════════════════════════════════════════════════ +# Audio Calls +# ═══════════════════════════════════════════════════════════════════ + +class CallInitiate(BaseModel): + destination_hash: str + + +@app.get("/api/calls") +async def list_calls(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + return {"calls": reticulum_bridge.get_active_calls()} + + +@app.post("/api/calls") +async def initiate_call(body: CallInitiate, x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + result = reticulum_bridge.initiate_call(body.destination_hash) + if not result.get("call_id"): + raise HTTPException(status_code=400, detail=result.get("error", "Call failed")) + return result + + +class CallHangup(BaseModel): + call_id: str + + +@app.post("/api/calls/hangup") +async def hangup_call(body: CallHangup, x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + reticulum_bridge.hangup_call(body.call_id) + return {"status": "ok"} + + +# ═══════════════════════════════════════════════════════════════════ +# NomadNet Node Browser +# ═══════════════════════════════════════════════════════════════════ + +@app.get("/api/nomadnet/nodes") +async def nomadnet_nodes(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + nodes = reticulum_bridge.get_nomadnet_nodes() + return {"nodes": nodes, "total": len(nodes)} + + +class NomadNetBrowse(BaseModel): + destination_hash: str + path: str = "/" + + +@app.post("/api/nomadnet/browse") +async def nomadnet_browse(body: NomadNetBrowse, x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + result = 
reticulum_bridge.browse_nomadnet_node(body.destination_hash, body.path) + if "error" in result: + raise HTTPException(status_code=400, detail=result["error"]) + return result # ═══════════════════════════════════════════════════════════════════ @@ -238,3 +375,26 @@ async def meshcore_advert(x_bridge_api_key: Annotated[str, Header()] = ""): async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""): verify_api_key(x_bridge_api_key) return await meshcore_bridge.get_device_stats() + + +# ═══════════════════════════════════════════════════════════════════ +# Backwards Compatibility Aliases +# ═══════════════════════════════════════════════════════════════════ + +@app.get("/api/nodes") +async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + node_list = reticulum_bridge.get_nodes() + return {"nodes": node_list, "total": len(node_list)} + + +@app.get("/api/topology") +async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + return reticulum_bridge.get_topology() + + +@app.get("/api/identity") +async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""): + verify_api_key(x_bridge_api_key) + return reticulum_bridge.get_identity_info() diff --git a/app/rchats_bridge.py b/app/rchats_bridge.py new file mode 100644 index 0000000..6c93d66 --- /dev/null +++ b/app/rchats_bridge.py @@ -0,0 +1,79 @@ +""" +rChats bridge — relays LXMF and MeshCore messages to/from rChats. +Messages arriving over mesh are forwarded to rChats internal API. +Messages from rChats can be sent out over mesh. 
+"""
+
+import logging
+import threading
+import time
+
+import httpx
+
+from .config import RCHATS_BRIDGE_ENABLED, RCHATS_INTERNAL_URL
+
+logger = logging.getLogger("rmesh.rchats_bridge")
+
+
+def init():
+    """Initialize the rChats bridge (log-only; no connection is opened)."""
+    if not RCHATS_BRIDGE_ENABLED:
+        logger.info("rChats bridge disabled")
+        return
+    logger.info("rChats bridge enabled — forwarding to %s", RCHATS_INTERNAL_URL)
+
+
+# Event type -> source tag forwarded to rChats. Anything else is ignored.
+_FORWARDED_EVENTS = {
+    "lxmf.delivery": "reticulum",
+    "meshcore.message": "meshcore",
+}
+
+
+def on_lxmf_event(event: dict):
+    """Handle bridge events and forward relevant ones to rChats.
+
+    Despite the name, this receives both LXMF and MeshCore events; the
+    event type selects the ``source`` tag sent to rChats.
+    """
+    if not RCHATS_BRIDGE_ENABLED:
+        return
+
+    source = _FORWARDED_EVENTS.get(event.get("type", ""))
+    if source:
+        _forward_to_rchats(event.get("message", {}), source=source)
+
+
+def _forward_to_rchats(msg: dict, source: str):
+    """Forward a mesh message to rChats internal API.
+
+    Only messages carrying a ``space_slug`` are forwarded; the POST runs
+    on a daemon thread so the mesh receive path never blocks on HTTP.
+    """
+    space_slug = msg.get("space_slug")
+    if not space_slug:
+        return  # Only forward space-scoped messages
+
+    fields = msg.get("fields", {})
+    payload = {
+        "source": source,
+        "sender": msg.get("sender_hash", "") or msg.get("sender", ""),
+        "content": msg.get("content", ""),
+        "title": msg.get("title", ""),
+        "has_image": "image" in fields,
+        "has_audio": "audio" in fields,
+        "has_files": "file_attachments" in fields,
+        "space": space_slug,
+        "timestamp": msg.get("timestamp", time.time()),
+    }
+
+    threading.Thread(
+        target=_post_to_rchats, args=(space_slug, payload), daemon=True
+    ).start()
+
+
+def _post_to_rchats(space_slug: str, payload: dict):
+    """POST message to rChats internal mesh-relay endpoint.
+
+    Best-effort: an unreachable rChats is expected (debug log only);
+    any other failure is logged as a warning and never raised.
+    """
+    try:
+        url = f"{RCHATS_INTERNAL_URL}/api/internal/mesh-relay"
+        with httpx.Client(timeout=10) as client:
+            response = client.post(url, json=payload)
+        if response.status_code < 400:
+            logger.info("Forwarded mesh message to rChats space '%s'", space_slug)
+        else:
+            logger.warning("rChats rejected message: %d %s",
+                           response.status_code, response.text[:100])
+    except httpx.ConnectError:
+        logger.debug("rChats not reachable at %s", RCHATS_INTERNAL_URL)
+    except Exception as e:
+        logger.warning("Failed to forward to rChats: %s", e)
diff --git a/app/reticulum_bridge.py b/app/reticulum_bridge.py
index 7ea0f09..8b018ac 100644
--- a/app/reticulum_bridge.py
+++ b/app/reticulum_bridge.py
@@ -1,5 +1,6 @@
 """
-Reticulum bridge — singleton RNS instance, identity management, topology queries.
+Reticulum bridge — singleton RNS instance, identity management,
+topology queries, signal quality tracking, audio call support.
 """
 
 import time
@@ -14,17 +15,44 @@
 logger = logging.getLogger("rmesh.reticulum")
 
 _rns_instance: RNS.Reticulum | None = None
 _identity: RNS.Identity | None = None
 _destination: RNS.Destination | None = None
+_call_destination: RNS.Destination | None = None
 _start_time: float = 0
 _announced_destinations: dict[str, dict] = {}
 _lock = threading.Lock()
+_event_listeners: list = []
 
 APP_NAME = "rmesh"
 ASPECT = "bridge"
 
 
+def _safe_decode(data) -> str | None:
+    """Safely decode app_data which may be binary; falls back to hex."""
+    if data is None:
+        return None
+    if isinstance(data, bytes):
+        try:
+            return data.decode("utf-8")
+        except UnicodeDecodeError:
+            return data.hex()
+    return str(data)
+
+
+def register_event_listener(callback):
+    """Register a callback invoked for every emitted bridge event."""
+    _event_listeners.append(callback)
+
+
+def _emit_event(event_type: str, data: dict):
+    """Fan an event out to all listeners; listener failures never propagate."""
+    event = {"type": event_type, **data}
+    for listener in _event_listeners:
+        try:
+            listener(event)
+        except Exception:
+            # A misbehaving listener must not break announce/call handling,
+            # but don't hide the failure entirely either.
+            logger.debug("Event listener failed for %s", event_type, exc_info=True)
+
+
 def init():
     """Initialize the Reticulum instance and server identity."""
-    global _rns_instance, _identity, _destination, _start_time
+    global _rns_instance, _identity, _destination, _call_destination, _start_time
 
     if _rns_instance is not None:
         return
@@ -43,30 +71,275 @@
         _identity.to_file(identity_path)
         logger.info("Created new identity")
 
-    # Create a destination for this bridge
+    # Bridge destination
     _destination = RNS.Destination(
_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, ASPECT, ) - # Register announce handler to track network - RNS.Transport.register_announce_handler(_announce_handler) + # Audio call destination + _call_destination = RNS.Destination( + _identity, RNS.Destination.IN, RNS.Destination.SINGLE, + "call", "audio", + ) + _call_destination.set_link_established_callback(_on_incoming_call_link) + + # Register announce handlers with signal quality + RNS.Transport.register_announce_handler(_AnnounceHandler()) logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash) -def _announce_handler(destination_hash, announced_identity, app_data): - """Callback when we hear an announce from another node.""" - with _lock: - _announced_destinations[destination_hash.hex()] = { - "destination_hash": destination_hash.hex(), - "app_data": app_data.decode("utf-8") if app_data else None, +class _AnnounceHandler: + """Track announces with RSSI/SNR/quality metrics.""" + aspect_filter = None # Catch all announces + + def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None): + # Extract signal quality from the announce packet + rssi = None + snr = None + quality = None + + # Try to get signal info from the packet + try: + if announce_packet_hash: + packet = RNS.Transport.packet_hashmap.get(announce_packet_hash) + if packet: + rssi = getattr(packet, "rssi", None) + snr = getattr(packet, "snr", None) + quality = getattr(packet, "q", None) or getattr(packet, "quality", None) + except Exception: + pass + + dest_hex = destination_hash.hex() + node_data = { + "destination_hash": dest_hex, + "identity_hash": announced_identity.hexhash if announced_identity else "", + "app_data": _safe_decode(app_data), "last_heard": time.time(), + "rssi": rssi, + "snr": snr, + "quality": quality, } + with _lock: + _announced_destinations[dest_hex] = node_data + + _emit_event("announce", {"node": node_data}) + + +# 
═══════════════════════════════════════════════════════════════════ +# Audio Calls over Reticulum Links +# ═══════════════════════════════════════════════════════════════════ + +_active_calls: dict[str, dict] = {} + + +def _on_incoming_call_link(link): + """Handle incoming audio call link.""" + call_id = str(link.hash.hex()) if hasattr(link, "hash") else str(id(link)) + _active_calls[call_id] = { + "id": call_id, + "link": link, + "direction": "inbound", + "started_at": time.time(), + "peer_hash": link.destination.hexhash if hasattr(link.destination, "hexhash") else "", + } + + link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg)) + link.set_link_closed_callback(lambda l: _on_call_ended(call_id)) + + logger.info("Incoming audio call: %s", call_id[:16]) + _emit_event("call.incoming", {"call_id": call_id, "peer": _active_calls[call_id]["peer_hash"]}) + + +def initiate_call(destination_hash_hex: str) -> dict: + """Initiate an audio call to a destination.""" + if _identity is None: + return {"call_id": "", "error": "No identity"} + + try: + dest_hash = bytes.fromhex(destination_hash_hex) + dest_identity = RNS.Identity.recall(dest_hash) + if dest_identity is None: + RNS.Transport.request_path(dest_hash) + return {"call_id": "", "error": "Peer not found, path requested"} + + dest = RNS.Destination( + dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, + "call", "audio", + ) + + link = RNS.Link(dest) + call_id = str(id(link)) + + _active_calls[call_id] = { + "id": call_id, + "link": link, + "direction": "outbound", + "started_at": time.time(), + "peer_hash": destination_hash_hex, + } + + link.set_link_established_callback(lambda l: _emit_event("call.established", {"call_id": call_id})) + link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg)) + link.set_link_closed_callback(lambda l: _on_call_ended(call_id)) + + return {"call_id": call_id, "peer": destination_hash_hex} + except Exception as e: + return {"call_id": 
"", "error": str(e)} + + +def send_call_audio(call_id: str, audio_data: bytes) -> bool: + """Send audio data over an active call link.""" + call = _active_calls.get(call_id) + if not call or not call["link"]: + return False + + try: + link = call["link"] + if link.status == RNS.Link.ACTIVE and len(audio_data) <= RNS.Link.MDU: + RNS.Packet(link, audio_data).send() + return True + except Exception: + pass + return False + + +def hangup_call(call_id: str): + """End an active call.""" + call = _active_calls.pop(call_id, None) + if call and call["link"]: + try: + call["link"].teardown() + except Exception: + pass + _emit_event("call.ended", {"call_id": call_id}) + + +def get_active_calls() -> list[dict]: + """Return active calls (without link objects).""" + return [{ + "id": c["id"], + "direction": c["direction"], + "started_at": c["started_at"], + "peer_hash": c["peer_hash"], + "duration": time.time() - c["started_at"], + } for c in _active_calls.values()] + + +def _on_call_audio_packet(call_id: str, audio_data): + """Handle received audio data from a call.""" + _emit_event("call.audio", {"call_id": call_id, "size": len(audio_data) if audio_data else 0}) + + +def _on_call_ended(call_id: str): + """Handle call link closure.""" + _active_calls.pop(call_id, None) + logger.info("Call ended: %s", call_id[:16]) + _emit_event("call.ended", {"call_id": call_id}) + + +def announce_call_capability(): + """Announce that this node can receive audio calls.""" + if _call_destination: + _call_destination.announce(app_data=b"rMesh Audio") + return True + return False + + +# ═══════════════════════════════════════════════════════════════════ +# NomadNet Node Browser +# ═══════════════════════════════════════════════════════════════════ + +_nomadnet_nodes: dict[str, dict] = {} + + +class _NomadNetAnnounceHandler: + """Track NomadNet node announces.""" + aspect_filter = "nomadnetwork.node" + + def received_announce(self, destination_hash, announced_identity, app_data, 
announce_packet_hash=None): + dest_hex = destination_hash.hex() + node_name = "" + if app_data: + try: + node_name = app_data.decode("utf-8") if isinstance(app_data, bytes) else str(app_data) + except Exception: + pass + + _nomadnet_nodes[dest_hex] = { + "destination_hash": dest_hex, + "name": node_name, + "identity_hash": announced_identity.hexhash if announced_identity else "", + "last_heard": time.time(), + } + _emit_event("nomadnet.announce", {"node": _nomadnet_nodes[dest_hex]}) + + +def get_nomadnet_nodes() -> list[dict]: + """Return discovered NomadNet nodes.""" + return list(_nomadnet_nodes.values()) + + +def browse_nomadnet_node(destination_hash_hex: str, page_path: str = "/") -> dict: + """Request a page from a NomadNet node. Returns async — result via event.""" + if _identity is None: + return {"error": "No identity"} + + try: + dest_hash = bytes.fromhex(destination_hash_hex) + dest_identity = RNS.Identity.recall(dest_hash) + if dest_identity is None: + RNS.Transport.request_path(dest_hash) + return {"error": "Node not found, path requested"} + + dest = RNS.Destination( + dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, + "nomadnetwork", "node", + ) + + link = RNS.Link(dest) + + def on_established(l): + # Request page + l.request( + page_path.encode("utf-8"), + response_callback=lambda receipt: _on_nomadnet_response(destination_hash_hex, page_path, receipt), + failed_callback=lambda receipt: _emit_event("nomadnet.page.error", { + "node": destination_hash_hex, "path": page_path, "error": "Request failed" + }), + ) + + link.set_link_established_callback(on_established) + return {"status": "requesting", "node": destination_hash_hex, "path": page_path} + + except Exception as e: + return {"error": str(e)} + + +def _on_nomadnet_response(node_hash: str, page_path: str, receipt): + """Handle NomadNet page response.""" + try: + response = receipt.response + content = response.decode("utf-8") if isinstance(response, bytes) else str(response) + 
_emit_event("nomadnet.page.downloaded", { + "node": node_hash, + "path": page_path, + "content": content, + "size": len(content), + }) + except Exception as e: + _emit_event("nomadnet.page.error", { + "node": node_hash, "path": page_path, "error": str(e) + }) + + +# ═══════════════════════════════════════════════════════════════════ +# Status / Query +# ═══════════════════════════════════════════════════════════════════ def get_status() -> dict: - """Return transport status info.""" if _rns_instance is None: return {"online": False, "transport_enabled": False, "identity_hash": "", "uptime_seconds": 0, "announced_count": 0, "path_count": 0} @@ -81,21 +354,20 @@ def get_status() -> dict: "uptime_seconds": time.time() - _start_time, "announced_count": announced_count, "path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0, + "active_calls": len(_active_calls), + "nomadnet_nodes": len(_nomadnet_nodes), } def get_nodes() -> list[dict]: - """Return list of known announced destinations.""" with _lock: return list(_announced_destinations.values()) def get_topology() -> dict: - """Return nodes and links for visualization.""" nodes = get_nodes() links = [] - # Build links from destinations/path tables if available dest_table = getattr(RNS.Transport, "destinations", {}) if isinstance(dest_table, (dict, list)): try: @@ -110,7 +382,7 @@ def get_topology() -> dict: "active": True, }) except Exception: - pass # Transport internals may vary between RNS versions + pass return { "nodes": nodes, @@ -121,7 +393,6 @@ def get_topology() -> dict: def get_identity_info() -> dict: - """Return server identity information.""" if _identity is None: return {"identity_hash": "", "public_key_hex": ""} return { @@ -131,7 +402,6 @@ def get_identity_info() -> dict: def announce(): - """Announce this bridge on the network.""" if _destination is None: return {"announced": False, "identity_hash": ""} _destination.announce(app_data=b"rMesh Bridge") diff --git 
a/app/websocket_manager.py b/app/websocket_manager.py
new file mode 100644
index 0000000..d756ca7
--- /dev/null
+++ b/app/websocket_manager.py
@@ -0,0 +1,63 @@
+"""
+WebSocket manager — broadcasts real-time events to connected clients.
+"""
+
+import asyncio
+import json
+import logging
+from typing import Set
+
+from fastapi import WebSocket
+
+logger = logging.getLogger("rmesh.websocket")
+
+_clients: Set[WebSocket] = set()
+_loop: asyncio.AbstractEventLoop | None = None
+
+
+def set_event_loop(loop: asyncio.AbstractEventLoop):
+    """Record the app's event loop so thread callbacks can schedule onto it."""
+    global _loop
+    _loop = loop
+
+
+async def connect(websocket: WebSocket):
+    """Accept a WebSocket and start broadcasting events to it."""
+    await websocket.accept()
+    _clients.add(websocket)
+    logger.info("WebSocket client connected (%d total)", len(_clients))
+
+
+def disconnect(websocket: WebSocket):
+    """Remove a client; safe to call for already-removed sockets."""
+    _clients.discard(websocket)
+    logger.info("WebSocket client disconnected (%d remaining)", len(_clients))
+
+
+async def broadcast(data: dict):
+    """Send event to all connected WebSocket clients."""
+    if not _clients:
+        return
+
+    message = json.dumps(data)
+    dead = set()
+    # FIX: iterate over a snapshot — connect()/disconnect() can mutate
+    # _clients while we are suspended in await send_text(), which would
+    # raise "set changed size during iteration".
+    for client in list(_clients):
+        try:
+            await client.send_text(message)
+        except Exception:
+            dead.add(client)
+
+    # Drop clients whose sends failed.
+    for client in dead:
+        _clients.discard(client)
+
+
+def on_event(event: dict):
+    """Synchronous callback that schedules broadcast on the event loop.
+    Use this as the event_listener for reticulum_bridge and lxmf_bridge."""
+    if _loop is None or not _clients:
+        return
+    try:
+        asyncio.run_coroutine_threadsafe(broadcast(event), _loop)
+    except Exception:
+        pass
+
+
+def client_count() -> int:
+    return len(_clients)
diff --git a/requirements.txt b/requirements.txt
index bc2ef53..00ba57d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,5 @@
 fastapi>=0.115.0
 uvicorn[standard]>=0.30.0
 pydantic>=2.0
 meshcore>=2.3.0
+httpx>=0.27.0
+websockets>=12.0