Compare commits
No commits in common. "main" and "dev" have entirely different histories.
|
|
@ -4,23 +4,12 @@ BRIDGE_API_KEY = os.environ.get("BRIDGE_API_KEY", "")
|
||||||
RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum")
|
RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum")
|
||||||
LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf")
|
LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf")
|
||||||
LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json")
|
LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json")
|
||||||
LXMF_ATTACHMENTS_DIR = os.path.join(LXMF_STORAGE_DIR, "attachments")
|
|
||||||
LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4"))
|
LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4"))
|
||||||
|
|
||||||
# MeshCore companion node connection
|
# MeshCore companion node connection
|
||||||
MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true"
|
MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true"
|
||||||
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "")
|
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") # TCP host (WiFi companion)
|
||||||
MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000"))
|
MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000"))
|
||||||
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "")
|
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") # Serial port (USB companion)
|
||||||
MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore")
|
MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore")
|
||||||
MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json")
|
MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json")
|
||||||
|
|
||||||
# Feature flags
|
|
||||||
AUTO_RESEND_ON_ANNOUNCE = os.environ.get("AUTO_RESEND_ON_ANNOUNCE", "true").lower() == "true"
|
|
||||||
LOCAL_PROPAGATION_NODE = os.environ.get("LOCAL_PROPAGATION_NODE", "true").lower() == "true"
|
|
||||||
PREFERRED_PROPAGATION_NODE = os.environ.get("PREFERRED_PROPAGATION_NODE", "")
|
|
||||||
PROPAGATION_SYNC_INTERVAL = int(os.environ.get("PROPAGATION_SYNC_INTERVAL", "300"))
|
|
||||||
|
|
||||||
# rChats bridge
|
|
||||||
RCHATS_BRIDGE_ENABLED = os.environ.get("RCHATS_BRIDGE_ENABLED", "false").lower() == "true"
|
|
||||||
RCHATS_INTERNAL_URL = os.environ.get("RCHATS_INTERNAL_URL", "http://rchats:3000")
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,7 @@
|
||||||
"""
|
"""
|
||||||
LXMF bridge — rich message handling, propagation node management,
|
LXMF bridge — message send/receive via the LXMF protocol on Reticulum.
|
||||||
auto-resend on announce, file/image/audio attachments.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import base64
|
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
@ -14,11 +12,7 @@ import os
|
||||||
import RNS
|
import RNS
|
||||||
import LXMF
|
import LXMF
|
||||||
|
|
||||||
from .config import (
|
from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE
|
||||||
LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE, LXMF_ATTACHMENTS_DIR,
|
|
||||||
AUTO_RESEND_ON_ANNOUNCE, LOCAL_PROPAGATION_NODE,
|
|
||||||
PREFERRED_PROPAGATION_NODE, PROPAGATION_SYNC_INTERVAL,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger = logging.getLogger("rmesh.lxmf")
|
logger = logging.getLogger("rmesh.lxmf")
|
||||||
|
|
||||||
|
|
@ -27,9 +21,6 @@ _local_delivery_destination: RNS.Destination | None = None
|
||||||
_identity: RNS.Identity | None = None
|
_identity: RNS.Identity | None = None
|
||||||
_messages: list[dict] = []
|
_messages: list[dict] = []
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
_event_listeners: list = [] # WebSocket broadcast callbacks
|
|
||||||
_propagation_nodes: dict[str, dict] = {}
|
|
||||||
_last_propagation_sync: float = 0
|
|
||||||
|
|
||||||
|
|
||||||
def init(identity: RNS.Identity):
|
def init(identity: RNS.Identity):
|
||||||
|
|
@ -39,30 +30,16 @@ def init(identity: RNS.Identity):
|
||||||
_identity = identity
|
_identity = identity
|
||||||
|
|
||||||
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
|
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
|
||||||
os.makedirs(LXMF_ATTACHMENTS_DIR, exist_ok=True)
|
|
||||||
|
|
||||||
_router = LXMF.LXMRouter(
|
_router = LXMF.LXMRouter(
|
||||||
identity=_identity,
|
identity=_identity,
|
||||||
storagepath=LXMF_STORAGE_DIR,
|
storagepath=LXMF_STORAGE_DIR,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Increase transfer limit for attachments (10 MB)
|
# Enable propagation node
|
||||||
_router.delivery_per_transfer_limit = 10000
|
_router.enable_propagation()
|
||||||
|
|
||||||
# Enable local propagation node
|
# Register local delivery destination for receiving messages
|
||||||
if LOCAL_PROPAGATION_NODE:
|
|
||||||
_router.enable_propagation()
|
|
||||||
logger.info("Local LXMF propagation node enabled")
|
|
||||||
|
|
||||||
# Set preferred propagation node
|
|
||||||
if PREFERRED_PROPAGATION_NODE:
|
|
||||||
try:
|
|
||||||
_router.set_outbound_propagation_node(bytes.fromhex(PREFERRED_PROPAGATION_NODE))
|
|
||||||
logger.info("Set preferred propagation node: %s", PREFERRED_PROPAGATION_NODE[:16])
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to set propagation node: %s", e)
|
|
||||||
|
|
||||||
# Register local delivery
|
|
||||||
_local_delivery_destination = _router.register_delivery_identity(
|
_local_delivery_destination = _router.register_delivery_identity(
|
||||||
_identity,
|
_identity,
|
||||||
display_name="rMesh Bridge",
|
display_name="rMesh Bridge",
|
||||||
|
|
@ -70,152 +47,14 @@ def init(identity: RNS.Identity):
|
||||||
|
|
||||||
_router.register_delivery_callback(_delivery_callback)
|
_router.register_delivery_callback(_delivery_callback)
|
||||||
|
|
||||||
# Register announce handlers for auto-resend and propagation tracking
|
# Load persisted messages
|
||||||
RNS.Transport.register_announce_handler(_LxmfAnnounceHandler())
|
|
||||||
RNS.Transport.register_announce_handler(_PropagationAnnounceHandler())
|
|
||||||
|
|
||||||
_load_messages()
|
_load_messages()
|
||||||
|
|
||||||
# Start propagation sync thread
|
|
||||||
if PROPAGATION_SYNC_INTERVAL > 0:
|
|
||||||
sync_thread = threading.Thread(target=_propagation_sync_loop, daemon=True)
|
|
||||||
sync_thread.start()
|
|
||||||
|
|
||||||
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
|
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
|
||||||
|
|
||||||
|
|
||||||
def register_event_listener(callback):
|
|
||||||
"""Register a callback for real-time events (WebSocket broadcast)."""
|
|
||||||
_event_listeners.append(callback)
|
|
||||||
|
|
||||||
|
|
||||||
def _emit_event(event_type: str, data: dict):
|
|
||||||
"""Emit event to all registered listeners."""
|
|
||||||
event = {"type": event_type, **data}
|
|
||||||
for listener in _event_listeners:
|
|
||||||
try:
|
|
||||||
listener(event)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# LXMF Rich Field Parsing
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
def _parse_lxmf_fields(lxmf_message) -> dict:
|
|
||||||
"""Extract all LXMF fields from a message into a serializable dict."""
|
|
||||||
fields = {}
|
|
||||||
|
|
||||||
try:
|
|
||||||
lxmf_fields = lxmf_message.get_fields()
|
|
||||||
except Exception:
|
|
||||||
return fields
|
|
||||||
|
|
||||||
# Image attachment
|
|
||||||
if LXMF.FIELD_IMAGE in lxmf_fields:
|
|
||||||
try:
|
|
||||||
image_data = lxmf_fields[LXMF.FIELD_IMAGE]
|
|
||||||
image_type = image_data[0] if len(image_data) > 0 else "unknown"
|
|
||||||
image_bytes = image_data[1] if len(image_data) > 1 else b""
|
|
||||||
if image_bytes:
|
|
||||||
attachment_id = str(uuid.uuid4())
|
|
||||||
ext = "jpg" if "jpeg" in str(image_type).lower() or "jpg" in str(image_type).lower() else "png"
|
|
||||||
filename = f"{attachment_id}.{ext}"
|
|
||||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
|
||||||
with open(filepath, "wb") as f:
|
|
||||||
f.write(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes))
|
|
||||||
fields["image"] = {
|
|
||||||
"type": str(image_type),
|
|
||||||
"filename": filename,
|
|
||||||
"size": len(image_bytes),
|
|
||||||
"base64": base64.b64encode(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)).decode("ascii")[:200] + "...",
|
|
||||||
}
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to parse image field: %s", e)
|
|
||||||
|
|
||||||
# File attachments
|
|
||||||
if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields:
|
|
||||||
try:
|
|
||||||
file_list = lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS]
|
|
||||||
attachments = []
|
|
||||||
for file_data in file_list:
|
|
||||||
file_name = file_data[0] if len(file_data) > 0 else "unknown"
|
|
||||||
file_bytes = file_data[1] if len(file_data) > 1 else b""
|
|
||||||
if file_bytes:
|
|
||||||
safe_name = "".join(c for c in str(file_name) if c.isalnum() or c in ".-_")
|
|
||||||
attachment_id = str(uuid.uuid4())
|
|
||||||
stored_name = f"{attachment_id}_{safe_name}"
|
|
||||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, stored_name)
|
|
||||||
with open(filepath, "wb") as f:
|
|
||||||
f.write(file_bytes if isinstance(file_bytes, bytes) else bytes(file_bytes))
|
|
||||||
attachments.append({
|
|
||||||
"name": str(file_name),
|
|
||||||
"stored_name": stored_name,
|
|
||||||
"size": len(file_bytes),
|
|
||||||
})
|
|
||||||
if attachments:
|
|
||||||
fields["file_attachments"] = attachments
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to parse file attachments: %s", e)
|
|
||||||
|
|
||||||
# Audio message (Codec2)
|
|
||||||
if LXMF.FIELD_AUDIO in lxmf_fields:
|
|
||||||
try:
|
|
||||||
audio_data = lxmf_fields[LXMF.FIELD_AUDIO]
|
|
||||||
audio_mode = audio_data[0] if len(audio_data) > 0 else 0
|
|
||||||
audio_bytes = audio_data[1] if len(audio_data) > 1 else b""
|
|
||||||
if audio_bytes:
|
|
||||||
attachment_id = str(uuid.uuid4())
|
|
||||||
filename = f"{attachment_id}.c2"
|
|
||||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
|
||||||
with open(filepath, "wb") as f:
|
|
||||||
f.write(audio_bytes if isinstance(audio_bytes, bytes) else bytes(audio_bytes))
|
|
||||||
fields["audio"] = {
|
|
||||||
"codec": "codec2",
|
|
||||||
"mode": audio_mode,
|
|
||||||
"filename": filename,
|
|
||||||
"size": len(audio_bytes),
|
|
||||||
}
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to parse audio field: %s", e)
|
|
||||||
|
|
||||||
# Icon appearance
|
|
||||||
if LXMF.FIELD_ICON_APPEARANCE in lxmf_fields:
|
|
||||||
try:
|
|
||||||
icon_data = lxmf_fields[LXMF.FIELD_ICON_APPEARANCE]
|
|
||||||
fields["icon"] = {
|
|
||||||
"name": str(icon_data[0]) if len(icon_data) > 0 else "",
|
|
||||||
"foreground": "#" + icon_data[1].hex() if len(icon_data) > 1 and hasattr(icon_data[1], "hex") else "",
|
|
||||||
"background": "#" + icon_data[2].hex() if len(icon_data) > 2 and hasattr(icon_data[2], "hex") else "",
|
|
||||||
}
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to parse icon field: %s", e)
|
|
||||||
|
|
||||||
# Commands
|
|
||||||
if LXMF.FIELD_COMMANDS in lxmf_fields:
|
|
||||||
try:
|
|
||||||
fields["commands"] = [str(cmd) for cmd in lxmf_fields[LXMF.FIELD_COMMANDS]]
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return fields
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Message Handling
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
def _delivery_callback(message):
|
def _delivery_callback(message):
|
||||||
"""Handle incoming LXMF message with rich field extraction."""
|
"""Handle incoming LXMF message."""
|
||||||
# Parse rich fields
|
|
||||||
fields = _parse_lxmf_fields(message)
|
|
||||||
|
|
||||||
# Get signal quality if available
|
|
||||||
rssi = getattr(message, "rssi", None)
|
|
||||||
snr = getattr(message, "snr", None)
|
|
||||||
quality = getattr(message, "quality", None)
|
|
||||||
|
|
||||||
msg_record = {
|
msg_record = {
|
||||||
"id": str(uuid.uuid4()),
|
"id": str(uuid.uuid4()),
|
||||||
"direction": "inbound",
|
"direction": "inbound",
|
||||||
|
|
@ -223,10 +62,6 @@ def _delivery_callback(message):
|
||||||
"recipient_hash": _identity.hexhash if _identity else "",
|
"recipient_hash": _identity.hexhash if _identity else "",
|
||||||
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
|
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
|
||||||
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
|
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
|
||||||
"fields": fields,
|
|
||||||
"rssi": rssi,
|
|
||||||
"snr": snr,
|
|
||||||
"quality": quality,
|
|
||||||
"status": "delivered",
|
"status": "delivered",
|
||||||
"timestamp": time.time(),
|
"timestamp": time.time(),
|
||||||
}
|
}
|
||||||
|
|
@ -235,19 +70,11 @@ def _delivery_callback(message):
|
||||||
_messages.append(msg_record)
|
_messages.append(msg_record)
|
||||||
_save_messages()
|
_save_messages()
|
||||||
|
|
||||||
logger.info("Received LXMF message from %s (fields: %s)",
|
logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])
|
||||||
msg_record["sender_hash"][:16],
|
|
||||||
list(fields.keys()) if fields else "none")
|
|
||||||
|
|
||||||
_emit_event("lxmf.delivery", {"message": msg_record})
|
|
||||||
|
|
||||||
|
|
||||||
def send_message(destination_hash_hex: str, content: str, title: str = "",
|
def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
|
||||||
image_bytes: bytes = None, image_type: str = "image/jpeg",
|
"""Send an LXMF message to a destination hash."""
|
||||||
file_attachments: list = None,
|
|
||||||
audio_bytes: bytes = None, audio_mode: int = 0,
|
|
||||||
try_propagation_on_fail: bool = True) -> dict:
|
|
||||||
"""Send an LXMF message with optional attachments."""
|
|
||||||
if _router is None or _identity is None:
|
if _router is None or _identity is None:
|
||||||
return {"id": "", "status": "failed"}
|
return {"id": "", "status": "failed"}
|
||||||
|
|
||||||
|
|
@ -256,82 +83,72 @@ def send_message(destination_hash_hex: str, content: str, title: str = "",
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return {"id": "", "status": "failed"}
|
return {"id": "", "status": "failed"}
|
||||||
|
|
||||||
|
# Look up identity for destination
|
||||||
dest_identity = RNS.Identity.recall(dest_hash)
|
dest_identity = RNS.Identity.recall(dest_hash)
|
||||||
|
|
||||||
msg_id = str(uuid.uuid4())
|
msg_id = str(uuid.uuid4())
|
||||||
|
|
||||||
if dest_identity is None:
|
if dest_identity is None:
|
||||||
|
# Request path first, the message may be deliverable later
|
||||||
RNS.Transport.request_path(dest_hash)
|
RNS.Transport.request_path(dest_hash)
|
||||||
msg_record = {
|
msg_record = {
|
||||||
"id": msg_id, "direction": "outbound",
|
"id": msg_id,
|
||||||
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
|
"direction": "outbound",
|
||||||
"title": title, "content": content, "fields": {},
|
"sender_hash": _identity.hexhash,
|
||||||
"status": "pending", "timestamp": time.time(),
|
"recipient_hash": destination_hash_hex,
|
||||||
|
"title": title,
|
||||||
|
"content": content,
|
||||||
|
"status": "pending",
|
||||||
|
"timestamp": time.time(),
|
||||||
}
|
}
|
||||||
with _lock:
|
with _lock:
|
||||||
_messages.append(msg_record)
|
_messages.append(msg_record)
|
||||||
_save_messages()
|
_save_messages()
|
||||||
return msg_record
|
return msg_record
|
||||||
|
|
||||||
|
# Create destination for the recipient
|
||||||
dest = RNS.Destination(
|
dest = RNS.Destination(
|
||||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||||
"lxmf", "delivery",
|
"lxmf", "delivery",
|
||||||
)
|
)
|
||||||
|
|
||||||
lxm = LXMF.LXMessage(
|
lxm = LXMF.LXMessage(
|
||||||
dest, _router.get_delivery_destination(),
|
dest,
|
||||||
|
_router.get_delivery_destination(),
|
||||||
content.encode("utf-8") if isinstance(content, str) else content,
|
content.encode("utf-8") if isinstance(content, str) else content,
|
||||||
title=title.encode("utf-8") if isinstance(title, str) else title,
|
title=title.encode("utf-8") if isinstance(title, str) else title,
|
||||||
desired_method=LXMF.LXMessage.DIRECT,
|
desired_method=LXMF.LXMessage.DIRECT,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Add rich fields
|
|
||||||
if image_bytes:
|
|
||||||
lxm.fields[LXMF.FIELD_IMAGE] = [image_type, image_bytes]
|
|
||||||
if file_attachments:
|
|
||||||
lxm.fields[LXMF.FIELD_FILE_ATTACHMENTS] = [
|
|
||||||
[att["name"], att["bytes"]] for att in file_attachments
|
|
||||||
]
|
|
||||||
if audio_bytes:
|
|
||||||
lxm.fields[LXMF.FIELD_AUDIO] = [audio_mode, audio_bytes]
|
|
||||||
|
|
||||||
# Enable propagation fallback
|
|
||||||
lxm.try_propagation_on_fail = try_propagation_on_fail
|
|
||||||
|
|
||||||
msg_record = {
|
msg_record = {
|
||||||
"id": msg_id, "direction": "outbound",
|
"id": msg_id,
|
||||||
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
|
"direction": "outbound",
|
||||||
"title": title, "content": content, "fields": {},
|
"sender_hash": _identity.hexhash,
|
||||||
"status": "pending", "timestamp": time.time(),
|
"recipient_hash": destination_hash_hex,
|
||||||
|
"title": title,
|
||||||
|
"content": content,
|
||||||
|
"status": "pending",
|
||||||
|
"timestamp": time.time(),
|
||||||
}
|
}
|
||||||
|
|
||||||
def _on_delivered(message):
|
def _delivery_callback_outbound(message):
|
||||||
with _lock:
|
with _lock:
|
||||||
for m in _messages:
|
for m in _messages:
|
||||||
if m["id"] == msg_id:
|
if m["id"] == msg_id:
|
||||||
m["status"] = "delivered"
|
m["status"] = "delivered"
|
||||||
break
|
break
|
||||||
_save_messages()
|
_save_messages()
|
||||||
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "delivered"})
|
|
||||||
|
|
||||||
def _on_failed(message):
|
|
||||||
# Try propagation node if direct failed
|
|
||||||
if getattr(message, "try_propagation_on_fail", False) and _router.outbound_propagation_node:
|
|
||||||
logger.info("Direct delivery failed, trying propagation node for %s", destination_hash_hex[:16])
|
|
||||||
message.desired_method = LXMF.LXMessage.PROPAGATED
|
|
||||||
message.try_propagation_on_fail = False
|
|
||||||
_router.handle_outbound(message)
|
|
||||||
return
|
|
||||||
|
|
||||||
|
def _failed_callback(message):
|
||||||
with _lock:
|
with _lock:
|
||||||
for m in _messages:
|
for m in _messages:
|
||||||
if m["id"] == msg_id:
|
if m["id"] == msg_id:
|
||||||
m["status"] = "failed"
|
m["status"] = "failed"
|
||||||
break
|
break
|
||||||
_save_messages()
|
_save_messages()
|
||||||
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "failed"})
|
|
||||||
|
|
||||||
lxm.delivery_callback = _on_delivered
|
lxm.delivery_callback = _delivery_callback_outbound
|
||||||
lxm.failed_callback = _on_failed
|
lxm.failed_callback = _failed_callback
|
||||||
|
|
||||||
_router.handle_outbound(lxm)
|
_router.handle_outbound(lxm)
|
||||||
|
|
||||||
|
|
@ -339,159 +156,18 @@ def send_message(destination_hash_hex: str, content: str, title: str = "",
|
||||||
_messages.append(msg_record)
|
_messages.append(msg_record)
|
||||||
_save_messages()
|
_save_messages()
|
||||||
|
|
||||||
_emit_event("lxmf.created", {"message": msg_record})
|
|
||||||
return msg_record
|
return msg_record
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Auto-Resend Failed Messages on Announce
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class _LxmfAnnounceHandler:
|
|
||||||
"""Re-attempt failed messages when a peer announces."""
|
|
||||||
aspect_filter = "lxmf.delivery"
|
|
||||||
|
|
||||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
|
||||||
if not AUTO_RESEND_ON_ANNOUNCE:
|
|
||||||
return
|
|
||||||
|
|
||||||
dest_hex = destination_hash.hex()
|
|
||||||
resend_count = 0
|
|
||||||
|
|
||||||
with _lock:
|
|
||||||
for msg in _messages:
|
|
||||||
if (msg["status"] == "failed" and
|
|
||||||
msg["direction"] == "outbound" and
|
|
||||||
msg["recipient_hash"] == dest_hex):
|
|
||||||
msg["status"] = "pending"
|
|
||||||
resend_count += 1
|
|
||||||
# Re-send in background
|
|
||||||
threading.Thread(
|
|
||||||
target=_resend_message, args=(msg,), daemon=True
|
|
||||||
).start()
|
|
||||||
|
|
||||||
if resend_count > 0:
|
|
||||||
_save_messages()
|
|
||||||
|
|
||||||
if resend_count > 0:
|
|
||||||
logger.info("Auto-resending %d failed messages to %s", resend_count, dest_hex[:16])
|
|
||||||
|
|
||||||
|
|
||||||
def _resend_message(msg_record: dict):
|
|
||||||
"""Resend a previously failed message."""
|
|
||||||
time.sleep(1) # Brief delay to let path establish
|
|
||||||
result = send_message(
|
|
||||||
msg_record["recipient_hash"],
|
|
||||||
msg_record["content"],
|
|
||||||
msg_record.get("title", ""),
|
|
||||||
)
|
|
||||||
if result.get("status") == "pending":
|
|
||||||
# Remove the duplicate — original is being retried
|
|
||||||
with _lock:
|
|
||||||
_messages[:] = [m for m in _messages if m["id"] != result["id"]]
|
|
||||||
_save_messages()
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Propagation Node Management
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class _PropagationAnnounceHandler:
|
|
||||||
"""Track LXMF propagation node announces."""
|
|
||||||
aspect_filter = "lxmf.propagation"
|
|
||||||
|
|
||||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
|
||||||
dest_hex = destination_hash.hex()
|
|
||||||
node_info = {
|
|
||||||
"destination_hash": dest_hex,
|
|
||||||
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
|
||||||
"last_heard": time.time(),
|
|
||||||
"enabled": True,
|
|
||||||
"per_transfer_limit": 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Parse propagation node app data
|
|
||||||
if app_data:
|
|
||||||
try:
|
|
||||||
import RNS.vendor.umsgpack as msgpack
|
|
||||||
data = msgpack.unpackb(app_data)
|
|
||||||
if isinstance(data, dict):
|
|
||||||
node_info["enabled"] = data.get("enabled", True)
|
|
||||||
node_info["per_transfer_limit"] = data.get("per_transfer_limit", 0)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
_propagation_nodes[dest_hex] = node_info
|
|
||||||
_emit_event("propagation.announce", {"node": node_info})
|
|
||||||
|
|
||||||
|
|
||||||
def get_propagation_nodes() -> list[dict]:
|
|
||||||
"""Return list of known propagation nodes."""
|
|
||||||
return list(_propagation_nodes.values())
|
|
||||||
|
|
||||||
|
|
||||||
def set_preferred_propagation_node(destination_hash_hex: str) -> bool:
|
|
||||||
"""Set the preferred outbound propagation node."""
|
|
||||||
if _router is None:
|
|
||||||
return False
|
|
||||||
try:
|
|
||||||
if destination_hash_hex:
|
|
||||||
_router.set_outbound_propagation_node(bytes.fromhex(destination_hash_hex))
|
|
||||||
else:
|
|
||||||
_router.outbound_propagation_node = None
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to set propagation node: %s", e)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def get_propagation_status() -> dict:
|
|
||||||
"""Return propagation node status."""
|
|
||||||
if _router is None:
|
|
||||||
return {"enabled": False, "outbound_node": None, "transfer_state": "idle"}
|
|
||||||
|
|
||||||
outbound = _router.get_outbound_propagation_node()
|
|
||||||
return {
|
|
||||||
"local_enabled": LOCAL_PROPAGATION_NODE,
|
|
||||||
"outbound_node": outbound.hex() if outbound else None,
|
|
||||||
"transfer_state": str(getattr(_router, "propagation_transfer_state", "unknown")),
|
|
||||||
"known_nodes": len(_propagation_nodes),
|
|
||||||
"last_sync": _last_propagation_sync,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def sync_propagation_messages():
|
|
||||||
"""Request messages from the propagation node."""
|
|
||||||
global _last_propagation_sync
|
|
||||||
if _router is None or _identity is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
_router.request_messages_from_propagation_node(_identity)
|
|
||||||
_last_propagation_sync = time.time()
|
|
||||||
logger.info("Syncing messages from propagation node")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Propagation sync failed: %s", e)
|
|
||||||
|
|
||||||
|
|
||||||
def _propagation_sync_loop():
|
|
||||||
"""Background thread to periodically sync propagation messages."""
|
|
||||||
while True:
|
|
||||||
time.sleep(PROPAGATION_SYNC_INTERVAL)
|
|
||||||
if _router and _router.get_outbound_propagation_node():
|
|
||||||
sync_propagation_messages()
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Query / Persistence
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
|
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
|
||||||
|
"""Return stored messages."""
|
||||||
with _lock:
|
with _lock:
|
||||||
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
|
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
|
||||||
return sorted_msgs[offset:offset + limit]
|
return sorted_msgs[offset:offset + limit]
|
||||||
|
|
||||||
|
|
||||||
def get_message(msg_id: str) -> dict | None:
|
def get_message(msg_id: str) -> dict | None:
|
||||||
|
"""Return a single message by ID."""
|
||||||
with _lock:
|
with _lock:
|
||||||
for m in _messages:
|
for m in _messages:
|
||||||
if m["id"] == msg_id:
|
if m["id"] == msg_id:
|
||||||
|
|
@ -500,19 +176,13 @@ def get_message(msg_id: str) -> dict | None:
|
||||||
|
|
||||||
|
|
||||||
def get_total_count() -> int:
|
def get_total_count() -> int:
|
||||||
|
"""Return total message count."""
|
||||||
with _lock:
|
with _lock:
|
||||||
return len(_messages)
|
return len(_messages)
|
||||||
|
|
||||||
|
|
||||||
def get_attachment_path(filename: str) -> str | None:
|
|
||||||
"""Return full path to an attachment file if it exists."""
|
|
||||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
|
||||||
if os.path.exists(filepath):
|
|
||||||
return filepath
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _load_messages():
|
def _load_messages():
|
||||||
|
"""Load messages from persistent storage."""
|
||||||
global _messages
|
global _messages
|
||||||
if os.path.exists(LXMF_MESSAGES_FILE):
|
if os.path.exists(LXMF_MESSAGES_FILE):
|
||||||
try:
|
try:
|
||||||
|
|
@ -525,6 +195,7 @@ def _load_messages():
|
||||||
|
|
||||||
|
|
||||||
def _save_messages():
|
def _save_messages():
|
||||||
|
"""Persist messages to storage (call with _lock held)."""
|
||||||
try:
|
try:
|
||||||
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
|
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
|
||||||
with open(LXMF_MESSAGES_FILE, "w") as f:
|
with open(LXMF_MESSAGES_FILE, "w") as f:
|
||||||
|
|
|
||||||
214
app/main.py
214
app/main.py
|
|
@ -1,19 +1,18 @@
|
||||||
"""
|
"""
|
||||||
rMesh Bridge — Unified REST + WebSocket API for Reticulum (Internet backbone),
|
rMesh Bridge — FastAPI service exposing Reticulum (Internet backbone)
|
||||||
MeshCore (LoRa mesh), LXMF messaging, audio calls, and NomadNet browsing.
|
and MeshCore (LoRa mesh) as a unified REST API.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect
|
from fastapi import FastAPI, HTTPException, Header
|
||||||
from fastapi.responses import FileResponse
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Annotated, Optional
|
from typing import Annotated
|
||||||
|
|
||||||
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge, websocket_manager, rchats_bridge
|
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge
|
||||||
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED, RCHATS_BRIDGE_ENABLED
|
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED
|
||||||
from .models import (
|
from .models import (
|
||||||
StatusResponse, NodesResponse, TopologyResponse,
|
StatusResponse, NodesResponse, TopologyResponse,
|
||||||
MessageIn, MessageOut, MessagesResponse,
|
MessageIn, MessageOut, MessagesResponse,
|
||||||
|
|
@ -26,22 +25,9 @@ logger = logging.getLogger("rmesh.api")
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
"""Initialize all subsystems on startup."""
|
"""Initialize Reticulum and MeshCore on startup."""
|
||||||
logger.info("Starting rMesh bridge...")
|
logger.info("Starting rMesh bridge...")
|
||||||
|
|
||||||
# Set up WebSocket event loop
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
websocket_manager.set_event_loop(loop)
|
|
||||||
|
|
||||||
# Register WebSocket broadcast as event listener for all subsystems
|
|
||||||
reticulum_bridge.register_event_listener(websocket_manager.on_event)
|
|
||||||
lxmf_bridge.register_event_listener(websocket_manager.on_event)
|
|
||||||
|
|
||||||
# Register rChats bridge as event listener
|
|
||||||
if RCHATS_BRIDGE_ENABLED:
|
|
||||||
lxmf_bridge.register_event_listener(rchats_bridge.on_lxmf_event)
|
|
||||||
rchats_bridge.init()
|
|
||||||
|
|
||||||
# Reticulum (Internet backbone)
|
# Reticulum (Internet backbone)
|
||||||
reticulum_bridge.init()
|
reticulum_bridge.init()
|
||||||
from .reticulum_bridge import _identity
|
from .reticulum_bridge import _identity
|
||||||
|
|
@ -50,7 +36,6 @@ async def lifespan(app: FastAPI):
|
||||||
else:
|
else:
|
||||||
logger.error("No Reticulum identity available for LXMF")
|
logger.error("No Reticulum identity available for LXMF")
|
||||||
reticulum_bridge.announce()
|
reticulum_bridge.announce()
|
||||||
reticulum_bridge.announce_call_capability()
|
|
||||||
|
|
||||||
# MeshCore (LoRa mesh)
|
# MeshCore (LoRa mesh)
|
||||||
if MESHCORE_ENABLED:
|
if MESHCORE_ENABLED:
|
||||||
|
|
@ -75,25 +60,6 @@ def verify_api_key(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
raise HTTPException(status_code=401, detail="Invalid API key")
|
raise HTTPException(status_code=401, detail="Invalid API key")
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# WebSocket (real-time events)
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
@app.websocket("/ws")
|
|
||||||
async def websocket_endpoint(websocket: WebSocket):
|
|
||||||
await websocket_manager.connect(websocket)
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
data = await websocket.receive_text()
|
|
||||||
# Handle ping/pong
|
|
||||||
if data == "ping":
|
|
||||||
await websocket.send_text('{"type":"pong"}')
|
|
||||||
except WebSocketDisconnect:
|
|
||||||
websocket_manager.disconnect(websocket)
|
|
||||||
except Exception:
|
|
||||||
websocket_manager.disconnect(websocket)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
# Health & Combined Status
|
# Health & Combined Status
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
@ -106,18 +72,15 @@ async def health():
|
||||||
@app.get("/api/status")
|
@app.get("/api/status")
|
||||||
async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return {
|
result = {
|
||||||
"reticulum": reticulum_bridge.get_status(),
|
"reticulum": reticulum_bridge.get_status(),
|
||||||
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {
|
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0},
|
||||||
"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0
|
|
||||||
},
|
|
||||||
"propagation": lxmf_bridge.get_propagation_status(),
|
|
||||||
"websocket_clients": websocket_manager.client_count(),
|
|
||||||
}
|
}
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
# Reticulum Endpoints
|
# Reticulum Endpoints (Internet backbone)
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
@app.get("/api/reticulum/status", response_model=StatusResponse)
|
@app.get("/api/reticulum/status", response_model=StatusResponse)
|
||||||
|
|
@ -126,14 +89,14 @@ async def reticulum_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
return reticulum_bridge.get_status()
|
return reticulum_bridge.get_status()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/nodes")
|
@app.get("/api/reticulum/nodes", response_model=NodesResponse)
|
||||||
async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
node_list = reticulum_bridge.get_nodes()
|
node_list = reticulum_bridge.get_nodes()
|
||||||
return {"nodes": node_list, "total": len(node_list)}
|
return {"nodes": node_list, "total": len(node_list)}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/topology")
|
@app.get("/api/reticulum/topology", response_model=TopologyResponse)
|
||||||
async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return reticulum_bridge.get_topology()
|
return reticulum_bridge.get_topology()
|
||||||
|
|
@ -151,11 +114,7 @@ async def reticulum_identity(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
return reticulum_bridge.get_identity_info()
|
return reticulum_bridge.get_identity_info()
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
@app.get("/api/reticulum/messages", response_model=MessagesResponse)
|
||||||
# LXMF Messages (with rich fields)
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/messages")
|
|
||||||
async def reticulum_messages(
|
async def reticulum_messages(
|
||||||
x_bridge_api_key: Annotated[str, Header()] = "",
|
x_bridge_api_key: Annotated[str, Header()] = "",
|
||||||
limit: int = 100, offset: int = 0,
|
limit: int = 100, offset: int = 0,
|
||||||
|
|
@ -165,7 +124,7 @@ async def reticulum_messages(
|
||||||
return {"messages": msgs, "total": lxmf_bridge.get_total_count()}
|
return {"messages": msgs, "total": lxmf_bridge.get_total_count()}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/reticulum/messages")
|
@app.post("/api/reticulum/messages", response_model=MessageOut)
|
||||||
async def reticulum_send_message(
|
async def reticulum_send_message(
|
||||||
body: MessageIn,
|
body: MessageIn,
|
||||||
x_bridge_api_key: Annotated[str, Header()] = "",
|
x_bridge_api_key: Annotated[str, Header()] = "",
|
||||||
|
|
@ -181,120 +140,24 @@ async def reticulum_send_message(
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/reticulum/messages/{msg_id}")
|
# Keep old /api/status path as alias for backwards compat
|
||||||
async def reticulum_get_message(msg_id: str, x_bridge_api_key: Annotated[str, Header()] = ""):
|
@app.get("/api/nodes", response_model=NodesResponse)
|
||||||
|
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
msg = lxmf_bridge.get_message(msg_id)
|
node_list = reticulum_bridge.get_nodes()
|
||||||
if msg is None:
|
return {"nodes": node_list, "total": len(node_list)}
|
||||||
raise HTTPException(status_code=404, detail="Message not found")
|
|
||||||
return msg
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/attachments/{filename}")
|
@app.get("/api/topology", response_model=TopologyResponse)
|
||||||
async def get_attachment(filename: str, x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
# Sanitize filename
|
return reticulum_bridge.get_topology()
|
||||||
safe = "".join(c for c in filename if c.isalnum() or c in ".-_")
|
|
||||||
path = lxmf_bridge.get_attachment_path(safe)
|
|
||||||
if path is None:
|
|
||||||
raise HTTPException(status_code=404, detail="Attachment not found")
|
|
||||||
return FileResponse(path)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
@app.get("/api/identity", response_model=IdentityResponse)
|
||||||
# Propagation Node Management
|
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
@app.get("/api/propagation/status")
|
|
||||||
async def propagation_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return lxmf_bridge.get_propagation_status()
|
return reticulum_bridge.get_identity_info()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/propagation/nodes")
|
|
||||||
async def propagation_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
nodes = lxmf_bridge.get_propagation_nodes()
|
|
||||||
return {"nodes": nodes, "total": len(nodes)}
|
|
||||||
|
|
||||||
|
|
||||||
class PropagationNodeSet(BaseModel):
|
|
||||||
destination_hash: str
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/propagation/preferred")
|
|
||||||
async def set_preferred_propagation(body: PropagationNodeSet, x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
ok = lxmf_bridge.set_preferred_propagation_node(body.destination_hash)
|
|
||||||
if not ok:
|
|
||||||
raise HTTPException(status_code=400, detail="Failed to set propagation node")
|
|
||||||
return {"status": "ok", "destination_hash": body.destination_hash}
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/propagation/sync")
|
|
||||||
async def sync_propagation(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
lxmf_bridge.sync_propagation_messages()
|
|
||||||
return {"status": "syncing"}
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Audio Calls
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class CallInitiate(BaseModel):
|
|
||||||
destination_hash: str
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/calls")
|
|
||||||
async def list_calls(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
return {"calls": reticulum_bridge.get_active_calls()}
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/calls")
|
|
||||||
async def initiate_call(body: CallInitiate, x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
result = reticulum_bridge.initiate_call(body.destination_hash)
|
|
||||||
if not result.get("call_id"):
|
|
||||||
raise HTTPException(status_code=400, detail=result.get("error", "Call failed"))
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
class CallHangup(BaseModel):
|
|
||||||
call_id: str
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/calls/hangup")
|
|
||||||
async def hangup_call(body: CallHangup, x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
reticulum_bridge.hangup_call(body.call_id)
|
|
||||||
return {"status": "ok"}
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# NomadNet Node Browser
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
@app.get("/api/nomadnet/nodes")
|
|
||||||
async def nomadnet_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
nodes = reticulum_bridge.get_nomadnet_nodes()
|
|
||||||
return {"nodes": nodes, "total": len(nodes)}
|
|
||||||
|
|
||||||
|
|
||||||
class NomadNetBrowse(BaseModel):
|
|
||||||
destination_hash: str
|
|
||||||
path: str = "/"
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/nomadnet/browse")
|
|
||||||
async def nomadnet_browse(body: NomadNetBrowse, x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
result = reticulum_bridge.browse_nomadnet_node(body.destination_hash, body.path)
|
|
||||||
if "error" in result:
|
|
||||||
raise HTTPException(status_code=400, detail=result["error"])
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
@ -375,26 +238,3 @@ async def meshcore_advert(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""):
|
async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||||
verify_api_key(x_bridge_api_key)
|
verify_api_key(x_bridge_api_key)
|
||||||
return await meshcore_bridge.get_device_stats()
|
return await meshcore_bridge.get_device_stats()
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Backwards Compatibility Aliases
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
@app.get("/api/nodes")
|
|
||||||
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
node_list = reticulum_bridge.get_nodes()
|
|
||||||
return {"nodes": node_list, "total": len(node_list)}
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/topology")
|
|
||||||
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
return reticulum_bridge.get_topology()
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/identity")
|
|
||||||
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|
||||||
verify_api_key(x_bridge_api_key)
|
|
||||||
return reticulum_bridge.get_identity_info()
|
|
||||||
|
|
|
||||||
|
|
@ -1,79 +0,0 @@
|
||||||
"""
|
|
||||||
rChats bridge — relays LXMF and MeshCore messages to/from rChats.
|
|
||||||
Messages arriving over mesh are forwarded to rChats internal API.
|
|
||||||
Messages from rChats can be sent out over mesh.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
|
|
||||||
from .config import RCHATS_BRIDGE_ENABLED, RCHATS_INTERNAL_URL
|
|
||||||
|
|
||||||
logger = logging.getLogger("rmesh.rchats_bridge")
|
|
||||||
|
|
||||||
|
|
||||||
def init():
|
|
||||||
"""Initialize the rChats bridge."""
|
|
||||||
if not RCHATS_BRIDGE_ENABLED:
|
|
||||||
logger.info("rChats bridge disabled")
|
|
||||||
return
|
|
||||||
logger.info("rChats bridge enabled — forwarding to %s", RCHATS_INTERNAL_URL)
|
|
||||||
|
|
||||||
|
|
||||||
def on_lxmf_event(event: dict):
|
|
||||||
"""Handle LXMF events and forward relevant ones to rChats."""
|
|
||||||
if not RCHATS_BRIDGE_ENABLED:
|
|
||||||
return
|
|
||||||
|
|
||||||
event_type = event.get("type", "")
|
|
||||||
|
|
||||||
if event_type == "lxmf.delivery":
|
|
||||||
msg = event.get("message", {})
|
|
||||||
_forward_to_rchats(msg, source="reticulum")
|
|
||||||
|
|
||||||
elif event_type == "meshcore.message":
|
|
||||||
msg = event.get("message", {})
|
|
||||||
_forward_to_rchats(msg, source="meshcore")
|
|
||||||
|
|
||||||
|
|
||||||
def _forward_to_rchats(msg: dict, source: str):
|
|
||||||
"""Forward a mesh message to rChats internal API."""
|
|
||||||
space_slug = msg.get("space_slug")
|
|
||||||
if not space_slug:
|
|
||||||
return # Only forward space-scoped messages
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
"source": source,
|
|
||||||
"sender": msg.get("sender_hash", "") or msg.get("sender", ""),
|
|
||||||
"content": msg.get("content", ""),
|
|
||||||
"title": msg.get("title", ""),
|
|
||||||
"has_image": "image" in msg.get("fields", {}),
|
|
||||||
"has_audio": "audio" in msg.get("fields", {}),
|
|
||||||
"has_files": "file_attachments" in msg.get("fields", {}),
|
|
||||||
"space": space_slug,
|
|
||||||
"timestamp": msg.get("timestamp", time.time()),
|
|
||||||
}
|
|
||||||
|
|
||||||
threading.Thread(
|
|
||||||
target=_post_to_rchats, args=(space_slug, payload), daemon=True
|
|
||||||
).start()
|
|
||||||
|
|
||||||
|
|
||||||
def _post_to_rchats(space_slug: str, payload: dict):
|
|
||||||
"""POST message to rChats internal mesh-relay endpoint."""
|
|
||||||
try:
|
|
||||||
url = f"{RCHATS_INTERNAL_URL}/api/internal/mesh-relay"
|
|
||||||
with httpx.Client(timeout=10) as client:
|
|
||||||
response = client.post(url, json=payload)
|
|
||||||
if response.status_code < 400:
|
|
||||||
logger.info("Forwarded mesh message to rChats space '%s'", space_slug)
|
|
||||||
else:
|
|
||||||
logger.warning("rChats rejected message: %d %s", response.status_code, response.text[:100])
|
|
||||||
except httpx.ConnectError:
|
|
||||||
logger.debug("rChats not reachable at %s", RCHATS_INTERNAL_URL)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to forward to rChats: %s", e)
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
"""
|
"""
|
||||||
Reticulum bridge — singleton RNS instance, identity management,
|
Reticulum bridge — singleton RNS instance, identity management, topology queries.
|
||||||
topology queries, signal quality tracking, audio call support.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
@ -15,44 +14,17 @@ logger = logging.getLogger("rmesh.reticulum")
|
||||||
_rns_instance: RNS.Reticulum | None = None
|
_rns_instance: RNS.Reticulum | None = None
|
||||||
_identity: RNS.Identity | None = None
|
_identity: RNS.Identity | None = None
|
||||||
_destination: RNS.Destination | None = None
|
_destination: RNS.Destination | None = None
|
||||||
_call_destination: RNS.Destination | None = None
|
|
||||||
_start_time: float = 0
|
_start_time: float = 0
|
||||||
_announced_destinations: dict[str, dict] = {}
|
_announced_destinations: dict[str, dict] = {}
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
_event_listeners: list = []
|
|
||||||
|
|
||||||
APP_NAME = "rmesh"
|
APP_NAME = "rmesh"
|
||||||
ASPECT = "bridge"
|
ASPECT = "bridge"
|
||||||
|
|
||||||
|
|
||||||
def _safe_decode(data) -> str | None:
|
|
||||||
"""Safely decode app_data which may be binary."""
|
|
||||||
if data is None:
|
|
||||||
return None
|
|
||||||
if isinstance(data, bytes):
|
|
||||||
try:
|
|
||||||
return data.decode("utf-8")
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
return data.hex()
|
|
||||||
return str(data)
|
|
||||||
|
|
||||||
|
|
||||||
def register_event_listener(callback):
|
|
||||||
_event_listeners.append(callback)
|
|
||||||
|
|
||||||
|
|
||||||
def _emit_event(event_type: str, data: dict):
|
|
||||||
event = {"type": event_type, **data}
|
|
||||||
for listener in _event_listeners:
|
|
||||||
try:
|
|
||||||
listener(event)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def init():
|
def init():
|
||||||
"""Initialize the Reticulum instance and server identity."""
|
"""Initialize the Reticulum instance and server identity."""
|
||||||
global _rns_instance, _identity, _destination, _call_destination, _start_time
|
global _rns_instance, _identity, _destination, _start_time
|
||||||
|
|
||||||
if _rns_instance is not None:
|
if _rns_instance is not None:
|
||||||
return
|
return
|
||||||
|
|
@ -71,275 +43,30 @@ def init():
|
||||||
_identity.to_file(identity_path)
|
_identity.to_file(identity_path)
|
||||||
logger.info("Created new identity")
|
logger.info("Created new identity")
|
||||||
|
|
||||||
# Bridge destination
|
# Create a destination for this bridge
|
||||||
_destination = RNS.Destination(
|
_destination = RNS.Destination(
|
||||||
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||||
APP_NAME, ASPECT,
|
APP_NAME, ASPECT,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Audio call destination
|
# Register announce handler to track network
|
||||||
_call_destination = RNS.Destination(
|
RNS.Transport.register_announce_handler(_announce_handler)
|
||||||
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
|
||||||
"call", "audio",
|
|
||||||
)
|
|
||||||
_call_destination.set_link_established_callback(_on_incoming_call_link)
|
|
||||||
|
|
||||||
# Register announce handlers with signal quality
|
|
||||||
RNS.Transport.register_announce_handler(_AnnounceHandler())
|
|
||||||
|
|
||||||
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
|
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
|
||||||
|
|
||||||
|
|
||||||
class _AnnounceHandler:
|
def _announce_handler(destination_hash, announced_identity, app_data):
|
||||||
"""Track announces with RSSI/SNR/quality metrics."""
|
"""Callback when we hear an announce from another node."""
|
||||||
aspect_filter = None # Catch all announces
|
with _lock:
|
||||||
|
_announced_destinations[destination_hash.hex()] = {
|
||||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
"destination_hash": destination_hash.hex(),
|
||||||
# Extract signal quality from the announce packet
|
"app_data": app_data.decode("utf-8") if app_data else None,
|
||||||
rssi = None
|
|
||||||
snr = None
|
|
||||||
quality = None
|
|
||||||
|
|
||||||
# Try to get signal info from the packet
|
|
||||||
try:
|
|
||||||
if announce_packet_hash:
|
|
||||||
packet = RNS.Transport.packet_hashmap.get(announce_packet_hash)
|
|
||||||
if packet:
|
|
||||||
rssi = getattr(packet, "rssi", None)
|
|
||||||
snr = getattr(packet, "snr", None)
|
|
||||||
quality = getattr(packet, "q", None) or getattr(packet, "quality", None)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
dest_hex = destination_hash.hex()
|
|
||||||
node_data = {
|
|
||||||
"destination_hash": dest_hex,
|
|
||||||
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
|
||||||
"app_data": _safe_decode(app_data),
|
|
||||||
"last_heard": time.time(),
|
|
||||||
"rssi": rssi,
|
|
||||||
"snr": snr,
|
|
||||||
"quality": quality,
|
|
||||||
}
|
|
||||||
|
|
||||||
with _lock:
|
|
||||||
_announced_destinations[dest_hex] = node_data
|
|
||||||
|
|
||||||
_emit_event("announce", {"node": node_data})
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Audio Calls over Reticulum Links
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
_active_calls: dict[str, dict] = {}
|
|
||||||
|
|
||||||
|
|
||||||
def _on_incoming_call_link(link):
|
|
||||||
"""Handle incoming audio call link."""
|
|
||||||
call_id = str(link.hash.hex()) if hasattr(link, "hash") else str(id(link))
|
|
||||||
_active_calls[call_id] = {
|
|
||||||
"id": call_id,
|
|
||||||
"link": link,
|
|
||||||
"direction": "inbound",
|
|
||||||
"started_at": time.time(),
|
|
||||||
"peer_hash": link.destination.hexhash if hasattr(link.destination, "hexhash") else "",
|
|
||||||
}
|
|
||||||
|
|
||||||
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
|
|
||||||
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
|
|
||||||
|
|
||||||
logger.info("Incoming audio call: %s", call_id[:16])
|
|
||||||
_emit_event("call.incoming", {"call_id": call_id, "peer": _active_calls[call_id]["peer_hash"]})
|
|
||||||
|
|
||||||
|
|
||||||
def initiate_call(destination_hash_hex: str) -> dict:
|
|
||||||
"""Initiate an audio call to a destination."""
|
|
||||||
if _identity is None:
|
|
||||||
return {"call_id": "", "error": "No identity"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
dest_hash = bytes.fromhex(destination_hash_hex)
|
|
||||||
dest_identity = RNS.Identity.recall(dest_hash)
|
|
||||||
if dest_identity is None:
|
|
||||||
RNS.Transport.request_path(dest_hash)
|
|
||||||
return {"call_id": "", "error": "Peer not found, path requested"}
|
|
||||||
|
|
||||||
dest = RNS.Destination(
|
|
||||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
|
||||||
"call", "audio",
|
|
||||||
)
|
|
||||||
|
|
||||||
link = RNS.Link(dest)
|
|
||||||
call_id = str(id(link))
|
|
||||||
|
|
||||||
_active_calls[call_id] = {
|
|
||||||
"id": call_id,
|
|
||||||
"link": link,
|
|
||||||
"direction": "outbound",
|
|
||||||
"started_at": time.time(),
|
|
||||||
"peer_hash": destination_hash_hex,
|
|
||||||
}
|
|
||||||
|
|
||||||
link.set_link_established_callback(lambda l: _emit_event("call.established", {"call_id": call_id}))
|
|
||||||
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
|
|
||||||
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
|
|
||||||
|
|
||||||
return {"call_id": call_id, "peer": destination_hash_hex}
|
|
||||||
except Exception as e:
|
|
||||||
return {"call_id": "", "error": str(e)}
|
|
||||||
|
|
||||||
|
|
||||||
def send_call_audio(call_id: str, audio_data: bytes) -> bool:
|
|
||||||
"""Send audio data over an active call link."""
|
|
||||||
call = _active_calls.get(call_id)
|
|
||||||
if not call or not call["link"]:
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
link = call["link"]
|
|
||||||
if link.status == RNS.Link.ACTIVE and len(audio_data) <= RNS.Link.MDU:
|
|
||||||
RNS.Packet(link, audio_data).send()
|
|
||||||
return True
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def hangup_call(call_id: str):
|
|
||||||
"""End an active call."""
|
|
||||||
call = _active_calls.pop(call_id, None)
|
|
||||||
if call and call["link"]:
|
|
||||||
try:
|
|
||||||
call["link"].teardown()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
_emit_event("call.ended", {"call_id": call_id})
|
|
||||||
|
|
||||||
|
|
||||||
def get_active_calls() -> list[dict]:
|
|
||||||
"""Return active calls (without link objects)."""
|
|
||||||
return [{
|
|
||||||
"id": c["id"],
|
|
||||||
"direction": c["direction"],
|
|
||||||
"started_at": c["started_at"],
|
|
||||||
"peer_hash": c["peer_hash"],
|
|
||||||
"duration": time.time() - c["started_at"],
|
|
||||||
} for c in _active_calls.values()]
|
|
||||||
|
|
||||||
|
|
||||||
def _on_call_audio_packet(call_id: str, audio_data):
|
|
||||||
"""Handle received audio data from a call."""
|
|
||||||
_emit_event("call.audio", {"call_id": call_id, "size": len(audio_data) if audio_data else 0})
|
|
||||||
|
|
||||||
|
|
||||||
def _on_call_ended(call_id: str):
|
|
||||||
"""Handle call link closure."""
|
|
||||||
_active_calls.pop(call_id, None)
|
|
||||||
logger.info("Call ended: %s", call_id[:16])
|
|
||||||
_emit_event("call.ended", {"call_id": call_id})
|
|
||||||
|
|
||||||
|
|
||||||
def announce_call_capability():
|
|
||||||
"""Announce that this node can receive audio calls."""
|
|
||||||
if _call_destination:
|
|
||||||
_call_destination.announce(app_data=b"rMesh Audio")
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# NomadNet Node Browser
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
_nomadnet_nodes: dict[str, dict] = {}
|
|
||||||
|
|
||||||
|
|
||||||
class _NomadNetAnnounceHandler:
|
|
||||||
"""Track NomadNet node announces."""
|
|
||||||
aspect_filter = "nomadnetwork.node"
|
|
||||||
|
|
||||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
|
||||||
dest_hex = destination_hash.hex()
|
|
||||||
node_name = ""
|
|
||||||
if app_data:
|
|
||||||
try:
|
|
||||||
node_name = app_data.decode("utf-8") if isinstance(app_data, bytes) else str(app_data)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
_nomadnet_nodes[dest_hex] = {
|
|
||||||
"destination_hash": dest_hex,
|
|
||||||
"name": node_name,
|
|
||||||
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
|
||||||
"last_heard": time.time(),
|
"last_heard": time.time(),
|
||||||
}
|
}
|
||||||
_emit_event("nomadnet.announce", {"node": _nomadnet_nodes[dest_hex]})
|
|
||||||
|
|
||||||
|
|
||||||
def get_nomadnet_nodes() -> list[dict]:
|
|
||||||
"""Return discovered NomadNet nodes."""
|
|
||||||
return list(_nomadnet_nodes.values())
|
|
||||||
|
|
||||||
|
|
||||||
def browse_nomadnet_node(destination_hash_hex: str, page_path: str = "/") -> dict:
|
|
||||||
"""Request a page from a NomadNet node. Returns async — result via event."""
|
|
||||||
if _identity is None:
|
|
||||||
return {"error": "No identity"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
dest_hash = bytes.fromhex(destination_hash_hex)
|
|
||||||
dest_identity = RNS.Identity.recall(dest_hash)
|
|
||||||
if dest_identity is None:
|
|
||||||
RNS.Transport.request_path(dest_hash)
|
|
||||||
return {"error": "Node not found, path requested"}
|
|
||||||
|
|
||||||
dest = RNS.Destination(
|
|
||||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
|
||||||
"nomadnetwork", "node",
|
|
||||||
)
|
|
||||||
|
|
||||||
link = RNS.Link(dest)
|
|
||||||
|
|
||||||
def on_established(l):
|
|
||||||
# Request page
|
|
||||||
l.request(
|
|
||||||
page_path.encode("utf-8"),
|
|
||||||
response_callback=lambda receipt: _on_nomadnet_response(destination_hash_hex, page_path, receipt),
|
|
||||||
failed_callback=lambda receipt: _emit_event("nomadnet.page.error", {
|
|
||||||
"node": destination_hash_hex, "path": page_path, "error": "Request failed"
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
link.set_link_established_callback(on_established)
|
|
||||||
return {"status": "requesting", "node": destination_hash_hex, "path": page_path}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
return {"error": str(e)}
|
|
||||||
|
|
||||||
|
|
||||||
def _on_nomadnet_response(node_hash: str, page_path: str, receipt):
|
|
||||||
"""Handle NomadNet page response."""
|
|
||||||
try:
|
|
||||||
response = receipt.response
|
|
||||||
content = response.decode("utf-8") if isinstance(response, bytes) else str(response)
|
|
||||||
_emit_event("nomadnet.page.downloaded", {
|
|
||||||
"node": node_hash,
|
|
||||||
"path": page_path,
|
|
||||||
"content": content,
|
|
||||||
"size": len(content),
|
|
||||||
})
|
|
||||||
except Exception as e:
|
|
||||||
_emit_event("nomadnet.page.error", {
|
|
||||||
"node": node_hash, "path": page_path, "error": str(e)
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Status / Query
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
def get_status() -> dict:
|
def get_status() -> dict:
|
||||||
|
"""Return transport status info."""
|
||||||
if _rns_instance is None:
|
if _rns_instance is None:
|
||||||
return {"online": False, "transport_enabled": False, "identity_hash": "",
|
return {"online": False, "transport_enabled": False, "identity_hash": "",
|
||||||
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
|
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
|
||||||
|
|
@ -354,20 +81,21 @@ def get_status() -> dict:
|
||||||
"uptime_seconds": time.time() - _start_time,
|
"uptime_seconds": time.time() - _start_time,
|
||||||
"announced_count": announced_count,
|
"announced_count": announced_count,
|
||||||
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
|
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
|
||||||
"active_calls": len(_active_calls),
|
|
||||||
"nomadnet_nodes": len(_nomadnet_nodes),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_nodes() -> list[dict]:
|
def get_nodes() -> list[dict]:
|
||||||
|
"""Return list of known announced destinations."""
|
||||||
with _lock:
|
with _lock:
|
||||||
return list(_announced_destinations.values())
|
return list(_announced_destinations.values())
|
||||||
|
|
||||||
|
|
||||||
def get_topology() -> dict:
|
def get_topology() -> dict:
|
||||||
|
"""Return nodes and links for visualization."""
|
||||||
nodes = get_nodes()
|
nodes = get_nodes()
|
||||||
links = []
|
links = []
|
||||||
|
|
||||||
|
# Build links from destinations/path tables if available
|
||||||
dest_table = getattr(RNS.Transport, "destinations", {})
|
dest_table = getattr(RNS.Transport, "destinations", {})
|
||||||
if isinstance(dest_table, (dict, list)):
|
if isinstance(dest_table, (dict, list)):
|
||||||
try:
|
try:
|
||||||
|
|
@ -382,7 +110,7 @@ def get_topology() -> dict:
|
||||||
"active": True,
|
"active": True,
|
||||||
})
|
})
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass # Transport internals may vary between RNS versions
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"nodes": nodes,
|
"nodes": nodes,
|
||||||
|
|
@ -393,6 +121,7 @@ def get_topology() -> dict:
|
||||||
|
|
||||||
|
|
||||||
def get_identity_info() -> dict:
|
def get_identity_info() -> dict:
|
||||||
|
"""Return server identity information."""
|
||||||
if _identity is None:
|
if _identity is None:
|
||||||
return {"identity_hash": "", "public_key_hex": ""}
|
return {"identity_hash": "", "public_key_hex": ""}
|
||||||
return {
|
return {
|
||||||
|
|
@ -402,6 +131,7 @@ def get_identity_info() -> dict:
|
||||||
|
|
||||||
|
|
||||||
def announce():
|
def announce():
|
||||||
|
"""Announce this bridge on the network."""
|
||||||
if _destination is None:
|
if _destination is None:
|
||||||
return {"announced": False, "identity_hash": ""}
|
return {"announced": False, "identity_hash": ""}
|
||||||
_destination.announce(app_data=b"rMesh Bridge")
|
_destination.announce(app_data=b"rMesh Bridge")
|
||||||
|
|
|
||||||
|
|
@ -1,63 +0,0 @@
|
||||||
"""
|
|
||||||
WebSocket manager — broadcasts real-time events to connected clients.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
from typing import Set
|
|
||||||
|
|
||||||
from fastapi import WebSocket
|
|
||||||
|
|
||||||
logger = logging.getLogger("rmesh.websocket")
|
|
||||||
|
|
||||||
_clients: Set[WebSocket] = set()
|
|
||||||
_loop: asyncio.AbstractEventLoop | None = None
|
|
||||||
|
|
||||||
|
|
||||||
def set_event_loop(loop: asyncio.AbstractEventLoop):
|
|
||||||
global _loop
|
|
||||||
_loop = loop
|
|
||||||
|
|
||||||
|
|
||||||
async def connect(websocket: WebSocket):
|
|
||||||
await websocket.accept()
|
|
||||||
_clients.add(websocket)
|
|
||||||
logger.info("WebSocket client connected (%d total)", len(_clients))
|
|
||||||
|
|
||||||
|
|
||||||
def disconnect(websocket: WebSocket):
|
|
||||||
_clients.discard(websocket)
|
|
||||||
logger.info("WebSocket client disconnected (%d remaining)", len(_clients))
|
|
||||||
|
|
||||||
|
|
||||||
async def broadcast(data: dict):
|
|
||||||
"""Send event to all connected WebSocket clients."""
|
|
||||||
if not _clients:
|
|
||||||
return
|
|
||||||
|
|
||||||
message = json.dumps(data)
|
|
||||||
dead = set()
|
|
||||||
for client in _clients:
|
|
||||||
try:
|
|
||||||
await client.send_text(message)
|
|
||||||
except Exception:
|
|
||||||
dead.add(client)
|
|
||||||
|
|
||||||
for client in dead:
|
|
||||||
_clients.discard(client)
|
|
||||||
|
|
||||||
|
|
||||||
def on_event(event: dict):
|
|
||||||
"""Synchronous callback that schedules broadcast on the event loop.
|
|
||||||
Use this as the event_listener for reticulum_bridge and lxmf_bridge."""
|
|
||||||
if _loop is None or not _clients:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
asyncio.run_coroutine_threadsafe(broadcast(event), _loop)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def client_count() -> int:
|
|
||||||
return len(_clients)
|
|
||||||
|
|
@ -4,5 +4,3 @@ fastapi>=0.115.0
|
||||||
uvicorn[standard]>=0.30.0
|
uvicorn[standard]>=0.30.0
|
||||||
pydantic>=2.0
|
pydantic>=2.0
|
||||||
meshcore>=2.3.0
|
meshcore>=2.3.0
|
||||||
httpx>=0.27.0
|
|
||||||
websockets>=12.0
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue