Compare commits

..

No commits in common. "main" and "dev" have entirely different histories.
main ... dev

7 changed files with 88 additions and 1002 deletions

View File

@ -4,23 +4,12 @@ BRIDGE_API_KEY = os.environ.get("BRIDGE_API_KEY", "")
RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum") RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum")
LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf") LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf")
LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json") LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json")
LXMF_ATTACHMENTS_DIR = os.path.join(LXMF_STORAGE_DIR, "attachments")
LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4")) LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4"))
# MeshCore companion node connection # MeshCore companion node connection
MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true" MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true"
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") # TCP host (WiFi companion)
MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000")) MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000"))
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") # Serial port (USB companion)
MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore") MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore")
MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json") MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json")
# Feature flags
AUTO_RESEND_ON_ANNOUNCE = os.environ.get("AUTO_RESEND_ON_ANNOUNCE", "true").lower() == "true"
LOCAL_PROPAGATION_NODE = os.environ.get("LOCAL_PROPAGATION_NODE", "true").lower() == "true"
PREFERRED_PROPAGATION_NODE = os.environ.get("PREFERRED_PROPAGATION_NODE", "")
PROPAGATION_SYNC_INTERVAL = int(os.environ.get("PROPAGATION_SYNC_INTERVAL", "300"))
# rChats bridge
RCHATS_BRIDGE_ENABLED = os.environ.get("RCHATS_BRIDGE_ENABLED", "false").lower() == "true"
RCHATS_INTERNAL_URL = os.environ.get("RCHATS_INTERNAL_URL", "http://rchats:3000")

View File

@ -1,9 +1,7 @@
""" """
LXMF bridge rich message handling, propagation node management, LXMF bridge message send/receive via the LXMF protocol on Reticulum.
auto-resend on announce, file/image/audio attachments.
""" """
import base64
import json import json
import time import time
import uuid import uuid
@ -14,11 +12,7 @@ import os
import RNS import RNS
import LXMF import LXMF
from .config import ( from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE
LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE, LXMF_ATTACHMENTS_DIR,
AUTO_RESEND_ON_ANNOUNCE, LOCAL_PROPAGATION_NODE,
PREFERRED_PROPAGATION_NODE, PROPAGATION_SYNC_INTERVAL,
)
logger = logging.getLogger("rmesh.lxmf") logger = logging.getLogger("rmesh.lxmf")
@ -27,9 +21,6 @@ _local_delivery_destination: RNS.Destination | None = None
_identity: RNS.Identity | None = None _identity: RNS.Identity | None = None
_messages: list[dict] = [] _messages: list[dict] = []
_lock = threading.Lock() _lock = threading.Lock()
_event_listeners: list = [] # WebSocket broadcast callbacks
_propagation_nodes: dict[str, dict] = {}
_last_propagation_sync: float = 0
def init(identity: RNS.Identity): def init(identity: RNS.Identity):
@ -39,30 +30,16 @@ def init(identity: RNS.Identity):
_identity = identity _identity = identity
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True) os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
os.makedirs(LXMF_ATTACHMENTS_DIR, exist_ok=True)
_router = LXMF.LXMRouter( _router = LXMF.LXMRouter(
identity=_identity, identity=_identity,
storagepath=LXMF_STORAGE_DIR, storagepath=LXMF_STORAGE_DIR,
) )
# Increase transfer limit for attachments (10 MB) # Enable propagation node
_router.delivery_per_transfer_limit = 10000
# Enable local propagation node
if LOCAL_PROPAGATION_NODE:
_router.enable_propagation() _router.enable_propagation()
logger.info("Local LXMF propagation node enabled")
# Set preferred propagation node # Register local delivery destination for receiving messages
if PREFERRED_PROPAGATION_NODE:
try:
_router.set_outbound_propagation_node(bytes.fromhex(PREFERRED_PROPAGATION_NODE))
logger.info("Set preferred propagation node: %s", PREFERRED_PROPAGATION_NODE[:16])
except Exception as e:
logger.warning("Failed to set propagation node: %s", e)
# Register local delivery
_local_delivery_destination = _router.register_delivery_identity( _local_delivery_destination = _router.register_delivery_identity(
_identity, _identity,
display_name="rMesh Bridge", display_name="rMesh Bridge",
@ -70,152 +47,14 @@ def init(identity: RNS.Identity):
_router.register_delivery_callback(_delivery_callback) _router.register_delivery_callback(_delivery_callback)
# Register announce handlers for auto-resend and propagation tracking # Load persisted messages
RNS.Transport.register_announce_handler(_LxmfAnnounceHandler())
RNS.Transport.register_announce_handler(_PropagationAnnounceHandler())
_load_messages() _load_messages()
# Start propagation sync thread
if PROPAGATION_SYNC_INTERVAL > 0:
sync_thread = threading.Thread(target=_propagation_sync_loop, daemon=True)
sync_thread.start()
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash) logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
def register_event_listener(callback):
"""Register a callback for real-time events (WebSocket broadcast)."""
_event_listeners.append(callback)
def _emit_event(event_type: str, data: dict):
"""Emit event to all registered listeners."""
event = {"type": event_type, **data}
for listener in _event_listeners:
try:
listener(event)
except Exception:
pass
# ═══════════════════════════════════════════════════════════════════
# LXMF Rich Field Parsing
# ═══════════════════════════════════════════════════════════════════
def _parse_lxmf_fields(lxmf_message) -> dict:
"""Extract all LXMF fields from a message into a serializable dict."""
fields = {}
try:
lxmf_fields = lxmf_message.get_fields()
except Exception:
return fields
# Image attachment
if LXMF.FIELD_IMAGE in lxmf_fields:
try:
image_data = lxmf_fields[LXMF.FIELD_IMAGE]
image_type = image_data[0] if len(image_data) > 0 else "unknown"
image_bytes = image_data[1] if len(image_data) > 1 else b""
if image_bytes:
attachment_id = str(uuid.uuid4())
ext = "jpg" if "jpeg" in str(image_type).lower() or "jpg" in str(image_type).lower() else "png"
filename = f"{attachment_id}.{ext}"
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
with open(filepath, "wb") as f:
f.write(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes))
fields["image"] = {
"type": str(image_type),
"filename": filename,
"size": len(image_bytes),
"base64": base64.b64encode(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)).decode("ascii")[:200] + "...",
}
except Exception as e:
logger.warning("Failed to parse image field: %s", e)
# File attachments
if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields:
try:
file_list = lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS]
attachments = []
for file_data in file_list:
file_name = file_data[0] if len(file_data) > 0 else "unknown"
file_bytes = file_data[1] if len(file_data) > 1 else b""
if file_bytes:
safe_name = "".join(c for c in str(file_name) if c.isalnum() or c in ".-_")
attachment_id = str(uuid.uuid4())
stored_name = f"{attachment_id}_{safe_name}"
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, stored_name)
with open(filepath, "wb") as f:
f.write(file_bytes if isinstance(file_bytes, bytes) else bytes(file_bytes))
attachments.append({
"name": str(file_name),
"stored_name": stored_name,
"size": len(file_bytes),
})
if attachments:
fields["file_attachments"] = attachments
except Exception as e:
logger.warning("Failed to parse file attachments: %s", e)
# Audio message (Codec2)
if LXMF.FIELD_AUDIO in lxmf_fields:
try:
audio_data = lxmf_fields[LXMF.FIELD_AUDIO]
audio_mode = audio_data[0] if len(audio_data) > 0 else 0
audio_bytes = audio_data[1] if len(audio_data) > 1 else b""
if audio_bytes:
attachment_id = str(uuid.uuid4())
filename = f"{attachment_id}.c2"
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
with open(filepath, "wb") as f:
f.write(audio_bytes if isinstance(audio_bytes, bytes) else bytes(audio_bytes))
fields["audio"] = {
"codec": "codec2",
"mode": audio_mode,
"filename": filename,
"size": len(audio_bytes),
}
except Exception as e:
logger.warning("Failed to parse audio field: %s", e)
# Icon appearance
if LXMF.FIELD_ICON_APPEARANCE in lxmf_fields:
try:
icon_data = lxmf_fields[LXMF.FIELD_ICON_APPEARANCE]
fields["icon"] = {
"name": str(icon_data[0]) if len(icon_data) > 0 else "",
"foreground": "#" + icon_data[1].hex() if len(icon_data) > 1 and hasattr(icon_data[1], "hex") else "",
"background": "#" + icon_data[2].hex() if len(icon_data) > 2 and hasattr(icon_data[2], "hex") else "",
}
except Exception as e:
logger.warning("Failed to parse icon field: %s", e)
# Commands
if LXMF.FIELD_COMMANDS in lxmf_fields:
try:
fields["commands"] = [str(cmd) for cmd in lxmf_fields[LXMF.FIELD_COMMANDS]]
except Exception:
pass
return fields
# ═══════════════════════════════════════════════════════════════════
# Message Handling
# ═══════════════════════════════════════════════════════════════════
def _delivery_callback(message): def _delivery_callback(message):
"""Handle incoming LXMF message with rich field extraction.""" """Handle incoming LXMF message."""
# Parse rich fields
fields = _parse_lxmf_fields(message)
# Get signal quality if available
rssi = getattr(message, "rssi", None)
snr = getattr(message, "snr", None)
quality = getattr(message, "quality", None)
msg_record = { msg_record = {
"id": str(uuid.uuid4()), "id": str(uuid.uuid4()),
"direction": "inbound", "direction": "inbound",
@ -223,10 +62,6 @@ def _delivery_callback(message):
"recipient_hash": _identity.hexhash if _identity else "", "recipient_hash": _identity.hexhash if _identity else "",
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""), "title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""), "content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
"fields": fields,
"rssi": rssi,
"snr": snr,
"quality": quality,
"status": "delivered", "status": "delivered",
"timestamp": time.time(), "timestamp": time.time(),
} }
@ -235,19 +70,11 @@ def _delivery_callback(message):
_messages.append(msg_record) _messages.append(msg_record)
_save_messages() _save_messages()
logger.info("Received LXMF message from %s (fields: %s)", logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])
msg_record["sender_hash"][:16],
list(fields.keys()) if fields else "none")
_emit_event("lxmf.delivery", {"message": msg_record})
def send_message(destination_hash_hex: str, content: str, title: str = "", def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
image_bytes: bytes = None, image_type: str = "image/jpeg", """Send an LXMF message to a destination hash."""
file_attachments: list = None,
audio_bytes: bytes = None, audio_mode: int = 0,
try_propagation_on_fail: bool = True) -> dict:
"""Send an LXMF message with optional attachments."""
if _router is None or _identity is None: if _router is None or _identity is None:
return {"id": "", "status": "failed"} return {"id": "", "status": "failed"}
@ -256,82 +83,72 @@ def send_message(destination_hash_hex: str, content: str, title: str = "",
except ValueError: except ValueError:
return {"id": "", "status": "failed"} return {"id": "", "status": "failed"}
# Look up identity for destination
dest_identity = RNS.Identity.recall(dest_hash) dest_identity = RNS.Identity.recall(dest_hash)
msg_id = str(uuid.uuid4()) msg_id = str(uuid.uuid4())
if dest_identity is None: if dest_identity is None:
# Request path first, the message may be deliverable later
RNS.Transport.request_path(dest_hash) RNS.Transport.request_path(dest_hash)
msg_record = { msg_record = {
"id": msg_id, "direction": "outbound", "id": msg_id,
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex, "direction": "outbound",
"title": title, "content": content, "fields": {}, "sender_hash": _identity.hexhash,
"status": "pending", "timestamp": time.time(), "recipient_hash": destination_hash_hex,
"title": title,
"content": content,
"status": "pending",
"timestamp": time.time(),
} }
with _lock: with _lock:
_messages.append(msg_record) _messages.append(msg_record)
_save_messages() _save_messages()
return msg_record return msg_record
# Create destination for the recipient
dest = RNS.Destination( dest = RNS.Destination(
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"lxmf", "delivery", "lxmf", "delivery",
) )
lxm = LXMF.LXMessage( lxm = LXMF.LXMessage(
dest, _router.get_delivery_destination(), dest,
_router.get_delivery_destination(),
content.encode("utf-8") if isinstance(content, str) else content, content.encode("utf-8") if isinstance(content, str) else content,
title=title.encode("utf-8") if isinstance(title, str) else title, title=title.encode("utf-8") if isinstance(title, str) else title,
desired_method=LXMF.LXMessage.DIRECT, desired_method=LXMF.LXMessage.DIRECT,
) )
# Add rich fields
if image_bytes:
lxm.fields[LXMF.FIELD_IMAGE] = [image_type, image_bytes]
if file_attachments:
lxm.fields[LXMF.FIELD_FILE_ATTACHMENTS] = [
[att["name"], att["bytes"]] for att in file_attachments
]
if audio_bytes:
lxm.fields[LXMF.FIELD_AUDIO] = [audio_mode, audio_bytes]
# Enable propagation fallback
lxm.try_propagation_on_fail = try_propagation_on_fail
msg_record = { msg_record = {
"id": msg_id, "direction": "outbound", "id": msg_id,
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex, "direction": "outbound",
"title": title, "content": content, "fields": {}, "sender_hash": _identity.hexhash,
"status": "pending", "timestamp": time.time(), "recipient_hash": destination_hash_hex,
"title": title,
"content": content,
"status": "pending",
"timestamp": time.time(),
} }
def _on_delivered(message): def _delivery_callback_outbound(message):
with _lock: with _lock:
for m in _messages: for m in _messages:
if m["id"] == msg_id: if m["id"] == msg_id:
m["status"] = "delivered" m["status"] = "delivered"
break break
_save_messages() _save_messages()
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "delivered"})
def _on_failed(message):
# Try propagation node if direct failed
if getattr(message, "try_propagation_on_fail", False) and _router.outbound_propagation_node:
logger.info("Direct delivery failed, trying propagation node for %s", destination_hash_hex[:16])
message.desired_method = LXMF.LXMessage.PROPAGATED
message.try_propagation_on_fail = False
_router.handle_outbound(message)
return
def _failed_callback(message):
with _lock: with _lock:
for m in _messages: for m in _messages:
if m["id"] == msg_id: if m["id"] == msg_id:
m["status"] = "failed" m["status"] = "failed"
break break
_save_messages() _save_messages()
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "failed"})
lxm.delivery_callback = _on_delivered lxm.delivery_callback = _delivery_callback_outbound
lxm.failed_callback = _on_failed lxm.failed_callback = _failed_callback
_router.handle_outbound(lxm) _router.handle_outbound(lxm)
@ -339,159 +156,18 @@ def send_message(destination_hash_hex: str, content: str, title: str = "",
_messages.append(msg_record) _messages.append(msg_record)
_save_messages() _save_messages()
_emit_event("lxmf.created", {"message": msg_record})
return msg_record return msg_record
# ═══════════════════════════════════════════════════════════════════
# Auto-Resend Failed Messages on Announce
# ═══════════════════════════════════════════════════════════════════
class _LxmfAnnounceHandler:
"""Re-attempt failed messages when a peer announces."""
aspect_filter = "lxmf.delivery"
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
if not AUTO_RESEND_ON_ANNOUNCE:
return
dest_hex = destination_hash.hex()
resend_count = 0
with _lock:
for msg in _messages:
if (msg["status"] == "failed" and
msg["direction"] == "outbound" and
msg["recipient_hash"] == dest_hex):
msg["status"] = "pending"
resend_count += 1
# Re-send in background
threading.Thread(
target=_resend_message, args=(msg,), daemon=True
).start()
if resend_count > 0:
_save_messages()
if resend_count > 0:
logger.info("Auto-resending %d failed messages to %s", resend_count, dest_hex[:16])
def _resend_message(msg_record: dict):
"""Resend a previously failed message."""
time.sleep(1) # Brief delay to let path establish
result = send_message(
msg_record["recipient_hash"],
msg_record["content"],
msg_record.get("title", ""),
)
if result.get("status") == "pending":
# Remove the duplicate — original is being retried
with _lock:
_messages[:] = [m for m in _messages if m["id"] != result["id"]]
_save_messages()
# ═══════════════════════════════════════════════════════════════════
# Propagation Node Management
# ═══════════════════════════════════════════════════════════════════
class _PropagationAnnounceHandler:
"""Track LXMF propagation node announces."""
aspect_filter = "lxmf.propagation"
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
dest_hex = destination_hash.hex()
node_info = {
"destination_hash": dest_hex,
"identity_hash": announced_identity.hexhash if announced_identity else "",
"last_heard": time.time(),
"enabled": True,
"per_transfer_limit": 0,
}
# Parse propagation node app data
if app_data:
try:
import RNS.vendor.umsgpack as msgpack
data = msgpack.unpackb(app_data)
if isinstance(data, dict):
node_info["enabled"] = data.get("enabled", True)
node_info["per_transfer_limit"] = data.get("per_transfer_limit", 0)
except Exception:
pass
_propagation_nodes[dest_hex] = node_info
_emit_event("propagation.announce", {"node": node_info})
def get_propagation_nodes() -> list[dict]:
"""Return list of known propagation nodes."""
return list(_propagation_nodes.values())
def set_preferred_propagation_node(destination_hash_hex: str) -> bool:
"""Set the preferred outbound propagation node."""
if _router is None:
return False
try:
if destination_hash_hex:
_router.set_outbound_propagation_node(bytes.fromhex(destination_hash_hex))
else:
_router.outbound_propagation_node = None
return True
except Exception as e:
logger.warning("Failed to set propagation node: %s", e)
return False
def get_propagation_status() -> dict:
"""Return propagation node status."""
if _router is None:
return {"enabled": False, "outbound_node": None, "transfer_state": "idle"}
outbound = _router.get_outbound_propagation_node()
return {
"local_enabled": LOCAL_PROPAGATION_NODE,
"outbound_node": outbound.hex() if outbound else None,
"transfer_state": str(getattr(_router, "propagation_transfer_state", "unknown")),
"known_nodes": len(_propagation_nodes),
"last_sync": _last_propagation_sync,
}
def sync_propagation_messages():
"""Request messages from the propagation node."""
global _last_propagation_sync
if _router is None or _identity is None:
return
try:
_router.request_messages_from_propagation_node(_identity)
_last_propagation_sync = time.time()
logger.info("Syncing messages from propagation node")
except Exception as e:
logger.warning("Propagation sync failed: %s", e)
def _propagation_sync_loop():
"""Background thread to periodically sync propagation messages."""
while True:
time.sleep(PROPAGATION_SYNC_INTERVAL)
if _router and _router.get_outbound_propagation_node():
sync_propagation_messages()
# ═══════════════════════════════════════════════════════════════════
# Query / Persistence
# ═══════════════════════════════════════════════════════════════════
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]: def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
"""Return stored messages."""
with _lock: with _lock:
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True) sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
return sorted_msgs[offset:offset + limit] return sorted_msgs[offset:offset + limit]
def get_message(msg_id: str) -> dict | None: def get_message(msg_id: str) -> dict | None:
"""Return a single message by ID."""
with _lock: with _lock:
for m in _messages: for m in _messages:
if m["id"] == msg_id: if m["id"] == msg_id:
@ -500,19 +176,13 @@ def get_message(msg_id: str) -> dict | None:
def get_total_count() -> int: def get_total_count() -> int:
"""Return total message count."""
with _lock: with _lock:
return len(_messages) return len(_messages)
def get_attachment_path(filename: str) -> str | None:
"""Return full path to an attachment file if it exists."""
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
if os.path.exists(filepath):
return filepath
return None
def _load_messages(): def _load_messages():
"""Load messages from persistent storage."""
global _messages global _messages
if os.path.exists(LXMF_MESSAGES_FILE): if os.path.exists(LXMF_MESSAGES_FILE):
try: try:
@ -525,6 +195,7 @@ def _load_messages():
def _save_messages(): def _save_messages():
"""Persist messages to storage (call with _lock held)."""
try: try:
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True) os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
with open(LXMF_MESSAGES_FILE, "w") as f: with open(LXMF_MESSAGES_FILE, "w") as f:

View File

@ -1,19 +1,18 @@
""" """
rMesh Bridge Unified REST + WebSocket API for Reticulum (Internet backbone), rMesh Bridge FastAPI service exposing Reticulum (Internet backbone)
MeshCore (LoRa mesh), LXMF messaging, audio calls, and NomadNet browsing. and MeshCore (LoRa mesh) as a unified REST API.
""" """
import asyncio
import logging import logging
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import FileResponse
from pydantic import BaseModel from pydantic import BaseModel
from typing import Annotated, Optional from typing import Annotated
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge, websocket_manager, rchats_bridge from . import reticulum_bridge, lxmf_bridge, meshcore_bridge
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED, RCHATS_BRIDGE_ENABLED from .config import BRIDGE_API_KEY, MESHCORE_ENABLED
from .models import ( from .models import (
StatusResponse, NodesResponse, TopologyResponse, StatusResponse, NodesResponse, TopologyResponse,
MessageIn, MessageOut, MessagesResponse, MessageIn, MessageOut, MessagesResponse,
@ -26,22 +25,9 @@ logger = logging.getLogger("rmesh.api")
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""Initialize all subsystems on startup.""" """Initialize Reticulum and MeshCore on startup."""
logger.info("Starting rMesh bridge...") logger.info("Starting rMesh bridge...")
# Set up WebSocket event loop
loop = asyncio.get_event_loop()
websocket_manager.set_event_loop(loop)
# Register WebSocket broadcast as event listener for all subsystems
reticulum_bridge.register_event_listener(websocket_manager.on_event)
lxmf_bridge.register_event_listener(websocket_manager.on_event)
# Register rChats bridge as event listener
if RCHATS_BRIDGE_ENABLED:
lxmf_bridge.register_event_listener(rchats_bridge.on_lxmf_event)
rchats_bridge.init()
# Reticulum (Internet backbone) # Reticulum (Internet backbone)
reticulum_bridge.init() reticulum_bridge.init()
from .reticulum_bridge import _identity from .reticulum_bridge import _identity
@ -50,7 +36,6 @@ async def lifespan(app: FastAPI):
else: else:
logger.error("No Reticulum identity available for LXMF") logger.error("No Reticulum identity available for LXMF")
reticulum_bridge.announce() reticulum_bridge.announce()
reticulum_bridge.announce_call_capability()
# MeshCore (LoRa mesh) # MeshCore (LoRa mesh)
if MESHCORE_ENABLED: if MESHCORE_ENABLED:
@ -75,25 +60,6 @@ def verify_api_key(x_bridge_api_key: Annotated[str, Header()] = ""):
raise HTTPException(status_code=401, detail="Invalid API key") raise HTTPException(status_code=401, detail="Invalid API key")
# ═══════════════════════════════════════════════════════════════════
# WebSocket (real-time events)
# ═══════════════════════════════════════════════════════════════════
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket_manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# Handle ping/pong
if data == "ping":
await websocket.send_text('{"type":"pong"}')
except WebSocketDisconnect:
websocket_manager.disconnect(websocket)
except Exception:
websocket_manager.disconnect(websocket)
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Health & Combined Status # Health & Combined Status
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
@ -106,18 +72,15 @@ async def health():
@app.get("/api/status") @app.get("/api/status")
async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""): async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
return { result = {
"reticulum": reticulum_bridge.get_status(), "reticulum": reticulum_bridge.get_status(),
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else { "meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0},
"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0
},
"propagation": lxmf_bridge.get_propagation_status(),
"websocket_clients": websocket_manager.client_count(),
} }
return result
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Reticulum Endpoints # Reticulum Endpoints (Internet backbone)
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
@app.get("/api/reticulum/status", response_model=StatusResponse) @app.get("/api/reticulum/status", response_model=StatusResponse)
@ -126,14 +89,14 @@ async def reticulum_status(x_bridge_api_key: Annotated[str, Header()] = ""):
return reticulum_bridge.get_status() return reticulum_bridge.get_status()
@app.get("/api/reticulum/nodes") @app.get("/api/reticulum/nodes", response_model=NodesResponse)
async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""): async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
node_list = reticulum_bridge.get_nodes() node_list = reticulum_bridge.get_nodes()
return {"nodes": node_list, "total": len(node_list)} return {"nodes": node_list, "total": len(node_list)}
@app.get("/api/reticulum/topology") @app.get("/api/reticulum/topology", response_model=TopologyResponse)
async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""): async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
return reticulum_bridge.get_topology() return reticulum_bridge.get_topology()
@ -151,11 +114,7 @@ async def reticulum_identity(x_bridge_api_key: Annotated[str, Header()] = ""):
return reticulum_bridge.get_identity_info() return reticulum_bridge.get_identity_info()
# ═══════════════════════════════════════════════════════════════════ @app.get("/api/reticulum/messages", response_model=MessagesResponse)
# LXMF Messages (with rich fields)
# ═══════════════════════════════════════════════════════════════════
@app.get("/api/reticulum/messages")
async def reticulum_messages( async def reticulum_messages(
x_bridge_api_key: Annotated[str, Header()] = "", x_bridge_api_key: Annotated[str, Header()] = "",
limit: int = 100, offset: int = 0, limit: int = 100, offset: int = 0,
@ -165,7 +124,7 @@ async def reticulum_messages(
return {"messages": msgs, "total": lxmf_bridge.get_total_count()} return {"messages": msgs, "total": lxmf_bridge.get_total_count()}
@app.post("/api/reticulum/messages") @app.post("/api/reticulum/messages", response_model=MessageOut)
async def reticulum_send_message( async def reticulum_send_message(
body: MessageIn, body: MessageIn,
x_bridge_api_key: Annotated[str, Header()] = "", x_bridge_api_key: Annotated[str, Header()] = "",
@ -181,120 +140,24 @@ async def reticulum_send_message(
return result return result
@app.get("/api/reticulum/messages/{msg_id}") # Keep old /api/status path as alias for backwards compat
async def reticulum_get_message(msg_id: str, x_bridge_api_key: Annotated[str, Header()] = ""): @app.get("/api/nodes", response_model=NodesResponse)
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
msg = lxmf_bridge.get_message(msg_id) node_list = reticulum_bridge.get_nodes()
if msg is None: return {"nodes": node_list, "total": len(node_list)}
raise HTTPException(status_code=404, detail="Message not found")
return msg
@app.get("/api/attachments/{filename}") @app.get("/api/topology", response_model=TopologyResponse)
async def get_attachment(filename: str, x_bridge_api_key: Annotated[str, Header()] = ""): async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
# Sanitize filename return reticulum_bridge.get_topology()
safe = "".join(c for c in filename if c.isalnum() or c in ".-_")
path = lxmf_bridge.get_attachment_path(safe)
if path is None:
raise HTTPException(status_code=404, detail="Attachment not found")
return FileResponse(path)
# ═══════════════════════════════════════════════════════════════════ @app.get("/api/identity", response_model=IdentityResponse)
# Propagation Node Management async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
# ═══════════════════════════════════════════════════════════════════
@app.get("/api/propagation/status")
async def propagation_status(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
return lxmf_bridge.get_propagation_status() return reticulum_bridge.get_identity_info()
@app.get("/api/propagation/nodes")
async def propagation_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
nodes = lxmf_bridge.get_propagation_nodes()
return {"nodes": nodes, "total": len(nodes)}
class PropagationNodeSet(BaseModel):
destination_hash: str
@app.post("/api/propagation/preferred")
async def set_preferred_propagation(body: PropagationNodeSet, x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
ok = lxmf_bridge.set_preferred_propagation_node(body.destination_hash)
if not ok:
raise HTTPException(status_code=400, detail="Failed to set propagation node")
return {"status": "ok", "destination_hash": body.destination_hash}
@app.post("/api/propagation/sync")
async def sync_propagation(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
lxmf_bridge.sync_propagation_messages()
return {"status": "syncing"}
# ═══════════════════════════════════════════════════════════════════
# Audio Calls
# ═══════════════════════════════════════════════════════════════════
class CallInitiate(BaseModel):
destination_hash: str
@app.get("/api/calls")
async def list_calls(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
return {"calls": reticulum_bridge.get_active_calls()}
@app.post("/api/calls")
async def initiate_call(body: CallInitiate, x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
result = reticulum_bridge.initiate_call(body.destination_hash)
if not result.get("call_id"):
raise HTTPException(status_code=400, detail=result.get("error", "Call failed"))
return result
class CallHangup(BaseModel):
call_id: str
@app.post("/api/calls/hangup")
async def hangup_call(body: CallHangup, x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
reticulum_bridge.hangup_call(body.call_id)
return {"status": "ok"}
# ═══════════════════════════════════════════════════════════════════
# NomadNet Node Browser
# ═══════════════════════════════════════════════════════════════════
@app.get("/api/nomadnet/nodes")
async def nomadnet_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
nodes = reticulum_bridge.get_nomadnet_nodes()
return {"nodes": nodes, "total": len(nodes)}
class NomadNetBrowse(BaseModel):
destination_hash: str
path: str = "/"
@app.post("/api/nomadnet/browse")
async def nomadnet_browse(body: NomadNetBrowse, x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
result = reticulum_bridge.browse_nomadnet_node(body.destination_hash, body.path)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
@ -375,26 +238,3 @@ async def meshcore_advert(x_bridge_api_key: Annotated[str, Header()] = ""):
async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""): async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key) verify_api_key(x_bridge_api_key)
return await meshcore_bridge.get_device_stats() return await meshcore_bridge.get_device_stats()
# ═══════════════════════════════════════════════════════════════════
# Backwards Compatibility Aliases
# ═══════════════════════════════════════════════════════════════════
@app.get("/api/nodes")
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
node_list = reticulum_bridge.get_nodes()
return {"nodes": node_list, "total": len(node_list)}
@app.get("/api/topology")
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
return reticulum_bridge.get_topology()
@app.get("/api/identity")
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
verify_api_key(x_bridge_api_key)
return reticulum_bridge.get_identity_info()

View File

@ -1,79 +0,0 @@
"""
rChats bridge relays LXMF and MeshCore messages to/from rChats.
Messages arriving over mesh are forwarded to rChats internal API.
Messages from rChats can be sent out over mesh.
"""
import json
import logging
import threading
import time
import httpx
from .config import RCHATS_BRIDGE_ENABLED, RCHATS_INTERNAL_URL
logger = logging.getLogger("rmesh.rchats_bridge")
def init():
"""Initialize the rChats bridge."""
if not RCHATS_BRIDGE_ENABLED:
logger.info("rChats bridge disabled")
return
logger.info("rChats bridge enabled — forwarding to %s", RCHATS_INTERNAL_URL)
def on_lxmf_event(event: dict):
"""Handle LXMF events and forward relevant ones to rChats."""
if not RCHATS_BRIDGE_ENABLED:
return
event_type = event.get("type", "")
if event_type == "lxmf.delivery":
msg = event.get("message", {})
_forward_to_rchats(msg, source="reticulum")
elif event_type == "meshcore.message":
msg = event.get("message", {})
_forward_to_rchats(msg, source="meshcore")
def _forward_to_rchats(msg: dict, source: str):
"""Forward a mesh message to rChats internal API."""
space_slug = msg.get("space_slug")
if not space_slug:
return # Only forward space-scoped messages
payload = {
"source": source,
"sender": msg.get("sender_hash", "") or msg.get("sender", ""),
"content": msg.get("content", ""),
"title": msg.get("title", ""),
"has_image": "image" in msg.get("fields", {}),
"has_audio": "audio" in msg.get("fields", {}),
"has_files": "file_attachments" in msg.get("fields", {}),
"space": space_slug,
"timestamp": msg.get("timestamp", time.time()),
}
threading.Thread(
target=_post_to_rchats, args=(space_slug, payload), daemon=True
).start()
def _post_to_rchats(space_slug: str, payload: dict):
"""POST message to rChats internal mesh-relay endpoint."""
try:
url = f"{RCHATS_INTERNAL_URL}/api/internal/mesh-relay"
with httpx.Client(timeout=10) as client:
response = client.post(url, json=payload)
if response.status_code < 400:
logger.info("Forwarded mesh message to rChats space '%s'", space_slug)
else:
logger.warning("rChats rejected message: %d %s", response.status_code, response.text[:100])
except httpx.ConnectError:
logger.debug("rChats not reachable at %s", RCHATS_INTERNAL_URL)
except Exception as e:
logger.warning("Failed to forward to rChats: %s", e)

View File

@ -1,6 +1,5 @@
""" """
Reticulum bridge singleton RNS instance, identity management, Reticulum bridge singleton RNS instance, identity management, topology queries.
topology queries, signal quality tracking, audio call support.
""" """
import time import time
@ -15,44 +14,17 @@ logger = logging.getLogger("rmesh.reticulum")
_rns_instance: RNS.Reticulum | None = None _rns_instance: RNS.Reticulum | None = None
_identity: RNS.Identity | None = None _identity: RNS.Identity | None = None
_destination: RNS.Destination | None = None _destination: RNS.Destination | None = None
_call_destination: RNS.Destination | None = None
_start_time: float = 0 _start_time: float = 0
_announced_destinations: dict[str, dict] = {} _announced_destinations: dict[str, dict] = {}
_lock = threading.Lock() _lock = threading.Lock()
_event_listeners: list = []
APP_NAME = "rmesh" APP_NAME = "rmesh"
ASPECT = "bridge" ASPECT = "bridge"
def _safe_decode(data) -> str | None:
"""Safely decode app_data which may be binary."""
if data is None:
return None
if isinstance(data, bytes):
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return data.hex()
return str(data)
def register_event_listener(callback):
_event_listeners.append(callback)
def _emit_event(event_type: str, data: dict):
event = {"type": event_type, **data}
for listener in _event_listeners:
try:
listener(event)
except Exception:
pass
def init(): def init():
"""Initialize the Reticulum instance and server identity.""" """Initialize the Reticulum instance and server identity."""
global _rns_instance, _identity, _destination, _call_destination, _start_time global _rns_instance, _identity, _destination, _start_time
if _rns_instance is not None: if _rns_instance is not None:
return return
@ -71,275 +43,30 @@ def init():
_identity.to_file(identity_path) _identity.to_file(identity_path)
logger.info("Created new identity") logger.info("Created new identity")
# Bridge destination # Create a destination for this bridge
_destination = RNS.Destination( _destination = RNS.Destination(
_identity, RNS.Destination.IN, RNS.Destination.SINGLE, _identity, RNS.Destination.IN, RNS.Destination.SINGLE,
APP_NAME, ASPECT, APP_NAME, ASPECT,
) )
# Audio call destination # Register announce handler to track network
_call_destination = RNS.Destination( RNS.Transport.register_announce_handler(_announce_handler)
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
"call", "audio",
)
_call_destination.set_link_established_callback(_on_incoming_call_link)
# Register announce handlers with signal quality
RNS.Transport.register_announce_handler(_AnnounceHandler())
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash) logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
class _AnnounceHandler: def _announce_handler(destination_hash, announced_identity, app_data):
"""Track announces with RSSI/SNR/quality metrics.""" """Callback when we hear an announce from another node."""
aspect_filter = None # Catch all announces
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
# Extract signal quality from the announce packet
rssi = None
snr = None
quality = None
# Try to get signal info from the packet
try:
if announce_packet_hash:
packet = RNS.Transport.packet_hashmap.get(announce_packet_hash)
if packet:
rssi = getattr(packet, "rssi", None)
snr = getattr(packet, "snr", None)
quality = getattr(packet, "q", None) or getattr(packet, "quality", None)
except Exception:
pass
dest_hex = destination_hash.hex()
node_data = {
"destination_hash": dest_hex,
"identity_hash": announced_identity.hexhash if announced_identity else "",
"app_data": _safe_decode(app_data),
"last_heard": time.time(),
"rssi": rssi,
"snr": snr,
"quality": quality,
}
with _lock: with _lock:
_announced_destinations[dest_hex] = node_data _announced_destinations[destination_hash.hex()] = {
"destination_hash": destination_hash.hex(),
_emit_event("announce", {"node": node_data}) "app_data": app_data.decode("utf-8") if app_data else None,
# ═══════════════════════════════════════════════════════════════════
# Audio Calls over Reticulum Links
# ═══════════════════════════════════════════════════════════════════
_active_calls: dict[str, dict] = {}
def _on_incoming_call_link(link):
"""Handle incoming audio call link."""
call_id = str(link.hash.hex()) if hasattr(link, "hash") else str(id(link))
_active_calls[call_id] = {
"id": call_id,
"link": link,
"direction": "inbound",
"started_at": time.time(),
"peer_hash": link.destination.hexhash if hasattr(link.destination, "hexhash") else "",
}
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
logger.info("Incoming audio call: %s", call_id[:16])
_emit_event("call.incoming", {"call_id": call_id, "peer": _active_calls[call_id]["peer_hash"]})
def initiate_call(destination_hash_hex: str) -> dict:
"""Initiate an audio call to a destination."""
if _identity is None:
return {"call_id": "", "error": "No identity"}
try:
dest_hash = bytes.fromhex(destination_hash_hex)
dest_identity = RNS.Identity.recall(dest_hash)
if dest_identity is None:
RNS.Transport.request_path(dest_hash)
return {"call_id": "", "error": "Peer not found, path requested"}
dest = RNS.Destination(
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"call", "audio",
)
link = RNS.Link(dest)
call_id = str(id(link))
_active_calls[call_id] = {
"id": call_id,
"link": link,
"direction": "outbound",
"started_at": time.time(),
"peer_hash": destination_hash_hex,
}
link.set_link_established_callback(lambda l: _emit_event("call.established", {"call_id": call_id}))
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
return {"call_id": call_id, "peer": destination_hash_hex}
except Exception as e:
return {"call_id": "", "error": str(e)}
def send_call_audio(call_id: str, audio_data: bytes) -> bool:
"""Send audio data over an active call link."""
call = _active_calls.get(call_id)
if not call or not call["link"]:
return False
try:
link = call["link"]
if link.status == RNS.Link.ACTIVE and len(audio_data) <= RNS.Link.MDU:
RNS.Packet(link, audio_data).send()
return True
except Exception:
pass
return False
def hangup_call(call_id: str):
"""End an active call."""
call = _active_calls.pop(call_id, None)
if call and call["link"]:
try:
call["link"].teardown()
except Exception:
pass
_emit_event("call.ended", {"call_id": call_id})
def get_active_calls() -> list[dict]:
"""Return active calls (without link objects)."""
return [{
"id": c["id"],
"direction": c["direction"],
"started_at": c["started_at"],
"peer_hash": c["peer_hash"],
"duration": time.time() - c["started_at"],
} for c in _active_calls.values()]
def _on_call_audio_packet(call_id: str, audio_data):
"""Handle received audio data from a call."""
_emit_event("call.audio", {"call_id": call_id, "size": len(audio_data) if audio_data else 0})
def _on_call_ended(call_id: str):
"""Handle call link closure."""
_active_calls.pop(call_id, None)
logger.info("Call ended: %s", call_id[:16])
_emit_event("call.ended", {"call_id": call_id})
def announce_call_capability():
"""Announce that this node can receive audio calls."""
if _call_destination:
_call_destination.announce(app_data=b"rMesh Audio")
return True
return False
# ═══════════════════════════════════════════════════════════════════
# NomadNet Node Browser
# ═══════════════════════════════════════════════════════════════════
_nomadnet_nodes: dict[str, dict] = {}
class _NomadNetAnnounceHandler:
"""Track NomadNet node announces."""
aspect_filter = "nomadnetwork.node"
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
dest_hex = destination_hash.hex()
node_name = ""
if app_data:
try:
node_name = app_data.decode("utf-8") if isinstance(app_data, bytes) else str(app_data)
except Exception:
pass
_nomadnet_nodes[dest_hex] = {
"destination_hash": dest_hex,
"name": node_name,
"identity_hash": announced_identity.hexhash if announced_identity else "",
"last_heard": time.time(), "last_heard": time.time(),
} }
_emit_event("nomadnet.announce", {"node": _nomadnet_nodes[dest_hex]})
def get_nomadnet_nodes() -> list[dict]:
"""Return discovered NomadNet nodes."""
return list(_nomadnet_nodes.values())
def browse_nomadnet_node(destination_hash_hex: str, page_path: str = "/") -> dict:
"""Request a page from a NomadNet node. Returns async — result via event."""
if _identity is None:
return {"error": "No identity"}
try:
dest_hash = bytes.fromhex(destination_hash_hex)
dest_identity = RNS.Identity.recall(dest_hash)
if dest_identity is None:
RNS.Transport.request_path(dest_hash)
return {"error": "Node not found, path requested"}
dest = RNS.Destination(
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"nomadnetwork", "node",
)
link = RNS.Link(dest)
def on_established(l):
# Request page
l.request(
page_path.encode("utf-8"),
response_callback=lambda receipt: _on_nomadnet_response(destination_hash_hex, page_path, receipt),
failed_callback=lambda receipt: _emit_event("nomadnet.page.error", {
"node": destination_hash_hex, "path": page_path, "error": "Request failed"
}),
)
link.set_link_established_callback(on_established)
return {"status": "requesting", "node": destination_hash_hex, "path": page_path}
except Exception as e:
return {"error": str(e)}
def _on_nomadnet_response(node_hash: str, page_path: str, receipt):
"""Handle NomadNet page response."""
try:
response = receipt.response
content = response.decode("utf-8") if isinstance(response, bytes) else str(response)
_emit_event("nomadnet.page.downloaded", {
"node": node_hash,
"path": page_path,
"content": content,
"size": len(content),
})
except Exception as e:
_emit_event("nomadnet.page.error", {
"node": node_hash, "path": page_path, "error": str(e)
})
# ═══════════════════════════════════════════════════════════════════
# Status / Query
# ═══════════════════════════════════════════════════════════════════
def get_status() -> dict: def get_status() -> dict:
"""Return transport status info."""
if _rns_instance is None: if _rns_instance is None:
return {"online": False, "transport_enabled": False, "identity_hash": "", return {"online": False, "transport_enabled": False, "identity_hash": "",
"uptime_seconds": 0, "announced_count": 0, "path_count": 0} "uptime_seconds": 0, "announced_count": 0, "path_count": 0}
@ -354,20 +81,21 @@ def get_status() -> dict:
"uptime_seconds": time.time() - _start_time, "uptime_seconds": time.time() - _start_time,
"announced_count": announced_count, "announced_count": announced_count,
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0, "path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
"active_calls": len(_active_calls),
"nomadnet_nodes": len(_nomadnet_nodes),
} }
def get_nodes() -> list[dict]: def get_nodes() -> list[dict]:
"""Return list of known announced destinations."""
with _lock: with _lock:
return list(_announced_destinations.values()) return list(_announced_destinations.values())
def get_topology() -> dict: def get_topology() -> dict:
"""Return nodes and links for visualization."""
nodes = get_nodes() nodes = get_nodes()
links = [] links = []
# Build links from destinations/path tables if available
dest_table = getattr(RNS.Transport, "destinations", {}) dest_table = getattr(RNS.Transport, "destinations", {})
if isinstance(dest_table, (dict, list)): if isinstance(dest_table, (dict, list)):
try: try:
@ -382,7 +110,7 @@ def get_topology() -> dict:
"active": True, "active": True,
}) })
except Exception: except Exception:
pass pass # Transport internals may vary between RNS versions
return { return {
"nodes": nodes, "nodes": nodes,
@ -393,6 +121,7 @@ def get_topology() -> dict:
def get_identity_info() -> dict: def get_identity_info() -> dict:
"""Return server identity information."""
if _identity is None: if _identity is None:
return {"identity_hash": "", "public_key_hex": ""} return {"identity_hash": "", "public_key_hex": ""}
return { return {
@ -402,6 +131,7 @@ def get_identity_info() -> dict:
def announce(): def announce():
"""Announce this bridge on the network."""
if _destination is None: if _destination is None:
return {"announced": False, "identity_hash": ""} return {"announced": False, "identity_hash": ""}
_destination.announce(app_data=b"rMesh Bridge") _destination.announce(app_data=b"rMesh Bridge")

View File

@ -1,63 +0,0 @@
"""
WebSocket manager broadcasts real-time events to connected clients.
"""
import asyncio
import json
import logging
from typing import Set
from fastapi import WebSocket
logger = logging.getLogger("rmesh.websocket")
_clients: Set[WebSocket] = set()
_loop: asyncio.AbstractEventLoop | None = None
def set_event_loop(loop: asyncio.AbstractEventLoop):
global _loop
_loop = loop
async def connect(websocket: WebSocket):
await websocket.accept()
_clients.add(websocket)
logger.info("WebSocket client connected (%d total)", len(_clients))
def disconnect(websocket: WebSocket):
_clients.discard(websocket)
logger.info("WebSocket client disconnected (%d remaining)", len(_clients))
async def broadcast(data: dict):
"""Send event to all connected WebSocket clients."""
if not _clients:
return
message = json.dumps(data)
dead = set()
for client in _clients:
try:
await client.send_text(message)
except Exception:
dead.add(client)
for client in dead:
_clients.discard(client)
def on_event(event: dict):
"""Synchronous callback that schedules broadcast on the event loop.
Use this as the event_listener for reticulum_bridge and lxmf_bridge."""
if _loop is None or not _clients:
return
try:
asyncio.run_coroutine_threadsafe(broadcast(event), _loop)
except Exception:
pass
def client_count() -> int:
return len(_clients)

View File

@ -4,5 +4,3 @@ fastapi>=0.115.0
uvicorn[standard]>=0.30.0 uvicorn[standard]>=0.30.0
pydantic>=2.0 pydantic>=2.0
meshcore>=2.3.0 meshcore>=2.3.0
httpx>=0.27.0
websockets>=12.0