Compare commits
1 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
b078d6896e |
|
|
@ -4,12 +4,23 @@ BRIDGE_API_KEY = os.environ.get("BRIDGE_API_KEY", "")
|
||||||
RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum")
|
RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum")
|
||||||
LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf")
|
LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf")
|
||||||
LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json")
|
LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json")
|
||||||
|
LXMF_ATTACHMENTS_DIR = os.path.join(LXMF_STORAGE_DIR, "attachments")
|
||||||
LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4"))
|
LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4"))
|
||||||
|
|
||||||
# MeshCore companion node connection
|
# MeshCore companion node connection
|
||||||
MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true"
|
MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true"
|
||||||
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") # TCP host (WiFi companion)
|
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "")
|
||||||
MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000"))
|
MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000"))
|
||||||
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") # Serial port (USB companion)
|
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "")
|
||||||
MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore")
|
MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore")
|
||||||
MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json")
|
MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json")
|
||||||
|
|
||||||
|
# Feature flags
|
||||||
|
AUTO_RESEND_ON_ANNOUNCE = os.environ.get("AUTO_RESEND_ON_ANNOUNCE", "true").lower() == "true"
|
||||||
|
LOCAL_PROPAGATION_NODE = os.environ.get("LOCAL_PROPAGATION_NODE", "true").lower() == "true"
|
||||||
|
PREFERRED_PROPAGATION_NODE = os.environ.get("PREFERRED_PROPAGATION_NODE", "")
|
||||||
|
PROPAGATION_SYNC_INTERVAL = int(os.environ.get("PROPAGATION_SYNC_INTERVAL", "300"))
|
||||||
|
|
||||||
|
# rChats bridge
|
||||||
|
RCHATS_BRIDGE_ENABLED = os.environ.get("RCHATS_BRIDGE_ENABLED", "false").lower() == "true"
|
||||||
|
RCHATS_INTERNAL_URL = os.environ.get("RCHATS_INTERNAL_URL", "http://rchats:3000")
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
"""
|
"""
|
||||||
LXMF bridge — message send/receive via the LXMF protocol on Reticulum.
|
LXMF bridge — rich message handling, propagation node management,
|
||||||
|
auto-resend on announce, file/image/audio attachments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
@ -12,7 +14,11 @@ import os
|
||||||
import RNS
|
import RNS
|
||||||
import LXMF
|
import LXMF
|
||||||
|
|
||||||
from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE
|
from .config import (
|
||||||
|
LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE, LXMF_ATTACHMENTS_DIR,
|
||||||
|
AUTO_RESEND_ON_ANNOUNCE, LOCAL_PROPAGATION_NODE,
|
||||||
|
PREFERRED_PROPAGATION_NODE, PROPAGATION_SYNC_INTERVAL,
|
||||||
|
)
|
||||||
|
|
||||||
logger = logging.getLogger("rmesh.lxmf")
|
logger = logging.getLogger("rmesh.lxmf")
|
||||||
|
|
||||||
|
|
@ -21,6 +27,9 @@ _local_delivery_destination: RNS.Destination | None = None
|
||||||
_identity: RNS.Identity | None = None
|
_identity: RNS.Identity | None = None
|
||||||
_messages: list[dict] = []
|
_messages: list[dict] = []
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
|
_event_listeners: list = [] # WebSocket broadcast callbacks
|
||||||
|
_propagation_nodes: dict[str, dict] = {}
|
||||||
|
_last_propagation_sync: float = 0
|
||||||
|
|
||||||
|
|
||||||
def init(identity: RNS.Identity):
|
def init(identity: RNS.Identity):
|
||||||
|
|
@ -30,16 +39,30 @@ def init(identity: RNS.Identity):
|
||||||
_identity = identity
|
_identity = identity
|
||||||
|
|
||||||
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
|
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
|
||||||
|
os.makedirs(LXMF_ATTACHMENTS_DIR, exist_ok=True)
|
||||||
|
|
||||||
_router = LXMF.LXMRouter(
|
_router = LXMF.LXMRouter(
|
||||||
identity=_identity,
|
identity=_identity,
|
||||||
storagepath=LXMF_STORAGE_DIR,
|
storagepath=LXMF_STORAGE_DIR,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Enable propagation node
|
# Increase transfer limit for attachments (10 MB)
|
||||||
_router.enable_propagation()
|
_router.delivery_per_transfer_limit = 10000
|
||||||
|
|
||||||
# Register local delivery destination for receiving messages
|
# Enable local propagation node
|
||||||
|
if LOCAL_PROPAGATION_NODE:
|
||||||
|
_router.enable_propagation()
|
||||||
|
logger.info("Local LXMF propagation node enabled")
|
||||||
|
|
||||||
|
# Set preferred propagation node
|
||||||
|
if PREFERRED_PROPAGATION_NODE:
|
||||||
|
try:
|
||||||
|
_router.set_outbound_propagation_node(bytes.fromhex(PREFERRED_PROPAGATION_NODE))
|
||||||
|
logger.info("Set preferred propagation node: %s", PREFERRED_PROPAGATION_NODE[:16])
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to set propagation node: %s", e)
|
||||||
|
|
||||||
|
# Register local delivery
|
||||||
_local_delivery_destination = _router.register_delivery_identity(
|
_local_delivery_destination = _router.register_delivery_identity(
|
||||||
_identity,
|
_identity,
|
||||||
display_name="rMesh Bridge",
|
display_name="rMesh Bridge",
|
||||||
|
|
@ -47,14 +70,152 @@ def init(identity: RNS.Identity):
|
||||||
|
|
||||||
_router.register_delivery_callback(_delivery_callback)
|
_router.register_delivery_callback(_delivery_callback)
|
||||||
|
|
||||||
# Load persisted messages
|
# Register announce handlers for auto-resend and propagation tracking
|
||||||
|
RNS.Transport.register_announce_handler(_LxmfAnnounceHandler())
|
||||||
|
RNS.Transport.register_announce_handler(_PropagationAnnounceHandler())
|
||||||
|
|
||||||
_load_messages()
|
_load_messages()
|
||||||
|
|
||||||
|
# Start propagation sync thread
|
||||||
|
if PROPAGATION_SYNC_INTERVAL > 0:
|
||||||
|
sync_thread = threading.Thread(target=_propagation_sync_loop, daemon=True)
|
||||||
|
sync_thread.start()
|
||||||
|
|
||||||
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
|
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
|
||||||
|
|
||||||
|
|
||||||
|
def register_event_listener(callback):
|
||||||
|
"""Register a callback for real-time events (WebSocket broadcast)."""
|
||||||
|
_event_listeners.append(callback)
|
||||||
|
|
||||||
|
|
||||||
|
def _emit_event(event_type: str, data: dict):
|
||||||
|
"""Emit event to all registered listeners."""
|
||||||
|
event = {"type": event_type, **data}
|
||||||
|
for listener in _event_listeners:
|
||||||
|
try:
|
||||||
|
listener(event)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# LXMF Rich Field Parsing
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
def _parse_lxmf_fields(lxmf_message) -> dict:
|
||||||
|
"""Extract all LXMF fields from a message into a serializable dict."""
|
||||||
|
fields = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
lxmf_fields = lxmf_message.get_fields()
|
||||||
|
except Exception:
|
||||||
|
return fields
|
||||||
|
|
||||||
|
# Image attachment
|
||||||
|
if LXMF.FIELD_IMAGE in lxmf_fields:
|
||||||
|
try:
|
||||||
|
image_data = lxmf_fields[LXMF.FIELD_IMAGE]
|
||||||
|
image_type = image_data[0] if len(image_data) > 0 else "unknown"
|
||||||
|
image_bytes = image_data[1] if len(image_data) > 1 else b""
|
||||||
|
if image_bytes:
|
||||||
|
attachment_id = str(uuid.uuid4())
|
||||||
|
ext = "jpg" if "jpeg" in str(image_type).lower() or "jpg" in str(image_type).lower() else "png"
|
||||||
|
filename = f"{attachment_id}.{ext}"
|
||||||
|
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
||||||
|
with open(filepath, "wb") as f:
|
||||||
|
f.write(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes))
|
||||||
|
fields["image"] = {
|
||||||
|
"type": str(image_type),
|
||||||
|
"filename": filename,
|
||||||
|
"size": len(image_bytes),
|
||||||
|
"base64": base64.b64encode(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)).decode("ascii")[:200] + "...",
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to parse image field: %s", e)
|
||||||
|
|
||||||
|
# File attachments
|
||||||
|
if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields:
|
||||||
|
try:
|
||||||
|
file_list = lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS]
|
||||||
|
attachments = []
|
||||||
|
for file_data in file_list:
|
||||||
|
file_name = file_data[0] if len(file_data) > 0 else "unknown"
|
||||||
|
file_bytes = file_data[1] if len(file_data) > 1 else b""
|
||||||
|
if file_bytes:
|
||||||
|
safe_name = "".join(c for c in str(file_name) if c.isalnum() or c in ".-_")
|
||||||
|
attachment_id = str(uuid.uuid4())
|
||||||
|
stored_name = f"{attachment_id}_{safe_name}"
|
||||||
|
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, stored_name)
|
||||||
|
with open(filepath, "wb") as f:
|
||||||
|
f.write(file_bytes if isinstance(file_bytes, bytes) else bytes(file_bytes))
|
||||||
|
attachments.append({
|
||||||
|
"name": str(file_name),
|
||||||
|
"stored_name": stored_name,
|
||||||
|
"size": len(file_bytes),
|
||||||
|
})
|
||||||
|
if attachments:
|
||||||
|
fields["file_attachments"] = attachments
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to parse file attachments: %s", e)
|
||||||
|
|
||||||
|
# Audio message (Codec2)
|
||||||
|
if LXMF.FIELD_AUDIO in lxmf_fields:
|
||||||
|
try:
|
||||||
|
audio_data = lxmf_fields[LXMF.FIELD_AUDIO]
|
||||||
|
audio_mode = audio_data[0] if len(audio_data) > 0 else 0
|
||||||
|
audio_bytes = audio_data[1] if len(audio_data) > 1 else b""
|
||||||
|
if audio_bytes:
|
||||||
|
attachment_id = str(uuid.uuid4())
|
||||||
|
filename = f"{attachment_id}.c2"
|
||||||
|
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
||||||
|
with open(filepath, "wb") as f:
|
||||||
|
f.write(audio_bytes if isinstance(audio_bytes, bytes) else bytes(audio_bytes))
|
||||||
|
fields["audio"] = {
|
||||||
|
"codec": "codec2",
|
||||||
|
"mode": audio_mode,
|
||||||
|
"filename": filename,
|
||||||
|
"size": len(audio_bytes),
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to parse audio field: %s", e)
|
||||||
|
|
||||||
|
# Icon appearance
|
||||||
|
if LXMF.FIELD_ICON_APPEARANCE in lxmf_fields:
|
||||||
|
try:
|
||||||
|
icon_data = lxmf_fields[LXMF.FIELD_ICON_APPEARANCE]
|
||||||
|
fields["icon"] = {
|
||||||
|
"name": str(icon_data[0]) if len(icon_data) > 0 else "",
|
||||||
|
"foreground": "#" + icon_data[1].hex() if len(icon_data) > 1 and hasattr(icon_data[1], "hex") else "",
|
||||||
|
"background": "#" + icon_data[2].hex() if len(icon_data) > 2 and hasattr(icon_data[2], "hex") else "",
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to parse icon field: %s", e)
|
||||||
|
|
||||||
|
# Commands
|
||||||
|
if LXMF.FIELD_COMMANDS in lxmf_fields:
|
||||||
|
try:
|
||||||
|
fields["commands"] = [str(cmd) for cmd in lxmf_fields[LXMF.FIELD_COMMANDS]]
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return fields
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Message Handling
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
def _delivery_callback(message):
|
def _delivery_callback(message):
|
||||||
"""Handle incoming LXMF message."""
|
"""Handle incoming LXMF message with rich field extraction."""
|
||||||
|
# Parse rich fields
|
||||||
|
fields = _parse_lxmf_fields(message)
|
||||||
|
|
||||||
|
# Get signal quality if available
|
||||||
|
rssi = getattr(message, "rssi", None)
|
||||||
|
snr = getattr(message, "snr", None)
|
||||||
|
quality = getattr(message, "quality", None)
|
||||||
|
|
||||||
msg_record = {
|
msg_record = {
|
||||||
"id": str(uuid.uuid4()),
|
"id": str(uuid.uuid4()),
|
||||||
"direction": "inbound",
|
"direction": "inbound",
|
||||||
|
|
@ -62,6 +223,10 @@ def _delivery_callback(message):
|
||||||
"recipient_hash": _identity.hexhash if _identity else "",
|
"recipient_hash": _identity.hexhash if _identity else "",
|
||||||
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
|
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
|
||||||
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
|
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
|
||||||
|
"fields": fields,
|
||||||
|
"rssi": rssi,
|
||||||
|
"snr": snr,
|
||||||
|
"quality": quality,
|
||||||
"status": "delivered",
|
"status": "delivered",
|
||||||
"timestamp": time.time(),
|
"timestamp": time.time(),
|
||||||
}
|
}
|
||||||
|
|
@ -70,11 +235,19 @@ def _delivery_callback(message):
|
||||||
_messages.append(msg_record)
|
_messages.append(msg_record)
|
||||||
_save_messages()
|
_save_messages()
|
||||||
|
|
||||||
logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])
|
logger.info("Received LXMF message from %s (fields: %s)",
|
||||||
|
msg_record["sender_hash"][:16],
|
||||||
|
list(fields.keys()) if fields else "none")
|
||||||
|
|
||||||
|
_emit_event("lxmf.delivery", {"message": msg_record})
|
||||||
|
|
||||||
|
|
||||||
def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
|
def send_message(destination_hash_hex: str, content: str, title: str = "",
|
||||||
"""Send an LXMF message to a destination hash."""
|
image_bytes: bytes = None, image_type: str = "image/jpeg",
|
||||||
|
file_attachments: list = None,
|
||||||
|
audio_bytes: bytes = None, audio_mode: int = 0,
|
||||||
|
try_propagation_on_fail: bool = True) -> dict:
|
||||||
|
"""Send an LXMF message with optional attachments."""
|
||||||
if _router is None or _identity is None:
|
if _router is None or _identity is None:
|
||||||
return {"id": "", "status": "failed"}
|
return {"id": "", "status": "failed"}
|
||||||
|
|
||||||
|
|
@ -83,72 +256,82 @@ def send_message(destination_hash_hex: str, content: str, title: str = "") -> di
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return {"id": "", "status": "failed"}
|
return {"id": "", "status": "failed"}
|
||||||
|
|
||||||
# Look up identity for destination
|
|
||||||
dest_identity = RNS.Identity.recall(dest_hash)
|
dest_identity = RNS.Identity.recall(dest_hash)
|
||||||
|
|
||||||
msg_id = str(uuid.uuid4())
|
msg_id = str(uuid.uuid4())
|
||||||
|
|
||||||
if dest_identity is None:
|
if dest_identity is None:
|
||||||
# Request path first, the message may be deliverable later
|
|
||||||
RNS.Transport.request_path(dest_hash)
|
RNS.Transport.request_path(dest_hash)
|
||||||
msg_record = {
|
msg_record = {
|
||||||
"id": msg_id,
|
"id": msg_id, "direction": "outbound",
|
||||||
"direction": "outbound",
|
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
|
||||||
"sender_hash": _identity.hexhash,
|
"title": title, "content": content, "fields": {},
|
||||||
"recipient_hash": destination_hash_hex,
|
"status": "pending", "timestamp": time.time(),
|
||||||
"title": title,
|
|
||||||
"content": content,
|
|
||||||
"status": "pending",
|
|
||||||
"timestamp": time.time(),
|
|
||||||
}
|
}
|
||||||
with _lock:
|
with _lock:
|
||||||
_messages.append(msg_record)
|
_messages.append(msg_record)
|
||||||
_save_messages()
|
_save_messages()
|
||||||
return msg_record
|
return msg_record
|
||||||
|
|
||||||
# Create destination for the recipient
|
|
||||||
dest = RNS.Destination(
|
dest = RNS.Destination(
|
||||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||||
"lxmf", "delivery",
|
"lxmf", "delivery",
|
||||||
)
|
)
|
||||||
|
|
||||||
lxm = LXMF.LXMessage(
|
lxm = LXMF.LXMessage(
|
||||||
dest,
|
dest, _router.get_delivery_destination(),
|
||||||
_router.get_delivery_destination(),
|
|
||||||
content.encode("utf-8") if isinstance(content, str) else content,
|
content.encode("utf-8") if isinstance(content, str) else content,
|
||||||
title=title.encode("utf-8") if isinstance(title, str) else title,
|
title=title.encode("utf-8") if isinstance(title, str) else title,
|
||||||
desired_method=LXMF.LXMessage.DIRECT,
|
desired_method=LXMF.LXMessage.DIRECT,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Add rich fields
|
||||||
|
if image_bytes:
|
||||||
|
lxm.fields[LXMF.FIELD_IMAGE] = [image_type, image_bytes]
|
||||||
|
if file_attachments:
|
||||||
|
lxm.fields[LXMF.FIELD_FILE_ATTACHMENTS] = [
|
||||||
|
[att["name"], att["bytes"]] for att in file_attachments
|
||||||
|
]
|
||||||
|
if audio_bytes:
|
||||||
|
lxm.fields[LXMF.FIELD_AUDIO] = [audio_mode, audio_bytes]
|
||||||
|
|
||||||
|
# Enable propagation fallback
|
||||||
|
lxm.try_propagation_on_fail = try_propagation_on_fail
|
||||||
|
|
||||||
msg_record = {
|
msg_record = {
|
||||||
"id": msg_id,
|
"id": msg_id, "direction": "outbound",
|
||||||
"direction": "outbound",
|
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
|
||||||
"sender_hash": _identity.hexhash,
|
"title": title, "content": content, "fields": {},
|
||||||
"recipient_hash": destination_hash_hex,
|
"status": "pending", "timestamp": time.time(),
|
||||||
"title": title,
|
|
||||||
"content": content,
|
|
||||||
"status": "pending",
|
|
||||||
"timestamp": time.time(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def _delivery_callback_outbound(message):
|
def _on_delivered(message):
|
||||||
with _lock:
|
with _lock:
|
||||||
for m in _messages:
|
for m in _messages:
|
||||||
if m["id"] == msg_id:
|
if m["id"] == msg_id:
|
||||||
m["status"] = "delivered"
|
m["status"] = "delivered"
|
||||||
break
|
break
|
||||||
_save_messages()
|
_save_messages()
|
||||||
|
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "delivered"})
|
||||||
|
|
||||||
|
def _on_failed(message):
|
||||||
|
# Try propagation node if direct failed
|
||||||
|
if getattr(message, "try_propagation_on_fail", False) and _router.outbound_propagation_node:
|
||||||
|
logger.info("Direct delivery failed, trying propagation node for %s", destination_hash_hex[:16])
|
||||||
|
message.desired_method = LXMF.LXMessage.PROPAGATED
|
||||||
|
message.try_propagation_on_fail = False
|
||||||
|
_router.handle_outbound(message)
|
||||||
|
return
|
||||||
|
|
||||||
def _failed_callback(message):
|
|
||||||
with _lock:
|
with _lock:
|
||||||
for m in _messages:
|
for m in _messages:
|
||||||
if m["id"] == msg_id:
|
if m["id"] == msg_id:
|
||||||
m["status"] = "failed"
|
m["status"] = "failed"
|
||||||
break
|
break
|
||||||
_save_messages()
|
_save_messages()
|
||||||
|
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "failed"})
|
||||||
|
|
||||||
lxm.delivery_callback = _delivery_callback_outbound
|
lxm.delivery_callback = _on_delivered
|
||||||
lxm.failed_callback = _failed_callback
|
lxm.failed_callback = _on_failed
|
||||||
|
|
||||||
_router.handle_outbound(lxm)
|
_router.handle_outbound(lxm)
|
||||||
|
|
||||||
|
|
@ -156,18 +339,159 @@ def send_message(destination_hash_hex: str, content: str, title: str = "") -> di
|
||||||
_messages.append(msg_record)
|
_messages.append(msg_record)
|
||||||
_save_messages()
|
_save_messages()
|
||||||
|
|
||||||
|
_emit_event("lxmf.created", {"message": msg_record})
|
||||||
return msg_record
|
return msg_record
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Auto-Resend Failed Messages on Announce
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class _LxmfAnnounceHandler:
|
||||||
|
"""Re-attempt failed messages when a peer announces."""
|
||||||
|
aspect_filter = "lxmf.delivery"
|
||||||
|
|
||||||
|
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||||
|
if not AUTO_RESEND_ON_ANNOUNCE:
|
||||||
|
return
|
||||||
|
|
||||||
|
dest_hex = destination_hash.hex()
|
||||||
|
resend_count = 0
|
||||||
|
|
||||||
|
with _lock:
|
||||||
|
for msg in _messages:
|
||||||
|
if (msg["status"] == "failed" and
|
||||||
|
msg["direction"] == "outbound" and
|
||||||
|
msg["recipient_hash"] == dest_hex):
|
||||||
|
msg["status"] = "pending"
|
||||||
|
resend_count += 1
|
||||||
|
# Re-send in background
|
||||||
|
threading.Thread(
|
||||||
|
target=_resend_message, args=(msg,), daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
if resend_count > 0:
|
||||||
|
_save_messages()
|
||||||
|
|
||||||
|
if resend_count > 0:
|
||||||
|
logger.info("Auto-resending %d failed messages to %s", resend_count, dest_hex[:16])
|
||||||
|
|
||||||
|
|
||||||
|
def _resend_message(msg_record: dict):
|
||||||
|
"""Resend a previously failed message."""
|
||||||
|
time.sleep(1) # Brief delay to let path establish
|
||||||
|
result = send_message(
|
||||||
|
msg_record["recipient_hash"],
|
||||||
|
msg_record["content"],
|
||||||
|
msg_record.get("title", ""),
|
||||||
|
)
|
||||||
|
if result.get("status") == "pending":
|
||||||
|
# Remove the duplicate — original is being retried
|
||||||
|
with _lock:
|
||||||
|
_messages[:] = [m for m in _messages if m["id"] != result["id"]]
|
||||||
|
_save_messages()
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Propagation Node Management
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class _PropagationAnnounceHandler:
|
||||||
|
"""Track LXMF propagation node announces."""
|
||||||
|
aspect_filter = "lxmf.propagation"
|
||||||
|
|
||||||
|
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||||
|
dest_hex = destination_hash.hex()
|
||||||
|
node_info = {
|
||||||
|
"destination_hash": dest_hex,
|
||||||
|
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
||||||
|
"last_heard": time.time(),
|
||||||
|
"enabled": True,
|
||||||
|
"per_transfer_limit": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Parse propagation node app data
|
||||||
|
if app_data:
|
||||||
|
try:
|
||||||
|
import RNS.vendor.umsgpack as msgpack
|
||||||
|
data = msgpack.unpackb(app_data)
|
||||||
|
if isinstance(data, dict):
|
||||||
|
node_info["enabled"] = data.get("enabled", True)
|
||||||
|
node_info["per_transfer_limit"] = data.get("per_transfer_limit", 0)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
_propagation_nodes[dest_hex] = node_info
|
||||||
|
_emit_event("propagation.announce", {"node": node_info})
|
||||||
|
|
||||||
|
|
||||||
|
def get_propagation_nodes() -> list[dict]:
|
||||||
|
"""Return list of known propagation nodes."""
|
||||||
|
return list(_propagation_nodes.values())
|
||||||
|
|
||||||
|
|
||||||
|
def set_preferred_propagation_node(destination_hash_hex: str) -> bool:
|
||||||
|
"""Set the preferred outbound propagation node."""
|
||||||
|
if _router is None:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
if destination_hash_hex:
|
||||||
|
_router.set_outbound_propagation_node(bytes.fromhex(destination_hash_hex))
|
||||||
|
else:
|
||||||
|
_router.outbound_propagation_node = None
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to set propagation node: %s", e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_propagation_status() -> dict:
|
||||||
|
"""Return propagation node status."""
|
||||||
|
if _router is None:
|
||||||
|
return {"enabled": False, "outbound_node": None, "transfer_state": "idle"}
|
||||||
|
|
||||||
|
outbound = _router.get_outbound_propagation_node()
|
||||||
|
return {
|
||||||
|
"local_enabled": LOCAL_PROPAGATION_NODE,
|
||||||
|
"outbound_node": outbound.hex() if outbound else None,
|
||||||
|
"transfer_state": str(getattr(_router, "propagation_transfer_state", "unknown")),
|
||||||
|
"known_nodes": len(_propagation_nodes),
|
||||||
|
"last_sync": _last_propagation_sync,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def sync_propagation_messages():
|
||||||
|
"""Request messages from the propagation node."""
|
||||||
|
global _last_propagation_sync
|
||||||
|
if _router is None or _identity is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
_router.request_messages_from_propagation_node(_identity)
|
||||||
|
_last_propagation_sync = time.time()
|
||||||
|
logger.info("Syncing messages from propagation node")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Propagation sync failed: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
def _propagation_sync_loop():
|
||||||
|
"""Background thread to periodically sync propagation messages."""
|
||||||
|
while True:
|
||||||
|
time.sleep(PROPAGATION_SYNC_INTERVAL)
|
||||||
|
if _router and _router.get_outbound_propagation_node():
|
||||||
|
sync_propagation_messages()
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Query / Persistence
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
|
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
|
||||||
"""Return stored messages."""
|
|
||||||
with _lock:
|
with _lock:
|
||||||
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
|
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
|
||||||
return sorted_msgs[offset:offset + limit]
|
return sorted_msgs[offset:offset + limit]
|
||||||
|
|
||||||
|
|
||||||
def get_message(msg_id: str) -> dict | None:
|
def get_message(msg_id: str) -> dict | None:
|
||||||
"""Return a single message by ID."""
|
|
||||||
with _lock:
|
with _lock:
|
||||||
for m in _messages:
|
for m in _messages:
|
||||||
if m["id"] == msg_id:
|
if m["id"] == msg_id:
|
||||||
|
|
@ -176,13 +500,19 @@ def get_message(msg_id: str) -> dict | None:
|
||||||
|
|
||||||
|
|
||||||
def get_total_count() -> int:
|
def get_total_count() -> int:
|
||||||
"""Return total message count."""
|
|
||||||
with _lock:
|
with _lock:
|
||||||
return len(_messages)
|
return len(_messages)
|
||||||
|
|
||||||
|
|
||||||
|
def get_attachment_path(filename: str) -> str | None:
|
||||||
|
"""Return full path to an attachment file if it exists."""
|
||||||
|
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
||||||
|
if os.path.exists(filepath):
|
||||||
|
return filepath
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _load_messages():
|
def _load_messages():
|
||||||
"""Load messages from persistent storage."""
|
|
||||||
global _messages
|
global _messages
|
||||||
if os.path.exists(LXMF_MESSAGES_FILE):
|
if os.path.exists(LXMF_MESSAGES_FILE):
|
||||||
try:
|
try:
|
||||||
|
|
@ -195,7 +525,6 @@ def _load_messages():
|
||||||
|
|
||||||
|
|
||||||
def _save_messages():
|
def _save_messages():
|
||||||
"""Persist messages to storage (call with _lock held)."""
|
|
||||||
try:
|
try:
|
||||||
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
|
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
|
||||||
with open(LXMF_MESSAGES_FILE, "w") as f:
|
with open(LXMF_MESSAGES_FILE, "w") as f:
|
||||||
|
|
|
||||||
214
app/main.py
214
app/main.py
|
|
@ -1,18 +1,19 @@
|
||||||
"""
|
"""
|
||||||
rMesh Bridge — FastAPI service exposing Reticulum (Internet backbone)
|
rMesh Bridge — Unified REST + WebSocket API for Reticulum (Internet backbone),
|
||||||
and MeshCore (LoRa mesh) as a unified REST API.
|
MeshCore (LoRa mesh), LXMF messaging, audio calls, and NomadNet browsing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException, Header
|
from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect
|
||||||
|
from fastapi.responses import FileResponse
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Annotated
|
from typing import Annotated, Optional
|
||||||
|
|
||||||
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge
|
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge, websocket_manager, rchats_bridge
|
||||||
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED
|
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED, RCHATS_BRIDGE_ENABLED
|
||||||
from .models import (
|
from .models import (
|
||||||
StatusResponse, NodesResponse, TopologyResponse,
|
StatusResponse, NodesResponse, TopologyResponse,
|
||||||
MessageIn, MessageOut, MessagesResponse,
|
MessageIn, MessageOut, MessagesResponse,
|
||||||
|
|
@ -25,9 +26,22 @@ logger = logging.getLogger("rmesh.api")
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
"""Initialize Reticulum and MeshCore on startup."""
|
"""Initialize all subsystems on startup."""
|
||||||
logger.info("Starting rMesh bridge...")
|
logger.info("Starting rMesh bridge...")
|
||||||
|
|
||||||
|
# Set up WebSocket event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
websocket_manager.set_event_loop(loop)
|
||||||
|
|
||||||
|
# Register WebSocket broadcast as event listener for all subsystems
|
||||||
|
reticulum_bridge.register_event_listener(websocket_manager.on_event)
|
||||||
|
lxmf_bridge.register_event_listener(websocket_manager.on_event)
|
||||||
|
|
||||||
|
# Register rChats bridge as event listener
|
||||||
|
if RCHATS_BRIDGE_ENABLED:
|
||||||
|
lxmf_bridge.register_event_listener(rchats_bridge.on_lxmf_event)
|
||||||
|
rchats_bridge.init()
|
||||||
|
|
||||||
# Reticulum (Internet backbone)
|
# Reticulum (Internet backbone)
|
||||||
reticulum_bridge.init()
|
reticulum_bridge.init()
|
||||||
from .reticulum_bridge import _identity
|
from .reticulum_bridge import _identity
|
||||||
|
|
@ -36,6 +50,7 @@ async def lifespan(app: FastAPI):
|
||||||
else:
|
else:
|
||||||
logger.error("No Reticulum identity available for LXMF")
|
logger.error("No Reticulum identity available for LXMF")
|
||||||
reticulum_bridge.announce()
|
reticulum_bridge.announce()
|
||||||
|
reticulum_bridge.announce_call_capability()
|
||||||
|
|
||||||
# MeshCore (LoRa mesh)
|
# MeshCore (LoRa mesh)
|
||||||
if MESHCORE_ENABLED:
|
if MESHCORE_ENABLED:
|
||||||
|
|
@ -60,6 +75,25 @@ def verify_api_key(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
raise HTTPException(status_code=401, detail="Invalid API key")
|
raise HTTPException(status_code=401, detail="Invalid API key")
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# WebSocket (real-time events)
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
@app.websocket("/ws")
|
||||||
|
async def websocket_endpoint(websocket: WebSocket):
|
||||||
|
await websocket_manager.connect(websocket)
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await websocket.receive_text()
|
||||||
|
# Handle ping/pong
|
||||||
|
if data == "ping":
|
||||||
|
await websocket.send_text('{"type":"pong"}')
|
||||||
|
except WebSocketDisconnect:
|
||||||
|
websocket_manager.disconnect(websocket)
|
||||||
|
except Exception:
|
||||||
|
websocket_manager.disconnect(websocket)
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
# Health & Combined Status
|
# Health & Combined Status
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
@ -72,15 +106,18 @@ async def health():
|
||||||
@app.get("/api/status")
|
@app.get("/api/status")
|
||||||
async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
result = {
|
return {
|
||||||
"reticulum": reticulum_bridge.get_status(),
|
"reticulum": reticulum_bridge.get_status(),
|
||||||
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0},
|
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {
|
||||||
|
"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0
|
||||||
|
},
|
||||||
|
"propagation": lxmf_bridge.get_propagation_status(),
|
||||||
|
"websocket_clients": websocket_manager.client_count(),
|
||||||
}
|
}
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
# Reticulum Endpoints (Internet backbone)
|
# Reticulum Endpoints
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
@app.get("/api/reticulum/status", response_model=StatusResponse)
|
@app.get("/api/reticulum/status", response_model=StatusResponse)
|
||||||
|
|
@ -89,14 +126,14 @@ async def reticulum_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
return reticulum_bridge.get_status()
|
return reticulum_bridge.get_status()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/nodes", response_model=NodesResponse)
|
@app.get("/api/reticulum/nodes")
|
||||||
async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
node_list = reticulum_bridge.get_nodes()
|
node_list = reticulum_bridge.get_nodes()
|
||||||
return {"nodes": node_list, "total": len(node_list)}
|
return {"nodes": node_list, "total": len(node_list)}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/topology", response_model=TopologyResponse)
|
@app.get("/api/reticulum/topology")
|
||||||
async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return reticulum_bridge.get_topology()
|
return reticulum_bridge.get_topology()
|
||||||
|
|
@ -114,7 +151,11 @@ async def reticulum_identity(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
return reticulum_bridge.get_identity_info()
|
return reticulum_bridge.get_identity_info()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/messages", response_model=MessagesResponse)
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# LXMF Messages (with rich fields)
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
@app.get("/api/reticulum/messages")
|
||||||
async def reticulum_messages(
|
async def reticulum_messages(
|
||||||
x_bridge_api_key: Annotated[str, Header()] = "",
|
x_bridge_api_key: Annotated[str, Header()] = "",
|
||||||
limit: int = 100, offset: int = 0,
|
limit: int = 100, offset: int = 0,
|
||||||
|
|
@ -124,7 +165,7 @@ async def reticulum_messages(
|
||||||
return {"messages": msgs, "total": lxmf_bridge.get_total_count()}
|
return {"messages": msgs, "total": lxmf_bridge.get_total_count()}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/reticulum/messages", response_model=MessageOut)
|
@app.post("/api/reticulum/messages")
|
||||||
async def reticulum_send_message(
|
async def reticulum_send_message(
|
||||||
body: MessageIn,
|
body: MessageIn,
|
||||||
x_bridge_api_key: Annotated[str, Header()] = "",
|
x_bridge_api_key: Annotated[str, Header()] = "",
|
||||||
|
|
@ -140,24 +181,120 @@ async def reticulum_send_message(
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
# Keep old /api/status path as alias for backwards compat
|
@app.get("/api/reticulum/messages/{msg_id}")
|
||||||
@app.get("/api/nodes", response_model=NodesResponse)
|
async def reticulum_get_message(msg_id: str, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
node_list = reticulum_bridge.get_nodes()
|
msg = lxmf_bridge.get_message(msg_id)
|
||||||
return {"nodes": node_list, "total": len(node_list)}
|
if msg is None:
|
||||||
|
raise HTTPException(status_code=404, detail="Message not found")
|
||||||
|
return msg
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/topology", response_model=TopologyResponse)
|
@app.get("/api/attachments/{filename}")
|
||||||
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def get_attachment(filename: str, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return reticulum_bridge.get_topology()
|
# Sanitize filename
|
||||||
|
safe = "".join(c for c in filename if c.isalnum() or c in ".-_")
|
||||||
|
path = lxmf_bridge.get_attachment_path(safe)
|
||||||
|
if path is None:
|
||||||
|
raise HTTPException(status_code=404, detail="Attachment not found")
|
||||||
|
return FileResponse(path)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/identity", response_model=IdentityResponse)
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
# Propagation Node Management
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
@app.get("/api/propagation/status")
|
||||||
|
async def propagation_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return reticulum_bridge.get_identity_info()
|
return lxmf_bridge.get_propagation_status()
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/propagation/nodes")
|
||||||
|
async def propagation_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
nodes = lxmf_bridge.get_propagation_nodes()
|
||||||
|
return {"nodes": nodes, "total": len(nodes)}
|
||||||
|
|
||||||
|
|
||||||
|
class PropagationNodeSet(BaseModel):
|
||||||
|
destination_hash: str
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/propagation/preferred")
|
||||||
|
async def set_preferred_propagation(body: PropagationNodeSet, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
ok = lxmf_bridge.set_preferred_propagation_node(body.destination_hash)
|
||||||
|
if not ok:
|
||||||
|
raise HTTPException(status_code=400, detail="Failed to set propagation node")
|
||||||
|
return {"status": "ok", "destination_hash": body.destination_hash}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/propagation/sync")
|
||||||
|
async def sync_propagation(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
lxmf_bridge.sync_propagation_messages()
|
||||||
|
return {"status": "syncing"}
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Audio Calls
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class CallInitiate(BaseModel):
|
||||||
|
destination_hash: str
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/calls")
|
||||||
|
async def list_calls(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
return {"calls": reticulum_bridge.get_active_calls()}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/calls")
|
||||||
|
async def initiate_call(body: CallInitiate, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
result = reticulum_bridge.initiate_call(body.destination_hash)
|
||||||
|
if not result.get("call_id"):
|
||||||
|
raise HTTPException(status_code=400, detail=result.get("error", "Call failed"))
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
class CallHangup(BaseModel):
|
||||||
|
call_id: str
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/calls/hangup")
|
||||||
|
async def hangup_call(body: CallHangup, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
reticulum_bridge.hangup_call(body.call_id)
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# NomadNet Node Browser
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
@app.get("/api/nomadnet/nodes")
|
||||||
|
async def nomadnet_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
nodes = reticulum_bridge.get_nomadnet_nodes()
|
||||||
|
return {"nodes": nodes, "total": len(nodes)}
|
||||||
|
|
||||||
|
|
||||||
|
class NomadNetBrowse(BaseModel):
|
||||||
|
destination_hash: str
|
||||||
|
path: str = "/"
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/nomadnet/browse")
|
||||||
|
async def nomadnet_browse(body: NomadNetBrowse, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
result = reticulum_bridge.browse_nomadnet_node(body.destination_hash, body.path)
|
||||||
|
if "error" in result:
|
||||||
|
raise HTTPException(status_code=400, detail=result["error"])
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
@ -238,3 +375,26 @@ async def meshcore_advert(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return await meshcore_bridge.get_device_stats()
|
return await meshcore_bridge.get_device_stats()
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Backwards Compatibility Aliases
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
@app.get("/api/nodes")
|
||||||
|
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
node_list = reticulum_bridge.get_nodes()
|
||||||
|
return {"nodes": node_list, "total": len(node_list)}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/topology")
|
||||||
|
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
return reticulum_bridge.get_topology()
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/identity")
|
||||||
|
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
|
verify_api_key(x_bridge_api_key)
|
||||||
|
return reticulum_bridge.get_identity_info()
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,79 @@
|
||||||
|
"""
|
||||||
|
rChats bridge — relays LXMF and MeshCore messages to/from rChats.
|
||||||
|
Messages arriving over mesh are forwarded to rChats internal API.
|
||||||
|
Messages from rChats can be sent out over mesh.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from .config import RCHATS_BRIDGE_ENABLED, RCHATS_INTERNAL_URL
|
||||||
|
|
||||||
|
logger = logging.getLogger("rmesh.rchats_bridge")
|
||||||
|
|
||||||
|
|
||||||
|
def init():
|
||||||
|
"""Initialize the rChats bridge."""
|
||||||
|
if not RCHATS_BRIDGE_ENABLED:
|
||||||
|
logger.info("rChats bridge disabled")
|
||||||
|
return
|
||||||
|
logger.info("rChats bridge enabled — forwarding to %s", RCHATS_INTERNAL_URL)
|
||||||
|
|
||||||
|
|
||||||
|
def on_lxmf_event(event: dict):
|
||||||
|
"""Handle LXMF events and forward relevant ones to rChats."""
|
||||||
|
if not RCHATS_BRIDGE_ENABLED:
|
||||||
|
return
|
||||||
|
|
||||||
|
event_type = event.get("type", "")
|
||||||
|
|
||||||
|
if event_type == "lxmf.delivery":
|
||||||
|
msg = event.get("message", {})
|
||||||
|
_forward_to_rchats(msg, source="reticulum")
|
||||||
|
|
||||||
|
elif event_type == "meshcore.message":
|
||||||
|
msg = event.get("message", {})
|
||||||
|
_forward_to_rchats(msg, source="meshcore")
|
||||||
|
|
||||||
|
|
||||||
|
def _forward_to_rchats(msg: dict, source: str):
|
||||||
|
"""Forward a mesh message to rChats internal API."""
|
||||||
|
space_slug = msg.get("space_slug")
|
||||||
|
if not space_slug:
|
||||||
|
return # Only forward space-scoped messages
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"source": source,
|
||||||
|
"sender": msg.get("sender_hash", "") or msg.get("sender", ""),
|
||||||
|
"content": msg.get("content", ""),
|
||||||
|
"title": msg.get("title", ""),
|
||||||
|
"has_image": "image" in msg.get("fields", {}),
|
||||||
|
"has_audio": "audio" in msg.get("fields", {}),
|
||||||
|
"has_files": "file_attachments" in msg.get("fields", {}),
|
||||||
|
"space": space_slug,
|
||||||
|
"timestamp": msg.get("timestamp", time.time()),
|
||||||
|
}
|
||||||
|
|
||||||
|
threading.Thread(
|
||||||
|
target=_post_to_rchats, args=(space_slug, payload), daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
|
||||||
|
def _post_to_rchats(space_slug: str, payload: dict):
|
||||||
|
"""POST message to rChats internal mesh-relay endpoint."""
|
||||||
|
try:
|
||||||
|
url = f"{RCHATS_INTERNAL_URL}/api/internal/mesh-relay"
|
||||||
|
with httpx.Client(timeout=10) as client:
|
||||||
|
response = client.post(url, json=payload)
|
||||||
|
if response.status_code < 400:
|
||||||
|
logger.info("Forwarded mesh message to rChats space '%s'", space_slug)
|
||||||
|
else:
|
||||||
|
logger.warning("rChats rejected message: %d %s", response.status_code, response.text[:100])
|
||||||
|
except httpx.ConnectError:
|
||||||
|
logger.debug("rChats not reachable at %s", RCHATS_INTERNAL_URL)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to forward to rChats: %s", e)
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
"""
|
"""
|
||||||
Reticulum bridge — singleton RNS instance, identity management, topology queries.
|
Reticulum bridge — singleton RNS instance, identity management,
|
||||||
|
topology queries, signal quality tracking, audio call support.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
@ -14,17 +15,44 @@ logger = logging.getLogger("rmesh.reticulum")
|
||||||
_rns_instance: RNS.Reticulum | None = None
|
_rns_instance: RNS.Reticulum | None = None
|
||||||
_identity: RNS.Identity | None = None
|
_identity: RNS.Identity | None = None
|
||||||
_destination: RNS.Destination | None = None
|
_destination: RNS.Destination | None = None
|
||||||
|
_call_destination: RNS.Destination | None = None
|
||||||
_start_time: float = 0
|
_start_time: float = 0
|
||||||
_announced_destinations: dict[str, dict] = {}
|
_announced_destinations: dict[str, dict] = {}
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
|
_event_listeners: list = []
|
||||||
|
|
||||||
APP_NAME = "rmesh"
|
APP_NAME = "rmesh"
|
||||||
ASPECT = "bridge"
|
ASPECT = "bridge"
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_decode(data) -> str | None:
|
||||||
|
"""Safely decode app_data which may be binary."""
|
||||||
|
if data is None:
|
||||||
|
return None
|
||||||
|
if isinstance(data, bytes):
|
||||||
|
try:
|
||||||
|
return data.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return data.hex()
|
||||||
|
return str(data)
|
||||||
|
|
||||||
|
|
||||||
|
def register_event_listener(callback):
|
||||||
|
_event_listeners.append(callback)
|
||||||
|
|
||||||
|
|
||||||
|
def _emit_event(event_type: str, data: dict):
|
||||||
|
event = {"type": event_type, **data}
|
||||||
|
for listener in _event_listeners:
|
||||||
|
try:
|
||||||
|
listener(event)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def init():
|
def init():
|
||||||
"""Initialize the Reticulum instance and server identity."""
|
"""Initialize the Reticulum instance and server identity."""
|
||||||
global _rns_instance, _identity, _destination, _start_time
|
global _rns_instance, _identity, _destination, _call_destination, _start_time
|
||||||
|
|
||||||
if _rns_instance is not None:
|
if _rns_instance is not None:
|
||||||
return
|
return
|
||||||
|
|
@ -43,30 +71,275 @@ def init():
|
||||||
_identity.to_file(identity_path)
|
_identity.to_file(identity_path)
|
||||||
logger.info("Created new identity")
|
logger.info("Created new identity")
|
||||||
|
|
||||||
# Create a destination for this bridge
|
# Bridge destination
|
||||||
_destination = RNS.Destination(
|
_destination = RNS.Destination(
|
||||||
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||||
APP_NAME, ASPECT,
|
APP_NAME, ASPECT,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Register announce handler to track network
|
# Audio call destination
|
||||||
RNS.Transport.register_announce_handler(_announce_handler)
|
_call_destination = RNS.Destination(
|
||||||
|
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||||
|
"call", "audio",
|
||||||
|
)
|
||||||
|
_call_destination.set_link_established_callback(_on_incoming_call_link)
|
||||||
|
|
||||||
|
# Register announce handlers with signal quality
|
||||||
|
RNS.Transport.register_announce_handler(_AnnounceHandler())
|
||||||
|
|
||||||
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
|
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
|
||||||
|
|
||||||
|
|
||||||
def _announce_handler(destination_hash, announced_identity, app_data):
|
class _AnnounceHandler:
|
||||||
"""Callback when we hear an announce from another node."""
|
"""Track announces with RSSI/SNR/quality metrics."""
|
||||||
with _lock:
|
aspect_filter = None # Catch all announces
|
||||||
_announced_destinations[destination_hash.hex()] = {
|
|
||||||
"destination_hash": destination_hash.hex(),
|
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||||
"app_data": app_data.decode("utf-8") if app_data else None,
|
# Extract signal quality from the announce packet
|
||||||
|
rssi = None
|
||||||
|
snr = None
|
||||||
|
quality = None
|
||||||
|
|
||||||
|
# Try to get signal info from the packet
|
||||||
|
try:
|
||||||
|
if announce_packet_hash:
|
||||||
|
packet = RNS.Transport.packet_hashmap.get(announce_packet_hash)
|
||||||
|
if packet:
|
||||||
|
rssi = getattr(packet, "rssi", None)
|
||||||
|
snr = getattr(packet, "snr", None)
|
||||||
|
quality = getattr(packet, "q", None) or getattr(packet, "quality", None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
dest_hex = destination_hash.hex()
|
||||||
|
node_data = {
|
||||||
|
"destination_hash": dest_hex,
|
||||||
|
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
||||||
|
"app_data": _safe_decode(app_data),
|
||||||
"last_heard": time.time(),
|
"last_heard": time.time(),
|
||||||
|
"rssi": rssi,
|
||||||
|
"snr": snr,
|
||||||
|
"quality": quality,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
with _lock:
|
||||||
|
_announced_destinations[dest_hex] = node_data
|
||||||
|
|
||||||
|
_emit_event("announce", {"node": node_data})
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Audio Calls over Reticulum Links
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
_active_calls: dict[str, dict] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _on_incoming_call_link(link):
|
||||||
|
"""Handle incoming audio call link."""
|
||||||
|
call_id = str(link.hash.hex()) if hasattr(link, "hash") else str(id(link))
|
||||||
|
_active_calls[call_id] = {
|
||||||
|
"id": call_id,
|
||||||
|
"link": link,
|
||||||
|
"direction": "inbound",
|
||||||
|
"started_at": time.time(),
|
||||||
|
"peer_hash": link.destination.hexhash if hasattr(link.destination, "hexhash") else "",
|
||||||
|
}
|
||||||
|
|
||||||
|
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
|
||||||
|
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
|
||||||
|
|
||||||
|
logger.info("Incoming audio call: %s", call_id[:16])
|
||||||
|
_emit_event("call.incoming", {"call_id": call_id, "peer": _active_calls[call_id]["peer_hash"]})
|
||||||
|
|
||||||
|
|
||||||
|
def initiate_call(destination_hash_hex: str) -> dict:
|
||||||
|
"""Initiate an audio call to a destination."""
|
||||||
|
if _identity is None:
|
||||||
|
return {"call_id": "", "error": "No identity"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
dest_hash = bytes.fromhex(destination_hash_hex)
|
||||||
|
dest_identity = RNS.Identity.recall(dest_hash)
|
||||||
|
if dest_identity is None:
|
||||||
|
RNS.Transport.request_path(dest_hash)
|
||||||
|
return {"call_id": "", "error": "Peer not found, path requested"}
|
||||||
|
|
||||||
|
dest = RNS.Destination(
|
||||||
|
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||||
|
"call", "audio",
|
||||||
|
)
|
||||||
|
|
||||||
|
link = RNS.Link(dest)
|
||||||
|
call_id = str(id(link))
|
||||||
|
|
||||||
|
_active_calls[call_id] = {
|
||||||
|
"id": call_id,
|
||||||
|
"link": link,
|
||||||
|
"direction": "outbound",
|
||||||
|
"started_at": time.time(),
|
||||||
|
"peer_hash": destination_hash_hex,
|
||||||
|
}
|
||||||
|
|
||||||
|
link.set_link_established_callback(lambda l: _emit_event("call.established", {"call_id": call_id}))
|
||||||
|
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
|
||||||
|
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
|
||||||
|
|
||||||
|
return {"call_id": call_id, "peer": destination_hash_hex}
|
||||||
|
except Exception as e:
|
||||||
|
return {"call_id": "", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def send_call_audio(call_id: str, audio_data: bytes) -> bool:
|
||||||
|
"""Send audio data over an active call link."""
|
||||||
|
call = _active_calls.get(call_id)
|
||||||
|
if not call or not call["link"]:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
link = call["link"]
|
||||||
|
if link.status == RNS.Link.ACTIVE and len(audio_data) <= RNS.Link.MDU:
|
||||||
|
RNS.Packet(link, audio_data).send()
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def hangup_call(call_id: str):
|
||||||
|
"""End an active call."""
|
||||||
|
call = _active_calls.pop(call_id, None)
|
||||||
|
if call and call["link"]:
|
||||||
|
try:
|
||||||
|
call["link"].teardown()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
_emit_event("call.ended", {"call_id": call_id})
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_calls() -> list[dict]:
|
||||||
|
"""Return active calls (without link objects)."""
|
||||||
|
return [{
|
||||||
|
"id": c["id"],
|
||||||
|
"direction": c["direction"],
|
||||||
|
"started_at": c["started_at"],
|
||||||
|
"peer_hash": c["peer_hash"],
|
||||||
|
"duration": time.time() - c["started_at"],
|
||||||
|
} for c in _active_calls.values()]
|
||||||
|
|
||||||
|
|
||||||
|
def _on_call_audio_packet(call_id: str, audio_data):
|
||||||
|
"""Handle received audio data from a call."""
|
||||||
|
_emit_event("call.audio", {"call_id": call_id, "size": len(audio_data) if audio_data else 0})
|
||||||
|
|
||||||
|
|
||||||
|
def _on_call_ended(call_id: str):
|
||||||
|
"""Handle call link closure."""
|
||||||
|
_active_calls.pop(call_id, None)
|
||||||
|
logger.info("Call ended: %s", call_id[:16])
|
||||||
|
_emit_event("call.ended", {"call_id": call_id})
|
||||||
|
|
||||||
|
|
||||||
|
def announce_call_capability():
|
||||||
|
"""Announce that this node can receive audio calls."""
|
||||||
|
if _call_destination:
|
||||||
|
_call_destination.announce(app_data=b"rMesh Audio")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# NomadNet Node Browser
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
_nomadnet_nodes: dict[str, dict] = {}
|
||||||
|
|
||||||
|
|
||||||
|
class _NomadNetAnnounceHandler:
|
||||||
|
"""Track NomadNet node announces."""
|
||||||
|
aspect_filter = "nomadnetwork.node"
|
||||||
|
|
||||||
|
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||||
|
dest_hex = destination_hash.hex()
|
||||||
|
node_name = ""
|
||||||
|
if app_data:
|
||||||
|
try:
|
||||||
|
node_name = app_data.decode("utf-8") if isinstance(app_data, bytes) else str(app_data)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
_nomadnet_nodes[dest_hex] = {
|
||||||
|
"destination_hash": dest_hex,
|
||||||
|
"name": node_name,
|
||||||
|
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
||||||
|
"last_heard": time.time(),
|
||||||
|
}
|
||||||
|
_emit_event("nomadnet.announce", {"node": _nomadnet_nodes[dest_hex]})
|
||||||
|
|
||||||
|
|
||||||
|
def get_nomadnet_nodes() -> list[dict]:
|
||||||
|
"""Return discovered NomadNet nodes."""
|
||||||
|
return list(_nomadnet_nodes.values())
|
||||||
|
|
||||||
|
|
||||||
|
def browse_nomadnet_node(destination_hash_hex: str, page_path: str = "/") -> dict:
|
||||||
|
"""Request a page from a NomadNet node. Returns async — result via event."""
|
||||||
|
if _identity is None:
|
||||||
|
return {"error": "No identity"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
dest_hash = bytes.fromhex(destination_hash_hex)
|
||||||
|
dest_identity = RNS.Identity.recall(dest_hash)
|
||||||
|
if dest_identity is None:
|
||||||
|
RNS.Transport.request_path(dest_hash)
|
||||||
|
return {"error": "Node not found, path requested"}
|
||||||
|
|
||||||
|
dest = RNS.Destination(
|
||||||
|
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||||
|
"nomadnetwork", "node",
|
||||||
|
)
|
||||||
|
|
||||||
|
link = RNS.Link(dest)
|
||||||
|
|
||||||
|
def on_established(l):
|
||||||
|
# Request page
|
||||||
|
l.request(
|
||||||
|
page_path.encode("utf-8"),
|
||||||
|
response_callback=lambda receipt: _on_nomadnet_response(destination_hash_hex, page_path, receipt),
|
||||||
|
failed_callback=lambda receipt: _emit_event("nomadnet.page.error", {
|
||||||
|
"node": destination_hash_hex, "path": page_path, "error": "Request failed"
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
link.set_link_established_callback(on_established)
|
||||||
|
return {"status": "requesting", "node": destination_hash_hex, "path": page_path}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return {"error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def _on_nomadnet_response(node_hash: str, page_path: str, receipt):
|
||||||
|
"""Handle NomadNet page response."""
|
||||||
|
try:
|
||||||
|
response = receipt.response
|
||||||
|
content = response.decode("utf-8") if isinstance(response, bytes) else str(response)
|
||||||
|
_emit_event("nomadnet.page.downloaded", {
|
||||||
|
"node": node_hash,
|
||||||
|
"path": page_path,
|
||||||
|
"content": content,
|
||||||
|
"size": len(content),
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
_emit_event("nomadnet.page.error", {
|
||||||
|
"node": node_hash, "path": page_path, "error": str(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Status / Query
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
def get_status() -> dict:
|
def get_status() -> dict:
|
||||||
"""Return transport status info."""
|
|
||||||
if _rns_instance is None:
|
if _rns_instance is None:
|
||||||
return {"online": False, "transport_enabled": False, "identity_hash": "",
|
return {"online": False, "transport_enabled": False, "identity_hash": "",
|
||||||
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
|
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
|
||||||
|
|
@ -81,21 +354,20 @@ def get_status() -> dict:
|
||||||
"uptime_seconds": time.time() - _start_time,
|
"uptime_seconds": time.time() - _start_time,
|
||||||
"announced_count": announced_count,
|
"announced_count": announced_count,
|
||||||
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
|
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
|
||||||
|
"active_calls": len(_active_calls),
|
||||||
|
"nomadnet_nodes": len(_nomadnet_nodes),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_nodes() -> list[dict]:
|
def get_nodes() -> list[dict]:
|
||||||
"""Return list of known announced destinations."""
|
|
||||||
with _lock:
|
with _lock:
|
||||||
return list(_announced_destinations.values())
|
return list(_announced_destinations.values())
|
||||||
|
|
||||||
|
|
||||||
def get_topology() -> dict:
|
def get_topology() -> dict:
|
||||||
"""Return nodes and links for visualization."""
|
|
||||||
nodes = get_nodes()
|
nodes = get_nodes()
|
||||||
links = []
|
links = []
|
||||||
|
|
||||||
# Build links from destinations/path tables if available
|
|
||||||
dest_table = getattr(RNS.Transport, "destinations", {})
|
dest_table = getattr(RNS.Transport, "destinations", {})
|
||||||
if isinstance(dest_table, (dict, list)):
|
if isinstance(dest_table, (dict, list)):
|
||||||
try:
|
try:
|
||||||
|
|
@ -110,7 +382,7 @@ def get_topology() -> dict:
|
||||||
"active": True,
|
"active": True,
|
||||||
})
|
})
|
||||||
except Exception:
|
except Exception:
|
||||||
pass # Transport internals may vary between RNS versions
|
pass
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"nodes": nodes,
|
"nodes": nodes,
|
||||||
|
|
@ -121,7 +393,6 @@ def get_topology() -> dict:
|
||||||
|
|
||||||
|
|
||||||
def get_identity_info() -> dict:
|
def get_identity_info() -> dict:
|
||||||
"""Return server identity information."""
|
|
||||||
if _identity is None:
|
if _identity is None:
|
||||||
return {"identity_hash": "", "public_key_hex": ""}
|
return {"identity_hash": "", "public_key_hex": ""}
|
||||||
return {
|
return {
|
||||||
|
|
@ -131,7 +402,6 @@ def get_identity_info() -> dict:
|
||||||
|
|
||||||
|
|
||||||
def announce():
|
def announce():
|
||||||
"""Announce this bridge on the network."""
|
|
||||||
if _destination is None:
|
if _destination is None:
|
||||||
return {"announced": False, "identity_hash": ""}
|
return {"announced": False, "identity_hash": ""}
|
||||||
_destination.announce(app_data=b"rMesh Bridge")
|
_destination.announce(app_data=b"rMesh Bridge")
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,63 @@
|
||||||
|
"""
|
||||||
|
WebSocket manager — broadcasts real-time events to connected clients.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import Set
|
||||||
|
|
||||||
|
from fastapi import WebSocket
|
||||||
|
|
||||||
|
logger = logging.getLogger("rmesh.websocket")
|
||||||
|
|
||||||
|
_clients: Set[WebSocket] = set()
|
||||||
|
_loop: asyncio.AbstractEventLoop | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def set_event_loop(loop: asyncio.AbstractEventLoop):
|
||||||
|
global _loop
|
||||||
|
_loop = loop
|
||||||
|
|
||||||
|
|
||||||
|
async def connect(websocket: WebSocket):
|
||||||
|
await websocket.accept()
|
||||||
|
_clients.add(websocket)
|
||||||
|
logger.info("WebSocket client connected (%d total)", len(_clients))
|
||||||
|
|
||||||
|
|
||||||
|
def disconnect(websocket: WebSocket):
|
||||||
|
_clients.discard(websocket)
|
||||||
|
logger.info("WebSocket client disconnected (%d remaining)", len(_clients))
|
||||||
|
|
||||||
|
|
||||||
|
async def broadcast(data: dict):
|
||||||
|
"""Send event to all connected WebSocket clients."""
|
||||||
|
if not _clients:
|
||||||
|
return
|
||||||
|
|
||||||
|
message = json.dumps(data)
|
||||||
|
dead = set()
|
||||||
|
for client in _clients:
|
||||||
|
try:
|
||||||
|
await client.send_text(message)
|
||||||
|
except Exception:
|
||||||
|
dead.add(client)
|
||||||
|
|
||||||
|
for client in dead:
|
||||||
|
_clients.discard(client)
|
||||||
|
|
||||||
|
|
||||||
|
def on_event(event: dict):
|
||||||
|
"""Synchronous callback that schedules broadcast on the event loop.
|
||||||
|
Use this as the event_listener for reticulum_bridge and lxmf_bridge."""
|
||||||
|
if _loop is None or not _clients:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
asyncio.run_coroutine_threadsafe(broadcast(event), _loop)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def client_count() -> int:
|
||||||
|
return len(_clients)
|
||||||
|
|
@ -4,3 +4,5 @@ fastapi>=0.115.0
|
||||||
uvicorn[standard]>=0.30.0
|
uvicorn[standard]>=0.30.0
|
||||||
pydantic>=2.0
|
pydantic>=2.0
|
||||||
meshcore>=2.3.0
|
meshcore>=2.3.0
|
||||||
|
httpx>=0.27.0
|
||||||
|
websockets>=12.0
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue