rmesh-reticulum/app/reticulum_bridge.py

412 lines
14 KiB
Python

"""
Reticulum bridge — singleton RNS instance, identity management,
topology queries, signal quality tracking, audio call support.
"""
import time
import threading
import logging
import RNS
from .config import RNS_CONFIG_DIR
logger = logging.getLogger("rmesh.reticulum")
_rns_instance: RNS.Reticulum | None = None
_identity: RNS.Identity | None = None
_destination: RNS.Destination | None = None
_call_destination: RNS.Destination | None = None
_start_time: float = 0
_announced_destinations: dict[str, dict] = {}
_lock = threading.Lock()
_event_listeners: list = []
APP_NAME = "rmesh"
ASPECT = "bridge"
def _safe_decode(data) -> str | None:
"""Safely decode app_data which may be binary."""
if data is None:
return None
if isinstance(data, bytes):
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return data.hex()
return str(data)
def register_event_listener(callback):
_event_listeners.append(callback)
def _emit_event(event_type: str, data: dict):
event = {"type": event_type, **data}
for listener in _event_listeners:
try:
listener(event)
except Exception:
pass
def init():
"""Initialize the Reticulum instance and server identity."""
global _rns_instance, _identity, _destination, _call_destination, _start_time
if _rns_instance is not None:
return
logger.info("Initializing Reticulum instance from %s", RNS_CONFIG_DIR)
_rns_instance = RNS.Reticulum(configdir=RNS_CONFIG_DIR)
_start_time = time.time()
# Load or create persistent identity
identity_path = f"{RNS_CONFIG_DIR}/storage/rmesh_identity"
if RNS.Identity.from_file(identity_path) is not None:
_identity = RNS.Identity.from_file(identity_path)
logger.info("Loaded existing identity")
else:
_identity = RNS.Identity()
_identity.to_file(identity_path)
logger.info("Created new identity")
# Bridge destination
_destination = RNS.Destination(
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
APP_NAME, ASPECT,
)
# Audio call destination
_call_destination = RNS.Destination(
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
"call", "audio",
)
_call_destination.set_link_established_callback(_on_incoming_call_link)
# Register announce handlers with signal quality
RNS.Transport.register_announce_handler(_AnnounceHandler())
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
class _AnnounceHandler:
"""Track announces with RSSI/SNR/quality metrics."""
aspect_filter = None # Catch all announces
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
# Extract signal quality from the announce packet
rssi = None
snr = None
quality = None
# Try to get signal info from the packet
try:
if announce_packet_hash:
packet = RNS.Transport.packet_hashmap.get(announce_packet_hash)
if packet:
rssi = getattr(packet, "rssi", None)
snr = getattr(packet, "snr", None)
quality = getattr(packet, "q", None) or getattr(packet, "quality", None)
except Exception:
pass
dest_hex = destination_hash.hex()
node_data = {
"destination_hash": dest_hex,
"identity_hash": announced_identity.hexhash if announced_identity else "",
"app_data": _safe_decode(app_data),
"last_heard": time.time(),
"rssi": rssi,
"snr": snr,
"quality": quality,
}
with _lock:
_announced_destinations[dest_hex] = node_data
_emit_event("announce", {"node": node_data})
# ═══════════════════════════════════════════════════════════════════
# Audio Calls over Reticulum Links
# ═══════════════════════════════════════════════════════════════════
_active_calls: dict[str, dict] = {}
def _on_incoming_call_link(link):
"""Handle incoming audio call link."""
call_id = str(link.hash.hex()) if hasattr(link, "hash") else str(id(link))
_active_calls[call_id] = {
"id": call_id,
"link": link,
"direction": "inbound",
"started_at": time.time(),
"peer_hash": link.destination.hexhash if hasattr(link.destination, "hexhash") else "",
}
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
logger.info("Incoming audio call: %s", call_id[:16])
_emit_event("call.incoming", {"call_id": call_id, "peer": _active_calls[call_id]["peer_hash"]})
def initiate_call(destination_hash_hex: str) -> dict:
"""Initiate an audio call to a destination."""
if _identity is None:
return {"call_id": "", "error": "No identity"}
try:
dest_hash = bytes.fromhex(destination_hash_hex)
dest_identity = RNS.Identity.recall(dest_hash)
if dest_identity is None:
RNS.Transport.request_path(dest_hash)
return {"call_id": "", "error": "Peer not found, path requested"}
dest = RNS.Destination(
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"call", "audio",
)
link = RNS.Link(dest)
call_id = str(id(link))
_active_calls[call_id] = {
"id": call_id,
"link": link,
"direction": "outbound",
"started_at": time.time(),
"peer_hash": destination_hash_hex,
}
link.set_link_established_callback(lambda l: _emit_event("call.established", {"call_id": call_id}))
link.set_packet_callback(lambda msg, pkt: _on_call_audio_packet(call_id, msg))
link.set_link_closed_callback(lambda l: _on_call_ended(call_id))
return {"call_id": call_id, "peer": destination_hash_hex}
except Exception as e:
return {"call_id": "", "error": str(e)}
def send_call_audio(call_id: str, audio_data: bytes) -> bool:
"""Send audio data over an active call link."""
call = _active_calls.get(call_id)
if not call or not call["link"]:
return False
try:
link = call["link"]
if link.status == RNS.Link.ACTIVE and len(audio_data) <= RNS.Link.MDU:
RNS.Packet(link, audio_data).send()
return True
except Exception:
pass
return False
def hangup_call(call_id: str):
"""End an active call."""
call = _active_calls.pop(call_id, None)
if call and call["link"]:
try:
call["link"].teardown()
except Exception:
pass
_emit_event("call.ended", {"call_id": call_id})
def get_active_calls() -> list[dict]:
"""Return active calls (without link objects)."""
return [{
"id": c["id"],
"direction": c["direction"],
"started_at": c["started_at"],
"peer_hash": c["peer_hash"],
"duration": time.time() - c["started_at"],
} for c in _active_calls.values()]
def _on_call_audio_packet(call_id: str, audio_data):
"""Handle received audio data from a call."""
_emit_event("call.audio", {"call_id": call_id, "size": len(audio_data) if audio_data else 0})
def _on_call_ended(call_id: str):
"""Handle call link closure."""
_active_calls.pop(call_id, None)
logger.info("Call ended: %s", call_id[:16])
_emit_event("call.ended", {"call_id": call_id})
def announce_call_capability():
"""Announce that this node can receive audio calls."""
if _call_destination:
_call_destination.announce(app_data=b"rMesh Audio")
return True
return False
# ═══════════════════════════════════════════════════════════════════
# NomadNet Node Browser
# ═══════════════════════════════════════════════════════════════════
_nomadnet_nodes: dict[str, dict] = {}
class _NomadNetAnnounceHandler:
"""Track NomadNet node announces."""
aspect_filter = "nomadnetwork.node"
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
dest_hex = destination_hash.hex()
node_name = ""
if app_data:
try:
node_name = app_data.decode("utf-8") if isinstance(app_data, bytes) else str(app_data)
except Exception:
pass
_nomadnet_nodes[dest_hex] = {
"destination_hash": dest_hex,
"name": node_name,
"identity_hash": announced_identity.hexhash if announced_identity else "",
"last_heard": time.time(),
}
_emit_event("nomadnet.announce", {"node": _nomadnet_nodes[dest_hex]})
def get_nomadnet_nodes() -> list[dict]:
"""Return discovered NomadNet nodes."""
return list(_nomadnet_nodes.values())
def browse_nomadnet_node(destination_hash_hex: str, page_path: str = "/") -> dict:
"""Request a page from a NomadNet node. Returns async — result via event."""
if _identity is None:
return {"error": "No identity"}
try:
dest_hash = bytes.fromhex(destination_hash_hex)
dest_identity = RNS.Identity.recall(dest_hash)
if dest_identity is None:
RNS.Transport.request_path(dest_hash)
return {"error": "Node not found, path requested"}
dest = RNS.Destination(
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"nomadnetwork", "node",
)
link = RNS.Link(dest)
def on_established(l):
# Request page
l.request(
page_path.encode("utf-8"),
response_callback=lambda receipt: _on_nomadnet_response(destination_hash_hex, page_path, receipt),
failed_callback=lambda receipt: _emit_event("nomadnet.page.error", {
"node": destination_hash_hex, "path": page_path, "error": "Request failed"
}),
)
link.set_link_established_callback(on_established)
return {"status": "requesting", "node": destination_hash_hex, "path": page_path}
except Exception as e:
return {"error": str(e)}
def _on_nomadnet_response(node_hash: str, page_path: str, receipt):
"""Handle NomadNet page response."""
try:
response = receipt.response
content = response.decode("utf-8") if isinstance(response, bytes) else str(response)
_emit_event("nomadnet.page.downloaded", {
"node": node_hash,
"path": page_path,
"content": content,
"size": len(content),
})
except Exception as e:
_emit_event("nomadnet.page.error", {
"node": node_hash, "path": page_path, "error": str(e)
})
# ═══════════════════════════════════════════════════════════════════
# Status / Query
# ═══════════════════════════════════════════════════════════════════
def get_status() -> dict:
if _rns_instance is None:
return {"online": False, "transport_enabled": False, "identity_hash": "",
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
with _lock:
announced_count = len(_announced_destinations)
return {
"online": True,
"transport_enabled": getattr(RNS.Transport, "transport_enabled", lambda: False)() if callable(getattr(RNS.Transport, "transport_enabled", None)) else bool(getattr(RNS.Transport, "TRANSPORT", False)),
"identity_hash": _identity.hexhash if _identity else "",
"uptime_seconds": time.time() - _start_time,
"announced_count": announced_count,
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
"active_calls": len(_active_calls),
"nomadnet_nodes": len(_nomadnet_nodes),
}
def get_nodes() -> list[dict]:
with _lock:
return list(_announced_destinations.values())
def get_topology() -> dict:
nodes = get_nodes()
links = []
dest_table = getattr(RNS.Transport, "destinations", {})
if isinstance(dest_table, (dict, list)):
try:
items = dest_table.items() if isinstance(dest_table, dict) else enumerate(dest_table)
for key, entry in items:
dest_hex = key.hex() if isinstance(key, bytes) else str(key)
hops = entry[2] if isinstance(entry, (list, tuple)) and len(entry) > 2 else 0
links.append({
"source": _identity.hexhash if _identity else "",
"target": dest_hex,
"hops": hops,
"active": True,
})
except Exception:
pass
return {
"nodes": nodes,
"links": links,
"node_count": len(nodes),
"link_count": len(links),
}
def get_identity_info() -> dict:
if _identity is None:
return {"identity_hash": "", "public_key_hex": ""}
return {
"identity_hash": _identity.hexhash,
"public_key_hex": _identity.get_public_key().hex() if _identity.get_public_key() else "",
}
def announce():
if _destination is None:
return {"announced": False, "identity_hash": ""}
_destination.announce(app_data=b"rMesh Bridge")
return {
"announced": True,
"identity_hash": _identity.hexhash if _identity else "",
}