142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
"""
|
|
Reticulum bridge — singleton RNS instance, identity management, topology queries.
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
import logging
|
|
import RNS
|
|
|
|
from .config import RNS_CONFIG_DIR
|
|
|
|
logger = logging.getLogger("rmesh.reticulum")
|
|
|
|
_rns_instance: RNS.Reticulum | None = None
|
|
_identity: RNS.Identity | None = None
|
|
_destination: RNS.Destination | None = None
|
|
_start_time: float = 0
|
|
_announced_destinations: dict[str, dict] = {}
|
|
_lock = threading.Lock()
|
|
|
|
APP_NAME = "rmesh"
|
|
ASPECT = "bridge"
|
|
|
|
|
|
def init():
|
|
"""Initialize the Reticulum instance and server identity."""
|
|
global _rns_instance, _identity, _destination, _start_time
|
|
|
|
if _rns_instance is not None:
|
|
return
|
|
|
|
logger.info("Initializing Reticulum instance from %s", RNS_CONFIG_DIR)
|
|
_rns_instance = RNS.Reticulum(configdir=RNS_CONFIG_DIR)
|
|
_start_time = time.time()
|
|
|
|
# Load or create persistent identity
|
|
identity_path = f"{RNS_CONFIG_DIR}/storage/rmesh_identity"
|
|
if RNS.Identity.from_file(identity_path) is not None:
|
|
_identity = RNS.Identity.from_file(identity_path)
|
|
logger.info("Loaded existing identity")
|
|
else:
|
|
_identity = RNS.Identity()
|
|
_identity.to_file(identity_path)
|
|
logger.info("Created new identity")
|
|
|
|
# Create a destination for this bridge
|
|
_destination = RNS.Destination(
|
|
_identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
|
APP_NAME, ASPECT,
|
|
)
|
|
|
|
# Register announce handler to track network
|
|
RNS.Transport.register_announce_handler(_announce_handler)
|
|
|
|
logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash)
|
|
|
|
|
|
def _announce_handler(destination_hash, announced_identity, app_data):
|
|
"""Callback when we hear an announce from another node."""
|
|
with _lock:
|
|
_announced_destinations[destination_hash.hex()] = {
|
|
"destination_hash": destination_hash.hex(),
|
|
"app_data": app_data.decode("utf-8") if app_data else None,
|
|
"last_heard": time.time(),
|
|
}
|
|
|
|
|
|
def get_status() -> dict:
|
|
"""Return transport status info."""
|
|
if _rns_instance is None:
|
|
return {"online": False, "transport_enabled": False, "identity_hash": "",
|
|
"uptime_seconds": 0, "announced_count": 0, "path_count": 0}
|
|
|
|
with _lock:
|
|
announced_count = len(_announced_destinations)
|
|
|
|
return {
|
|
"online": True,
|
|
"transport_enabled": getattr(RNS.Transport, "transport_enabled", lambda: False)() if callable(getattr(RNS.Transport, "transport_enabled", None)) else bool(getattr(RNS.Transport, "TRANSPORT", False)),
|
|
"identity_hash": _identity.hexhash if _identity else "",
|
|
"uptime_seconds": time.time() - _start_time,
|
|
"announced_count": announced_count,
|
|
"path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0,
|
|
}
|
|
|
|
|
|
def get_nodes() -> list[dict]:
|
|
"""Return list of known announced destinations."""
|
|
with _lock:
|
|
return list(_announced_destinations.values())
|
|
|
|
|
|
def get_topology() -> dict:
|
|
"""Return nodes and links for visualization."""
|
|
nodes = get_nodes()
|
|
links = []
|
|
|
|
# Build links from destinations/path tables if available
|
|
dest_table = getattr(RNS.Transport, "destinations", {})
|
|
if isinstance(dest_table, (dict, list)):
|
|
try:
|
|
items = dest_table.items() if isinstance(dest_table, dict) else enumerate(dest_table)
|
|
for key, entry in items:
|
|
dest_hex = key.hex() if isinstance(key, bytes) else str(key)
|
|
hops = entry[2] if isinstance(entry, (list, tuple)) and len(entry) > 2 else 0
|
|
links.append({
|
|
"source": _identity.hexhash if _identity else "",
|
|
"target": dest_hex,
|
|
"hops": hops,
|
|
"active": True,
|
|
})
|
|
except Exception:
|
|
pass # Transport internals may vary between RNS versions
|
|
|
|
return {
|
|
"nodes": nodes,
|
|
"links": links,
|
|
"node_count": len(nodes),
|
|
"link_count": len(links),
|
|
}
|
|
|
|
|
|
def get_identity_info() -> dict:
|
|
"""Return server identity information."""
|
|
if _identity is None:
|
|
return {"identity_hash": "", "public_key_hex": ""}
|
|
return {
|
|
"identity_hash": _identity.hexhash,
|
|
"public_key_hex": _identity.get_public_key().hex() if _identity.get_public_key() else "",
|
|
}
|
|
|
|
|
|
def announce():
|
|
"""Announce this bridge on the network."""
|
|
if _destination is None:
|
|
return {"announced": False, "identity_hash": ""}
|
|
_destination.announce(app_data=b"rMesh Bridge")
|
|
return {
|
|
"announced": True,
|
|
"identity_hash": _identity.hexhash if _identity else "",
|
|
}
|