Compare commits
1 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
b078d6896e |
|
|
@ -4,12 +4,23 @@ BRIDGE_API_KEY = os.environ.get("BRIDGE_API_KEY", "")
|
|||
RNS_CONFIG_DIR = os.environ.get("RNS_CONFIG_DIR", "/data/reticulum")
|
||||
LXMF_STORAGE_DIR = os.environ.get("LXMF_STORAGE_DIR", "/data/lxmf")
|
||||
LXMF_MESSAGES_FILE = os.path.join(LXMF_STORAGE_DIR, "bridge_messages.json")
|
||||
LXMF_ATTACHMENTS_DIR = os.path.join(LXMF_STORAGE_DIR, "attachments")
|
||||
LOG_LEVEL = int(os.environ.get("RNS_LOG_LEVEL", "4"))
|
||||
|
||||
# MeshCore companion node connection
|
||||
MESHCORE_ENABLED = os.environ.get("MESHCORE_ENABLED", "false").lower() == "true"
|
||||
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "") # TCP host (WiFi companion)
|
||||
MESHCORE_HOST = os.environ.get("MESHCORE_HOST", "")
|
||||
MESHCORE_PORT = int(os.environ.get("MESHCORE_PORT", "5000"))
|
||||
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "") # Serial port (USB companion)
|
||||
MESHCORE_SERIAL = os.environ.get("MESHCORE_SERIAL", "")
|
||||
MESHCORE_DATA_DIR = os.environ.get("MESHCORE_DATA_DIR", "/data/meshcore")
|
||||
MESHCORE_MESSAGES_FILE = os.path.join(MESHCORE_DATA_DIR, "messages.json")
|
||||
|
||||
# Feature flags
|
||||
AUTO_RESEND_ON_ANNOUNCE = os.environ.get("AUTO_RESEND_ON_ANNOUNCE", "true").lower() == "true"
|
||||
LOCAL_PROPAGATION_NODE = os.environ.get("LOCAL_PROPAGATION_NODE", "true").lower() == "true"
|
||||
PREFERRED_PROPAGATION_NODE = os.environ.get("PREFERRED_PROPAGATION_NODE", "")
|
||||
PROPAGATION_SYNC_INTERVAL = int(os.environ.get("PROPAGATION_SYNC_INTERVAL", "300"))
|
||||
|
||||
# rChats bridge
|
||||
RCHATS_BRIDGE_ENABLED = os.environ.get("RCHATS_BRIDGE_ENABLED", "false").lower() == "true"
|
||||
RCHATS_INTERNAL_URL = os.environ.get("RCHATS_INTERNAL_URL", "http://rchats:3000")
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
"""
|
||||
LXMF bridge — message send/receive via the LXMF protocol on Reticulum.
|
||||
LXMF bridge — rich message handling, propagation node management,
|
||||
auto-resend on announce, file/image/audio attachments.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
|
|
@ -12,7 +14,11 @@ import os
|
|||
import RNS
|
||||
import LXMF
|
||||
|
||||
from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE
|
||||
from .config import (
|
||||
LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE, LXMF_ATTACHMENTS_DIR,
|
||||
AUTO_RESEND_ON_ANNOUNCE, LOCAL_PROPAGATION_NODE,
|
||||
PREFERRED_PROPAGATION_NODE, PROPAGATION_SYNC_INTERVAL,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("rmesh.lxmf")
|
||||
|
||||
|
|
@ -21,6 +27,9 @@ _local_delivery_destination: RNS.Destination | None = None
|
|||
_identity: RNS.Identity | None = None
|
||||
_messages: list[dict] = []
|
||||
_lock = threading.Lock()
|
||||
_event_listeners: list = [] # WebSocket broadcast callbacks
|
||||
_propagation_nodes: dict[str, dict] = {}
|
||||
_last_propagation_sync: float = 0
|
||||
|
||||
|
||||
def init(identity: RNS.Identity):
|
||||
|
|
@ -30,16 +39,30 @@ def init(identity: RNS.Identity):
|
|||
_identity = identity
|
||||
|
||||
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
|
||||
os.makedirs(LXMF_ATTACHMENTS_DIR, exist_ok=True)
|
||||
|
||||
_router = LXMF.LXMRouter(
|
||||
identity=_identity,
|
||||
storagepath=LXMF_STORAGE_DIR,
|
||||
)
|
||||
|
||||
# Enable propagation node
|
||||
_router.enable_propagation()
|
||||
# Increase transfer limit for attachments (10 MB)
|
||||
_router.delivery_per_transfer_limit = 10000
|
||||
|
||||
# Register local delivery destination for receiving messages
|
||||
# Enable local propagation node
|
||||
if LOCAL_PROPAGATION_NODE:
|
||||
_router.enable_propagation()
|
||||
logger.info("Local LXMF propagation node enabled")
|
||||
|
||||
# Set preferred propagation node
|
||||
if PREFERRED_PROPAGATION_NODE:
|
||||
try:
|
||||
_router.set_outbound_propagation_node(bytes.fromhex(PREFERRED_PROPAGATION_NODE))
|
||||
logger.info("Set preferred propagation node: %s", PREFERRED_PROPAGATION_NODE[:16])
|
||||
except Exception as e:
|
||||
logger.warning("Failed to set propagation node: %s", e)
|
||||
|
||||
# Register local delivery
|
||||
_local_delivery_destination = _router.register_delivery_identity(
|
||||
_identity,
|
||||
display_name="rMesh Bridge",
|
||||
|
|
@ -47,14 +70,152 @@ def init(identity: RNS.Identity):
|
|||
|
||||
_router.register_delivery_callback(_delivery_callback)
|
||||
|
||||
# Load persisted messages
|
||||
# Register announce handlers for auto-resend and propagation tracking
|
||||
RNS.Transport.register_announce_handler(_LxmfAnnounceHandler())
|
||||
RNS.Transport.register_announce_handler(_PropagationAnnounceHandler())
|
||||
|
||||
_load_messages()
|
||||
|
||||
# Start propagation sync thread
|
||||
if PROPAGATION_SYNC_INTERVAL > 0:
|
||||
sync_thread = threading.Thread(target=_propagation_sync_loop, daemon=True)
|
||||
sync_thread.start()
|
||||
|
||||
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
|
||||
|
||||
|
||||
def register_event_listener(callback):
|
||||
"""Register a callback for real-time events (WebSocket broadcast)."""
|
||||
_event_listeners.append(callback)
|
||||
|
||||
|
||||
def _emit_event(event_type: str, data: dict):
|
||||
"""Emit event to all registered listeners."""
|
||||
event = {"type": event_type, **data}
|
||||
for listener in _event_listeners:
|
||||
try:
|
||||
listener(event)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# LXMF Rich Field Parsing
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
def _parse_lxmf_fields(lxmf_message) -> dict:
|
||||
"""Extract all LXMF fields from a message into a serializable dict."""
|
||||
fields = {}
|
||||
|
||||
try:
|
||||
lxmf_fields = lxmf_message.get_fields()
|
||||
except Exception:
|
||||
return fields
|
||||
|
||||
# Image attachment
|
||||
if LXMF.FIELD_IMAGE in lxmf_fields:
|
||||
try:
|
||||
image_data = lxmf_fields[LXMF.FIELD_IMAGE]
|
||||
image_type = image_data[0] if len(image_data) > 0 else "unknown"
|
||||
image_bytes = image_data[1] if len(image_data) > 1 else b""
|
||||
if image_bytes:
|
||||
attachment_id = str(uuid.uuid4())
|
||||
ext = "jpg" if "jpeg" in str(image_type).lower() or "jpg" in str(image_type).lower() else "png"
|
||||
filename = f"{attachment_id}.{ext}"
|
||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes))
|
||||
fields["image"] = {
|
||||
"type": str(image_type),
|
||||
"filename": filename,
|
||||
"size": len(image_bytes),
|
||||
"base64": base64.b64encode(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)).decode("ascii")[:200] + "...",
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("Failed to parse image field: %s", e)
|
||||
|
||||
# File attachments
|
||||
if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields:
|
||||
try:
|
||||
file_list = lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS]
|
||||
attachments = []
|
||||
for file_data in file_list:
|
||||
file_name = file_data[0] if len(file_data) > 0 else "unknown"
|
||||
file_bytes = file_data[1] if len(file_data) > 1 else b""
|
||||
if file_bytes:
|
||||
safe_name = "".join(c for c in str(file_name) if c.isalnum() or c in ".-_")
|
||||
attachment_id = str(uuid.uuid4())
|
||||
stored_name = f"{attachment_id}_{safe_name}"
|
||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, stored_name)
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(file_bytes if isinstance(file_bytes, bytes) else bytes(file_bytes))
|
||||
attachments.append({
|
||||
"name": str(file_name),
|
||||
"stored_name": stored_name,
|
||||
"size": len(file_bytes),
|
||||
})
|
||||
if attachments:
|
||||
fields["file_attachments"] = attachments
|
||||
except Exception as e:
|
||||
logger.warning("Failed to parse file attachments: %s", e)
|
||||
|
||||
# Audio message (Codec2)
|
||||
if LXMF.FIELD_AUDIO in lxmf_fields:
|
||||
try:
|
||||
audio_data = lxmf_fields[LXMF.FIELD_AUDIO]
|
||||
audio_mode = audio_data[0] if len(audio_data) > 0 else 0
|
||||
audio_bytes = audio_data[1] if len(audio_data) > 1 else b""
|
||||
if audio_bytes:
|
||||
attachment_id = str(uuid.uuid4())
|
||||
filename = f"{attachment_id}.c2"
|
||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(audio_bytes if isinstance(audio_bytes, bytes) else bytes(audio_bytes))
|
||||
fields["audio"] = {
|
||||
"codec": "codec2",
|
||||
"mode": audio_mode,
|
||||
"filename": filename,
|
||||
"size": len(audio_bytes),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("Failed to parse audio field: %s", e)
|
||||
|
||||
# Icon appearance
|
||||
if LXMF.FIELD_ICON_APPEARANCE in lxmf_fields:
|
||||
try:
|
||||
icon_data = lxmf_fields[LXMF.FIELD_ICON_APPEARANCE]
|
||||
fields["icon"] = {
|
||||
"name": str(icon_data[0]) if len(icon_data) > 0 else "",
|
||||
"foreground": "#" + icon_data[1].hex() if len(icon_data) > 1 and hasattr(icon_data[1], "hex") else "",
|
||||
"background": "#" + icon_data[2].hex() if len(icon_data) > 2 and hasattr(icon_data[2], "hex") else "",
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("Failed to parse icon field: %s", e)
|
||||
|
||||
# Commands
|
||||
if LXMF.FIELD_COMMANDS in lxmf_fields:
|
||||
try:
|
||||
fields["commands"] = [str(cmd) for cmd in lxmf_fields[LXMF.FIELD_COMMANDS]]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return fields
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Message Handling
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
def _delivery_callback(message):
|
||||
"""Handle incoming LXMF message."""
|
||||
"""Handle incoming LXMF message with rich field extraction."""
|
||||
# Parse rich fields
|
||||
fields = _parse_lxmf_fields(message)
|
||||
|
||||
# Get signal quality if available
|
||||
rssi = getattr(message, "rssi", None)
|
||||
snr = getattr(message, "snr", None)
|
||||
quality = getattr(message, "quality", None)
|
||||
|
||||
msg_record = {
|
||||
"id": str(uuid.uuid4()),
|
||||
"direction": "inbound",
|
||||
|
|
@ -62,6 +223,10 @@ def _delivery_callback(message):
|
|||
"recipient_hash": _identity.hexhash if _identity else "",
|
||||
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
|
||||
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
|
||||
"fields": fields,
|
||||
"rssi": rssi,
|
||||
"snr": snr,
|
||||
"quality": quality,
|
||||
"status": "delivered",
|
||||
"timestamp": time.time(),
|
||||
}
|
||||
|
|
@ -70,11 +235,19 @@ def _delivery_callback(message):
|
|||
_messages.append(msg_record)
|
||||
_save_messages()
|
||||
|
||||
logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])
|
||||
logger.info("Received LXMF message from %s (fields: %s)",
|
||||
msg_record["sender_hash"][:16],
|
||||
list(fields.keys()) if fields else "none")
|
||||
|
||||
_emit_event("lxmf.delivery", {"message": msg_record})
|
||||
|
||||
|
||||
def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
|
||||
"""Send an LXMF message to a destination hash."""
|
||||
def send_message(destination_hash_hex: str, content: str, title: str = "",
|
||||
image_bytes: bytes = None, image_type: str = "image/jpeg",
|
||||
file_attachments: list = None,
|
||||
audio_bytes: bytes = None, audio_mode: int = 0,
|
||||
try_propagation_on_fail: bool = True) -> dict:
|
||||
"""Send an LXMF message with optional attachments."""
|
||||
if _router is None or _identity is None:
|
||||
return {"id": "", "status": "failed"}
|
||||
|
||||
|
|
@ -83,72 +256,82 @@ def send_message(destination_hash_hex: str, content: str, title: str = "") -> di
|
|||
except ValueError:
|
||||
return {"id": "", "status": "failed"}
|
||||
|
||||
# Look up identity for destination
|
||||
dest_identity = RNS.Identity.recall(dest_hash)
|
||||
|
||||
msg_id = str(uuid.uuid4())
|
||||
|
||||
if dest_identity is None:
|
||||
# Request path first, the message may be deliverable later
|
||||
RNS.Transport.request_path(dest_hash)
|
||||
msg_record = {
|
||||
"id": msg_id,
|
||||
"direction": "outbound",
|
||||
"sender_hash": _identity.hexhash,
|
||||
"recipient_hash": destination_hash_hex,
|
||||
"title": title,
|
||||
"content": content,
|
||||
"status": "pending",
|
||||
"timestamp": time.time(),
|
||||
"id": msg_id, "direction": "outbound",
|
||||
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
|
||||
"title": title, "content": content, "fields": {},
|
||||
"status": "pending", "timestamp": time.time(),
|
||||
}
|
||||
with _lock:
|
||||
_messages.append(msg_record)
|
||||
_save_messages()
|
||||
return msg_record
|
||||
|
||||
# Create destination for the recipient
|
||||
dest = RNS.Destination(
|
||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||
"lxmf", "delivery",
|
||||
)
|
||||
|
||||
lxm = LXMF.LXMessage(
|
||||
dest,
|
||||
_router.get_delivery_destination(),
|
||||
dest, _router.get_delivery_destination(),
|
||||
content.encode("utf-8") if isinstance(content, str) else content,
|
||||
title=title.encode("utf-8") if isinstance(title, str) else title,
|
||||
desired_method=LXMF.LXMessage.DIRECT,
|
||||
)
|
||||
|
||||
# Add rich fields
|
||||
if image_bytes:
|
||||
lxm.fields[LXMF.FIELD_IMAGE] = [image_type, image_bytes]
|
||||
if file_attachments:
|
||||
lxm.fields[LXMF.FIELD_FILE_ATTACHMENTS] = [
|
||||
[att["name"], att["bytes"]] for att in file_attachments
|
||||
]
|
||||
if audio_bytes:
|
||||
lxm.fields[LXMF.FIELD_AUDIO] = [audio_mode, audio_bytes]
|
||||
|
||||
# Enable propagation fallback
|
||||
lxm.try_propagation_on_fail = try_propagation_on_fail
|
||||
|
||||
msg_record = {
|
||||
"id": msg_id,
|
||||
"direction": "outbound",
|
||||
"sender_hash": _identity.hexhash,
|
||||
"recipient_hash": destination_hash_hex,
|
||||
"title": title,
|
||||
"content": content,
|
||||
"status": "pending",
|
||||
"timestamp": time.time(),
|
||||
"id": msg_id, "direction": "outbound",
|
||||
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
|
||||
"title": title, "content": content, "fields": {},
|
||||
"status": "pending", "timestamp": time.time(),
|
||||
}
|
||||
|
||||
def _delivery_callback_outbound(message):
|
||||
def _on_delivered(message):
|
||||
with _lock:
|
||||
for m in _messages:
|
||||
if m["id"] == msg_id:
|
||||
m["status"] = "delivered"
|
||||
break
|
||||
_save_messages()
|
||||
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "delivered"})
|
||||
|
||||
def _on_failed(message):
|
||||
# Try propagation node if direct failed
|
||||
if getattr(message, "try_propagation_on_fail", False) and _router.outbound_propagation_node:
|
||||
logger.info("Direct delivery failed, trying propagation node for %s", destination_hash_hex[:16])
|
||||
message.desired_method = LXMF.LXMessage.PROPAGATED
|
||||
message.try_propagation_on_fail = False
|
||||
_router.handle_outbound(message)
|
||||
return
|
||||
|
||||
def _failed_callback(message):
|
||||
with _lock:
|
||||
for m in _messages:
|
||||
if m["id"] == msg_id:
|
||||
m["status"] = "failed"
|
||||
break
|
||||
_save_messages()
|
||||
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "failed"})
|
||||
|
||||
lxm.delivery_callback = _delivery_callback_outbound
|
||||
lxm.failed_callback = _failed_callback
|
||||
lxm.delivery_callback = _on_delivered
|
||||
lxm.failed_callback = _on_failed
|
||||
|
||||
_router.handle_outbound(lxm)
|
||||
|
||||
|
|
@ -156,18 +339,159 @@ def send_message(destination_hash_hex: str, content: str, title: str = "") -> di
|
|||
_messages.append(msg_record)
|
||||
_save_messages()
|
||||
|
||||
_emit_event("lxmf.created", {"message": msg_record})
|
||||
return msg_record
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Auto-Resend Failed Messages on Announce
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class _LxmfAnnounceHandler:
|
||||
"""Re-attempt failed messages when a peer announces."""
|
||||
aspect_filter = "lxmf.delivery"
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||
if not AUTO_RESEND_ON_ANNOUNCE:
|
||||
return
|
||||
|
||||
dest_hex = destination_hash.hex()
|
||||
resend_count = 0
|
||||
|
||||
with _lock:
|
||||
for msg in _messages:
|
||||
if (msg["status"] == "failed" and
|
||||
msg["direction"] == "outbound" and
|
||||
msg["recipient_hash"] == dest_hex):
|
||||
msg["status"] = "pending"
|
||||
resend_count += 1
|
||||
# Re-send in background
|
||||
threading.Thread(
|
||||
target=_resend_message, args=(msg,), daemon=True
|
||||
).start()
|
||||
|
||||
if resend_count > 0:
|
||||
_save_messages()
|
||||
|
||||
if resend_count > 0:
|
||||
logger.info("Auto-resending %d failed messages to %s", resend_count, dest_hex[:16])
|
||||
|
||||
|
||||
def _resend_message(msg_record: dict):
|
||||
"""Resend a previously failed message."""
|
||||
time.sleep(1) # Brief delay to let path establish
|
||||
result = send_message(
|
||||
msg_record["recipient_hash"],
|
||||
msg_record["content"],
|
||||
msg_record.get("title", ""),
|
||||
)
|
||||
if result.get("status") == "pending":
|
||||
# Remove the duplicate — original is being retried
|
||||
with _lock:
|
||||
_messages[:] = [m for m in _messages if m["id"] != result["id"]]
|
||||
_save_messages()
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Propagation Node Management
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class _PropagationAnnounceHandler:
|
||||
"""Track LXMF propagation node announces."""
|
||||
aspect_filter = "lxmf.propagation"
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||
dest_hex = destination_hash.hex()
|
||||
node_info = {
|
||||
"destination_hash": dest_hex,
|
||||
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
||||
"last_heard": time.time(),
|
||||
"enabled": True,
|
||||
"per_transfer_limit": 0,
|
||||
}
|
||||
|
||||
# Parse propagation node app data
|
||||
if app_data:
|
||||
try:
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
data = msgpack.unpackb(app_data)
|
||||
if isinstance(data, dict):
|
||||
node_info["enabled"] = data.get("enabled", True)
|
||||
node_info["per_transfer_limit"] = data.get("per_transfer_limit", 0)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
_propagation_nodes[dest_hex] = node_info
|
||||
_emit_event("propagation.announce", {"node": node_info})
|
||||
|
||||
|
||||
def get_propagation_nodes() -> list[dict]:
|
||||
"""Return list of known propagation nodes."""
|
||||
return list(_propagation_nodes.values())
|
||||
|
||||
|
||||
def set_preferred_propagation_node(destination_hash_hex: str) -> bool:
|
||||
"""Set the preferred outbound propagation node."""
|
||||
if _router is None:
|
||||
return False
|
||||
try:
|
||||
if destination_hash_hex:
|
||||
_router.set_outbound_propagation_node(bytes.fromhex(destination_hash_hex))
|
||||
else:
|
||||
_router.outbound_propagation_node = None
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("Failed to set propagation node: %s", e)
|
||||
return False
|
||||
|
||||
|
||||
def get_propagation_status() -> dict:
|
||||
"""Return propagation node status."""
|
||||
if _router is None:
|
||||
return {"enabled": False, "outbound_node": None, "transfer_state": "idle"}
|
||||
|
||||
outbound = _router.get_outbound_propagation_node()
|
||||
return {
|
||||
"local_enabled": LOCAL_PROPAGATION_NODE,
|
||||
"outbound_node": outbound.hex() if outbound else None,
|
||||
"transfer_state": str(getattr(_router, "propagation_transfer_state", "unknown")),
|
||||
"known_nodes": len(_propagation_nodes),
|
||||
"last_sync": _last_propagation_sync,
|
||||
}
|
||||
|
||||
|
||||
def sync_propagation_messages():
|
||||
"""Request messages from the propagation node."""
|
||||
global _last_propagation_sync
|
||||
if _router is None or _identity is None:
|
||||
return
|
||||
try:
|
||||
_router.request_messages_from_propagation_node(_identity)
|
||||
_last_propagation_sync = time.time()
|
||||
logger.info("Syncing messages from propagation node")
|
||||
except Exception as e:
|
||||
logger.warning("Propagation sync failed: %s", e)
|
||||
|
||||
|
||||
def _propagation_sync_loop():
|
||||
"""Background thread to periodically sync propagation messages."""
|
||||
while True:
|
||||
time.sleep(PROPAGATION_SYNC_INTERVAL)
|
||||
if _router and _router.get_outbound_propagation_node():
|
||||
sync_propagation_messages()
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Query / Persistence
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
|
||||
"""Return stored messages."""
|
||||
with _lock:
|
||||
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
|
||||
return sorted_msgs[offset:offset + limit]
|
||||
|
||||
|
||||
def get_message(msg_id: str) -> dict | None:
|
||||
"""Return a single message by ID."""
|
||||
with _lock:
|
||||
for m in _messages:
|
||||
if m["id"] == msg_id:
|
||||
|
|
@ -176,13 +500,19 @@ def get_message(msg_id: str) -> dict | None:
|
|||
|
||||
|
||||
def get_total_count() -> int:
|
||||
"""Return total message count."""
|
||||
with _lock:
|
||||
return len(_messages)
|
||||
|
||||
|
||||
def get_attachment_path(filename: str) -> str | None:
|
||||
"""Return full path to an attachment file if it exists."""
|
||||
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
|
||||
if os.path.exists(filepath):
|
||||
return filepath
|
||||
return None
|
||||
|
||||
|
||||
def _load_messages():
|
||||
"""Load messages from persistent storage."""
|
||||
global _messages
|
||||
if os.path.exists(LXMF_MESSAGES_FILE):
|
||||
try:
|
||||
|
|
@ -195,7 +525,6 @@ def _load_messages():
|
|||
|
||||
|
||||
def _save_messages():
|
||||
"""Persist messages to storage (call with _lock held)."""
|
||||
try:
|
||||
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
|
||||
with open(LXMF_MESSAGES_FILE, "w") as f:
|
||||
|
|
|
|||
214
app/main.py
214
app/main.py
|
|
@ -1,18 +1,19 @@
|
|||
"""
|
||||
rMesh Bridge — FastAPI service exposing Reticulum (Internet backbone)
|
||||
and MeshCore (LoRa mesh) as a unified REST API.
|
||||
rMesh Bridge — Unified REST + WebSocket API for Reticulum (Internet backbone),
|
||||
MeshCore (LoRa mesh), LXMF messaging, audio calls, and NomadNet browsing.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Header
|
||||
from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect
|
||||
from fastapi.responses import FileResponse
|
||||
from pydantic import BaseModel
|
||||
from typing import Annotated
|
||||
from typing import Annotated, Optional
|
||||
|
||||
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge
|
||||
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED
|
||||
from . import reticulum_bridge, lxmf_bridge, meshcore_bridge, websocket_manager, rchats_bridge
|
||||
from .config import BRIDGE_API_KEY, MESHCORE_ENABLED, RCHATS_BRIDGE_ENABLED
|
||||
from .models import (
|
||||
StatusResponse, NodesResponse, TopologyResponse,
|
||||
MessageIn, MessageOut, MessagesResponse,
|
||||
|
|
@ -25,9 +26,22 @@ logger = logging.getLogger("rmesh.api")
|
|||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Initialize Reticulum and MeshCore on startup."""
|
||||
"""Initialize all subsystems on startup."""
|
||||
logger.info("Starting rMesh bridge...")
|
||||
|
||||
# Set up WebSocket event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
websocket_manager.set_event_loop(loop)
|
||||
|
||||
# Register WebSocket broadcast as event listener for all subsystems
|
||||
reticulum_bridge.register_event_listener(websocket_manager.on_event)
|
||||
lxmf_bridge.register_event_listener(websocket_manager.on_event)
|
||||
|
||||
# Register rChats bridge as event listener
|
||||
if RCHATS_BRIDGE_ENABLED:
|
||||
lxmf_bridge.register_event_listener(rchats_bridge.on_lxmf_event)
|
||||
rchats_bridge.init()
|
||||
|
||||
# Reticulum (Internet backbone)
|
||||
reticulum_bridge.init()
|
||||
from .reticulum_bridge import _identity
|
||||
|
|
@ -36,6 +50,7 @@ async def lifespan(app: FastAPI):
|
|||
else:
|
||||
logger.error("No Reticulum identity available for LXMF")
|
||||
reticulum_bridge.announce()
|
||||
reticulum_bridge.announce_call_capability()
|
||||
|
||||
# MeshCore (LoRa mesh)
|
||||
if MESHCORE_ENABLED:
|
||||
|
|
@ -60,6 +75,25 @@ def verify_api_key(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|||
raise HTTPException(status_code=401, detail="Invalid API key")
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# WebSocket (real-time events)
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket_manager.connect(websocket)
|
||||
try:
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
# Handle ping/pong
|
||||
if data == "ping":
|
||||
await websocket.send_text('{"type":"pong"}')
|
||||
except WebSocketDisconnect:
|
||||
websocket_manager.disconnect(websocket)
|
||||
except Exception:
|
||||
websocket_manager.disconnect(websocket)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Health & Combined Status
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
|
@ -72,15 +106,18 @@ async def health():
|
|||
@app.get("/api/status")
|
||||
async def combined_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
result = {
|
||||
return {
|
||||
"reticulum": reticulum_bridge.get_status(),
|
||||
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0},
|
||||
"meshcore": meshcore_bridge.get_status() if MESHCORE_ENABLED else {
|
||||
"connected": False, "device_info": {}, "contact_count": 0, "message_count": 0
|
||||
},
|
||||
"propagation": lxmf_bridge.get_propagation_status(),
|
||||
"websocket_clients": websocket_manager.client_count(),
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Reticulum Endpoints (Internet backbone)
|
||||
# Reticulum Endpoints
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
@app.get("/api/reticulum/status", response_model=StatusResponse)
|
||||
|
|
@ -89,14 +126,14 @@ async def reticulum_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|||
return reticulum_bridge.get_status()
|
||||
|
||||
|
||||
@app.get("/api/reticulum/nodes", response_model=NodesResponse)
|
||||
@app.get("/api/reticulum/nodes")
|
||||
async def reticulum_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
node_list = reticulum_bridge.get_nodes()
|
||||
return {"nodes": node_list, "total": len(node_list)}
|
||||
|
||||
|
||||
@app.get("/api/reticulum/topology", response_model=TopologyResponse)
|
||||
@app.get("/api/reticulum/topology")
|
||||
async def reticulum_topology(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return reticulum_bridge.get_topology()
|
||||
|
|
@ -114,7 +151,11 @@ async def reticulum_identity(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|||
return reticulum_bridge.get_identity_info()
|
||||
|
||||
|
||||
@app.get("/api/reticulum/messages", response_model=MessagesResponse)
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# LXMF Messages (with rich fields)
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
@app.get("/api/reticulum/messages")
|
||||
async def reticulum_messages(
|
||||
x_bridge_api_key: Annotated[str, Header()] = "",
|
||||
limit: int = 100, offset: int = 0,
|
||||
|
|
@ -124,7 +165,7 @@ async def reticulum_messages(
|
|||
return {"messages": msgs, "total": lxmf_bridge.get_total_count()}
|
||||
|
||||
|
||||
@app.post("/api/reticulum/messages", response_model=MessageOut)
|
||||
@app.post("/api/reticulum/messages")
|
||||
async def reticulum_send_message(
|
||||
body: MessageIn,
|
||||
x_bridge_api_key: Annotated[str, Header()] = "",
|
||||
|
|
@ -140,24 +181,120 @@ async def reticulum_send_message(
|
|||
return result
|
||||
|
||||
|
||||
# Keep old /api/status path as alias for backwards compat
|
||||
@app.get("/api/nodes", response_model=NodesResponse)
|
||||
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
@app.get("/api/reticulum/messages/{msg_id}")
|
||||
async def reticulum_get_message(msg_id: str, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
node_list = reticulum_bridge.get_nodes()
|
||||
return {"nodes": node_list, "total": len(node_list)}
|
||||
msg = lxmf_bridge.get_message(msg_id)
|
||||
if msg is None:
|
||||
raise HTTPException(status_code=404, detail="Message not found")
|
||||
return msg
|
||||
|
||||
|
||||
@app.get("/api/topology", response_model=TopologyResponse)
|
||||
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
@app.get("/api/attachments/{filename}")
|
||||
async def get_attachment(filename: str, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return reticulum_bridge.get_topology()
|
||||
# Sanitize filename
|
||||
safe = "".join(c for c in filename if c.isalnum() or c in ".-_")
|
||||
path = lxmf_bridge.get_attachment_path(safe)
|
||||
if path is None:
|
||||
raise HTTPException(status_code=404, detail="Attachment not found")
|
||||
return FileResponse(path)
|
||||
|
||||
|
||||
@app.get("/api/identity", response_model=IdentityResponse)
|
||||
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Propagation Node Management
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
@app.get("/api/propagation/status")
|
||||
async def propagation_status(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return reticulum_bridge.get_identity_info()
|
||||
return lxmf_bridge.get_propagation_status()
|
||||
|
||||
|
||||
@app.get("/api/propagation/nodes")
|
||||
async def propagation_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
nodes = lxmf_bridge.get_propagation_nodes()
|
||||
return {"nodes": nodes, "total": len(nodes)}
|
||||
|
||||
|
||||
class PropagationNodeSet(BaseModel):
|
||||
destination_hash: str
|
||||
|
||||
|
||||
@app.post("/api/propagation/preferred")
|
||||
async def set_preferred_propagation(body: PropagationNodeSet, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
ok = lxmf_bridge.set_preferred_propagation_node(body.destination_hash)
|
||||
if not ok:
|
||||
raise HTTPException(status_code=400, detail="Failed to set propagation node")
|
||||
return {"status": "ok", "destination_hash": body.destination_hash}
|
||||
|
||||
|
||||
@app.post("/api/propagation/sync")
|
||||
async def sync_propagation(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
lxmf_bridge.sync_propagation_messages()
|
||||
return {"status": "syncing"}
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Audio Calls
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class CallInitiate(BaseModel):
|
||||
destination_hash: str
|
||||
|
||||
|
||||
@app.get("/api/calls")
|
||||
async def list_calls(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return {"calls": reticulum_bridge.get_active_calls()}
|
||||
|
||||
|
||||
@app.post("/api/calls")
|
||||
async def initiate_call(body: CallInitiate, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
result = reticulum_bridge.initiate_call(body.destination_hash)
|
||||
if not result.get("call_id"):
|
||||
raise HTTPException(status_code=400, detail=result.get("error", "Call failed"))
|
||||
return result
|
||||
|
||||
|
||||
class CallHangup(BaseModel):
|
||||
call_id: str
|
||||
|
||||
|
||||
@app.post("/api/calls/hangup")
|
||||
async def hangup_call(body: CallHangup, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
reticulum_bridge.hangup_call(body.call_id)
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# NomadNet Node Browser
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
@app.get("/api/nomadnet/nodes")
|
||||
async def nomadnet_nodes(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
nodes = reticulum_bridge.get_nomadnet_nodes()
|
||||
return {"nodes": nodes, "total": len(nodes)}
|
||||
|
||||
|
||||
class NomadNetBrowse(BaseModel):
|
||||
destination_hash: str
|
||||
path: str = "/"
|
||||
|
||||
|
||||
@app.post("/api/nomadnet/browse")
|
||||
async def nomadnet_browse(body: NomadNetBrowse, x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
result = reticulum_bridge.browse_nomadnet_node(body.destination_hash, body.path)
|
||||
if "error" in result:
|
||||
raise HTTPException(status_code=400, detail=result["error"])
|
||||
return result
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
|
@ -238,3 +375,26 @@ async def meshcore_advert(x_bridge_api_key: Annotated[str, Header()] = ""):
|
|||
async def meshcore_stats(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return await meshcore_bridge.get_device_stats()
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Backwards Compatibility Aliases
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
@app.get("/api/nodes")
|
||||
async def nodes_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
node_list = reticulum_bridge.get_nodes()
|
||||
return {"nodes": node_list, "total": len(node_list)}
|
||||
|
||||
|
||||
@app.get("/api/topology")
|
||||
async def topology_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return reticulum_bridge.get_topology()
|
||||
|
||||
|
||||
@app.get("/api/identity")
|
||||
async def identity_compat(x_bridge_api_key: Annotated[str, Header()] = ""):
|
||||
verify_api_key(x_bridge_api_key)
|
||||
return reticulum_bridge.get_identity_info()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,79 @@
|
|||
"""
|
||||
rChats bridge — relays LXMF and MeshCore messages to/from rChats.
|
||||
Messages arriving over mesh are forwarded to rChats internal API.
|
||||
Messages from rChats can be sent out over mesh.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import RCHATS_BRIDGE_ENABLED, RCHATS_INTERNAL_URL
|
||||
|
||||
logger = logging.getLogger("rmesh.rchats_bridge")
|
||||
|
||||
|
||||
def init():
|
||||
"""Initialize the rChats bridge."""
|
||||
if not RCHATS_BRIDGE_ENABLED:
|
||||
logger.info("rChats bridge disabled")
|
||||
return
|
||||
logger.info("rChats bridge enabled — forwarding to %s", RCHATS_INTERNAL_URL)
|
||||
|
||||
|
||||
def on_lxmf_event(event: dict):
|
||||
"""Handle LXMF events and forward relevant ones to rChats."""
|
||||
if not RCHATS_BRIDGE_ENABLED:
|
||||
return
|
||||
|
||||
event_type = event.get("type", "")
|
||||
|
||||
if event_type == "lxmf.delivery":
|
||||
msg = event.get("message", {})
|
||||
_forward_to_rchats(msg, source="reticulum")
|
||||
|
||||
elif event_type == "meshcore.message":
|
||||
msg = event.get("message", {})
|
||||
_forward_to_rchats(msg, source="meshcore")
|
||||
|
||||
|
||||
def _forward_to_rchats(msg: dict, source: str):
|
||||
"""Forward a mesh message to rChats internal API."""
|
||||
space_slug = msg.get("space_slug")
|
||||
if not space_slug:
|
||||
return # Only forward space-scoped messages
|
||||
|
||||
payload = {
|
||||
"source": source,
|
||||
"sender": msg.get("sender_hash", "") or msg.get("sender", ""),
|
||||
"content": msg.get("content", ""),
|
||||
"title": msg.get("title", ""),
|
||||
"has_image": "image" in msg.get("fields", {}),
|
||||
"has_audio": "audio" in msg.get("fields", {}),
|
||||
"has_files": "file_attachments" in msg.get("fields", {}),
|
||||
"space": space_slug,
|
||||
"timestamp": msg.get("timestamp", time.time()),
|
||||
}
|
||||
|
||||
threading.Thread(
|
||||
target=_post_to_rchats, args=(space_slug, payload), daemon=True
|
||||
).start()
|
||||
|
||||
|
||||
def _post_to_rchats(space_slug: str, payload: dict):
|
||||
"""POST message to rChats internal mesh-relay endpoint."""
|
||||
try:
|
||||
url = f"{RCHATS_INTERNAL_URL}/api/internal/mesh-relay"
|
||||
with httpx.Client(timeout=10) as client:
|
||||
response = client.post(url, json=payload)
|
||||
if response.status_code < 400:
|
||||
logger.info("Forwarded mesh message to rChats space '%s'", space_slug)
|
||||
else:
|
||||
logger.warning("rChats rejected message: %d %s", response.status_code, response.text[:100])
|
||||
except httpx.ConnectError:
|
||||
logger.debug("rChats not reachable at %s", RCHATS_INTERNAL_URL)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to forward to rChats: %s", e)
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
"""
|
||||
Reticulum bridge — singleton RNS instance, identity management, topology queries.
|
||||
Reticulum bridge — singleton RNS instance, identity management,
|
||||
topology queries, signal quality tracking, audio call support.
|
||||
"""
|
||||
|
||||
import time
|
||||
|
|
@ -14,17 +15,44 @@ logger = logging.getLogger("rmesh.reticulum")
|
|||
_rns_instance: RNS.Reticulum | None = None
|
||||
_identity: RNS.Identity | None = None
|
||||
_destination: RNS.Destination | None = None
|
||||
_call_destination: RNS.Destination | None = None
|
||||
_start_time: float = 0
|
||||
_announced_destinations: dict[str, dict] = {}
|
||||
_lock = threading.Lock()
|
||||
_event_listeners: list = []
|
||||
|
||||
APP_NAME = "rmesh"
|
||||
ASPECT = "bridge"
|
||||
|
||||
|
||||
def _safe_decode(data) -> str | None:
|
||||
"""Safely decode app_data which may be binary."""
|
||||
if data is None:
|
||||
return None
|
||||
if isinstance(data, bytes):
|
||||
try:
|
||||
return data.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
return data.hex()
|
||||
return str(data)
|
||||
|
||||
|
||||
def register_event_listener(callback):
|
||||
_event_listeners.append(callback)
|
||||
|
||||
|
||||
def _emit_event(event_type: str, data: dict):
|
||||
event = {"type": event_type, **data}
|
||||
for listener in _event_listeners:
|
||||
try:
|
||||
listener(event)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def init():
|
||||
"""Initialize the Reticulum instance and server identity."""
|
||||
global _rns_instance, _identity, _destination, _start_time
|
||||
global _rns_instance, _identity, _destination, _call_destination, _start_time
|
||||
|
||||
if _rns_instance is not None:
|
||||
return
|
||||
|
|
@ -43,30 +71,275 @@ def init():
|
|||
_identity.to_file(identity_path)
|
||||
logger.info("Created new identity")
|
||||
|
||||
# Create a destination for this bridge
|
||||
# Bridge destination
|
||||
_destination = RNS.Destination(
|
||||
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||
APP_NAME, ASPECT,
|
||||
)
|
||||
|
||||
# Register announce handler to track network
|
||||
RNS.Transport.register_announce_handler(_announce_handler)
|
||||
# Audio call destination
|
||||
_call_destination = RNS.Destination(
|
||||
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||
"call", "audio",
|
||||
)
|
||||
_call_destination.set_link_established_callback(_on_incoming_call_link)
|
||||
|
||||
# Register announce handlers with signal quality
|
||||
RNS.Transport.register_announce_handler(_AnnounceHandler())
|
||||
|
||||
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
|
||||
|
||||
|
||||
def _announce_handler(destination_hash, announced_identity, app_data):
|
||||
"""Callback when we hear an announce from another node."""
|
||||
with _lock:
|
||||
_announced_destinations[destination_hash.hex()] = {
|
||||
"destination_hash": destination_hash.hex(),
|
||||
"app_data": app_data.decode("utf-8") if app_data else None,
|
||||
class _AnnounceHandler:
|
||||
"""Track announces with RSSI/SNR/quality metrics."""
|
||||
aspect_filter = None # Catch all announces
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||
# Extract signal quality from the announce packet
|
||||
rssi = None
|
||||
snr = None
|
||||
quality = None
|
||||
|
||||
# Try to get signal info from the packet
|
||||
try:
|
||||
if announce_packet_hash:
|
||||
packet = RNS.Transport.packet_hashmap.get(announce_packet_hash)
|
||||
if packet:
|
||||
rssi = getattr(packet, "rssi", None)
|
||||
snr = getattr(packet, "snr", None)
|
||||
quality = getattr(packet, "q", None) or getattr(packet, "quality", None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
dest_hex = destination_hash.hex()
|
||||
node_data = {
|
||||
"destination_hash": dest_hex,
|
||||
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
||||
"app_data": _safe_decode(app_data),
|
||||
"last_heard": time.time(),
|
||||
"rssi": rssi,
|
||||
"snr": snr,
|
||||
"quality": quality,
|
||||
}
|
||||
|
||||
with _lock:
|
||||
_announced_destinations[dest_hex] = node_data
|
||||
|
||||
_emit_event("announce", {"node": node_data})
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Audio Calls over Reticulum Links
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
_active_calls: dict[str, dict] = {}
|
||||
|
||||
|
||||
def _on_incoming_call_link(link):
|
||||
"""Handle incoming audio call link."""
|
||||
call_id = str(link.hash.hex()) if hasattr(link, "hash") else str(id(link))
|
||||
_active_calls[call_id] = {
|
||||
"id": call_id,
|
||||
"link": link,
|
||||
"direction": "inbound",
|
||||
"started_at": time.time(),
|
||||
"peer_hash": link.destination.hexhash if hasattr(link.destination, "hexhash") else "",
|
||||
}
|
||||
|
||||
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
|
||||
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
|
||||
|
||||
logger.info("Incoming audio call: %s", call_id[:16])
|
||||
_emit_event("call.incoming", {"call_id": call_id, "peer": _active_calls[call_id]["peer_hash"]})
|
||||
|
||||
|
||||
def initiate_call(destination_hash_hex: str) -> dict:
|
||||
"""Initiate an audio call to a destination."""
|
||||
if _identity is None:
|
||||
return {"call_id": "", "error": "No identity"}
|
||||
|
||||
try:
|
||||
dest_hash = bytes.fromhex(destination_hash_hex)
|
||||
dest_identity = RNS.Identity.recall(dest_hash)
|
||||
if dest_identity is None:
|
||||
RNS.Transport.request_path(dest_hash)
|
||||
return {"call_id": "", "error": "Peer not found, path requested"}
|
||||
|
||||
dest = RNS.Destination(
|
||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||
"call", "audio",
|
||||
)
|
||||
|
||||
link = RNS.Link(dest)
|
||||
call_id = str(id(link))
|
||||
|
||||
_active_calls[call_id] = {
|
||||
"id": call_id,
|
||||
"link": link,
|
||||
"direction": "outbound",
|
||||
"started_at": time.time(),
|
||||
"peer_hash": destination_hash_hex,
|
||||
}
|
||||
|
||||
link.set_link_established_callback(lambda l: _emit_event("call.established", {"call_id": call_id}))
|
||||
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
|
||||
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
|
||||
|
||||
return {"call_id": call_id, "peer": destination_hash_hex}
|
||||
except Exception as e:
|
||||
return {"call_id": "", "error": str(e)}
|
||||
|
||||
|
||||
def send_call_audio(call_id: str, audio_data: bytes) -> bool:
|
||||
"""Send audio data over an active call link."""
|
||||
call = _active_calls.get(call_id)
|
||||
if not call or not call["link"]:
|
||||
return False
|
||||
|
||||
try:
|
||||
link = call["link"]
|
||||
if link.status == RNS.Link.ACTIVE and len(audio_data) <= RNS.Link.MDU:
|
||||
RNS.Packet(link, audio_data).send()
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def hangup_call(call_id: str):
|
||||
"""End an active call."""
|
||||
call = _active_calls.pop(call_id, None)
|
||||
if call and call["link"]:
|
||||
try:
|
||||
call["link"].teardown()
|
||||
except Exception:
|
||||
pass
|
||||
_emit_event("call.ended", {"call_id": call_id})
|
||||
|
||||
|
||||
def get_active_calls() -> list[dict]:
|
||||
"""Return active calls (without link objects)."""
|
||||
return [{
|
||||
"id": c["id"],
|
||||
"direction": c["direction"],
|
||||
"started_at": c["started_at"],
|
||||
"peer_hash": c["peer_hash"],
|
||||
"duration": time.time() - c["started_at"],
|
||||
} for c in _active_calls.values()]
|
||||
|
||||
|
||||
def _on_call_audio_packet(call_id: str, audio_data):
|
||||
"""Handle received audio data from a call."""
|
||||
_emit_event("call.audio", {"call_id": call_id, "size": len(audio_data) if audio_data else 0})
|
||||
|
||||
|
||||
def _on_call_ended(call_id: str):
|
||||
"""Handle call link closure."""
|
||||
_active_calls.pop(call_id, None)
|
||||
logger.info("Call ended: %s", call_id[:16])
|
||||
_emit_event("call.ended", {"call_id": call_id})
|
||||
|
||||
|
||||
def announce_call_capability():
|
||||
"""Announce that this node can receive audio calls."""
|
||||
if _call_destination:
|
||||
_call_destination.announce(app_data=b"rMesh Audio")
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# NomadNet Node Browser
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
_nomadnet_nodes: dict[str, dict] = {}
|
||||
|
||||
|
||||
class _NomadNetAnnounceHandler:
|
||||
"""Track NomadNet node announces."""
|
||||
aspect_filter = "nomadnetwork.node"
|
||||
|
||||
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
|
||||
dest_hex = destination_hash.hex()
|
||||
node_name = ""
|
||||
if app_data:
|
||||
try:
|
||||
node_name = app_data.decode("utf-8") if isinstance(app_data, bytes) else str(app_data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
_nomadnet_nodes[dest_hex] = {
|
||||
"destination_hash": dest_hex,
|
||||
"name": node_name,
|
||||
"identity_hash": announced_identity.hexhash if announced_identity else "",
|
||||
"last_heard": time.time(),
|
||||
}
|
||||
_emit_event("nomadnet.announce", {"node": _nomadnet_nodes[dest_hex]})
|
||||
|
||||
|
||||
def get_nomadnet_nodes() -> list[dict]:
|
||||
"""Return discovered NomadNet nodes."""
|
||||
return list(_nomadnet_nodes.values())
|
||||
|
||||
|
||||
def browse_nomadnet_node(destination_hash_hex: str, page_path: str = "/") -> dict:
|
||||
"""Request a page from a NomadNet node. Returns async — result via event."""
|
||||
if _identity is None:
|
||||
return {"error": "No identity"}
|
||||
|
||||
try:
|
||||
dest_hash = bytes.fromhex(destination_hash_hex)
|
||||
dest_identity = RNS.Identity.recall(dest_hash)
|
||||
if dest_identity is None:
|
||||
RNS.Transport.request_path(dest_hash)
|
||||
return {"error": "Node not found, path requested"}
|
||||
|
||||
dest = RNS.Destination(
|
||||
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||
"nomadnetwork", "node",
|
||||
)
|
||||
|
||||
link = RNS.Link(dest)
|
||||
|
||||
def on_established(l):
|
||||
# Request page
|
||||
l.request(
|
||||
page_path.encode("utf-8"),
|
||||
response_callback=lambda receipt: _on_nomadnet_response(destination_hash_hex, page_path, receipt),
|
||||
failed_callback=lambda receipt: _emit_event("nomadnet.page.error", {
|
||||
"node": destination_hash_hex, "path": page_path, "error": "Request failed"
|
||||
}),
|
||||
)
|
||||
|
||||
link.set_link_established_callback(on_established)
|
||||
return {"status": "requesting", "node": destination_hash_hex, "path": page_path}
|
||||
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def _on_nomadnet_response(node_hash: str, page_path: str, receipt):
|
||||
"""Handle NomadNet page response."""
|
||||
try:
|
||||
response = receipt.response
|
||||
content = response.decode("utf-8") if isinstance(response, bytes) else str(response)
|
||||
_emit_event("nomadnet.page.downloaded", {
|
||||
"node": node_hash,
|
||||
"path": page_path,
|
||||
"content": content,
|
||||
"size": len(content),
|
||||
})
|
||||
except Exception as e:
|
||||
_emit_event("nomadnet.page.error", {
|
||||
"node": node_hash, "path": page_path, "error": str(e)
|
||||
})
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Status / Query
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
def get_status() -> dict:
|
||||
"""Return transport status info."""
|
||||
if _rns_instance is None:
|
||||
return {"online": False, "transport_enabled": False, "identity_hash": "",
|
||||
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
|
||||
|
|
@ -81,21 +354,20 @@ def get_status() -> dict:
|
|||
"uptime_seconds": time.time() - _start_time,
|
||||
"announced_count": announced_count,
|
||||
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
|
||||
"active_calls": len(_active_calls),
|
||||
"nomadnet_nodes": len(_nomadnet_nodes),
|
||||
}
|
||||
|
||||
|
||||
def get_nodes() -> list[dict]:
|
||||
"""Return list of known announced destinations."""
|
||||
with _lock:
|
||||
return list(_announced_destinations.values())
|
||||
|
||||
|
||||
def get_topology() -> dict:
|
||||
"""Return nodes and links for visualization."""
|
||||
nodes = get_nodes()
|
||||
links = []
|
||||
|
||||
# Build links from destinations/path tables if available
|
||||
dest_table = getattr(RNS.Transport, "destinations", {})
|
||||
if isinstance(dest_table, (dict, list)):
|
||||
try:
|
||||
|
|
@ -110,7 +382,7 @@ def get_topology() -> dict:
|
|||
"active": True,
|
||||
})
|
||||
except Exception:
|
||||
pass # Transport internals may vary between RNS versions
|
||||
pass
|
||||
|
||||
return {
|
||||
"nodes": nodes,
|
||||
|
|
@ -121,7 +393,6 @@ def get_topology() -> dict:
|
|||
|
||||
|
||||
def get_identity_info() -> dict:
|
||||
"""Return server identity information."""
|
||||
if _identity is None:
|
||||
return {"identity_hash": "", "public_key_hex": ""}
|
||||
return {
|
||||
|
|
@ -131,7 +402,6 @@ def get_identity_info() -> dict:
|
|||
|
||||
|
||||
def announce():
|
||||
"""Announce this bridge on the network."""
|
||||
if _destination is None:
|
||||
return {"announced": False, "identity_hash": ""}
|
||||
_destination.announce(app_data=b"rMesh Bridge")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,63 @@
|
|||
"""
|
||||
WebSocket manager — broadcasts real-time events to connected clients.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Set
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
logger = logging.getLogger("rmesh.websocket")
|
||||
|
||||
_clients: Set[WebSocket] = set()
|
||||
_loop: asyncio.AbstractEventLoop | None = None
|
||||
|
||||
|
||||
def set_event_loop(loop: asyncio.AbstractEventLoop):
|
||||
global _loop
|
||||
_loop = loop
|
||||
|
||||
|
||||
async def connect(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
_clients.add(websocket)
|
||||
logger.info("WebSocket client connected (%d total)", len(_clients))
|
||||
|
||||
|
||||
def disconnect(websocket: WebSocket):
|
||||
_clients.discard(websocket)
|
||||
logger.info("WebSocket client disconnected (%d remaining)", len(_clients))
|
||||
|
||||
|
||||
async def broadcast(data: dict):
|
||||
"""Send event to all connected WebSocket clients."""
|
||||
if not _clients:
|
||||
return
|
||||
|
||||
message = json.dumps(data)
|
||||
dead = set()
|
||||
for client in _clients:
|
||||
try:
|
||||
await client.send_text(message)
|
||||
except Exception:
|
||||
dead.add(client)
|
||||
|
||||
for client in dead:
|
||||
_clients.discard(client)
|
||||
|
||||
|
||||
def on_event(event: dict):
|
||||
"""Synchronous callback that schedules broadcast on the event loop.
|
||||
Use this as the event_listener for reticulum_bridge and lxmf_bridge."""
|
||||
if _loop is None or not _clients:
|
||||
return
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(broadcast(event), _loop)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def client_count() -> int:
|
||||
return len(_clients)
|
||||
|
|
@ -4,3 +4,5 @@ fastapi>=0.115.0
|
|||
uvicorn[standard]>=0.30.0
|
||||
pydantic>=2.0
|
||||
meshcore>=2.3.0
|
||||
httpx>=0.27.0
|
||||
websockets>=12.0
|
||||
|
|
|
|||
Loading…
Reference in New Issue