""" Reticulum bridge — singleton RNS instance, identity management, topology queries. """ import time import threading import logging import RNS from .config import RNS_CONFIG_DIR logger = logging.getLogger("rmesh.reticulum") _rns_instance: RNS.Reticulum | None = None _identity: RNS.Identity | None = None _destination: RNS.Destination | None = None _start_time: float = 0 _announced_destinations: dict[str, dict] = {} _lock = threading.Lock() APP_NAME = "rmesh" ASPECT = "bridge" def init(): """Initialize the Reticulum instance and server identity.""" global _rns_instance, _identity, _destination, _start_time if _rns_instance is not None: return logger.info("Initializing Reticulum instance from %s", RNS_CONFIG_DIR) _rns_instance = RNS.Reticulum(configdir=RNS_CONFIG_DIR) _start_time = time.time() # Load or create persistent identity identity_path = f"{RNS_CONFIG_DIR}/storage/rmesh_identity" if RNS.Identity.from_file(identity_path) is not None: _identity = RNS.Identity.from_file(identity_path) logger.info("Loaded existing identity") else: _identity = RNS.Identity() _identity.to_file(identity_path) logger.info("Created new identity") # Create a destination for this bridge _destination = RNS.Destination( _identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, ASPECT, ) # Register announce handler to track network RNS.Transport.register_announce_handler(_announce_handler) logger.info("Reticulum bridge ready — identity: %s", _identity.hexhash) def _announce_handler(destination_hash, announced_identity, app_data): """Callback when we hear an announce from another node.""" with _lock: _announced_destinations[destination_hash.hex()] = { "destination_hash": destination_hash.hex(), "app_data": app_data.decode("utf-8") if app_data else None, "last_heard": time.time(), } def get_status() -> dict: """Return transport status info.""" if _rns_instance is None: return {"online": False, "transport_enabled": False, "identity_hash": "", "uptime_seconds": 0, "announced_count": 0, "path_count": 0} with _lock: announced_count = len(_announced_destinations) return { "online": True, "transport_enabled": getattr(RNS.Transport, "transport_enabled", lambda: False)() if callable(getattr(RNS.Transport, "transport_enabled", None)) else bool(getattr(RNS.Transport, "TRANSPORT", False)), "identity_hash": _identity.hexhash if _identity else "", "uptime_seconds": time.time() - _start_time, "announced_count": announced_count, "path_count": len(RNS.Transport.destinations) if hasattr(RNS.Transport, "destinations") else 0, } def get_nodes() -> list[dict]: """Return list of known announced destinations.""" with _lock: return list(_announced_destinations.values()) def get_topology() -> dict: """Return nodes and links for visualization.""" nodes = get_nodes() links = [] # Build links from destinations/path tables if available dest_table = getattr(RNS.Transport, "destinations", {}) if isinstance(dest_table, (dict, list)): try: items = dest_table.items() if isinstance(dest_table, dict) else enumerate(dest_table) for key, entry in items: dest_hex = key.hex() if isinstance(key, bytes) else str(key) hops = entry[2] if isinstance(entry, (list, tuple)) and len(entry) > 2 else 0 links.append({ "source": _identity.hexhash if _identity else "", "target": dest_hex, "hops": hops, "active": True, }) except Exception: pass # Transport internals may vary between RNS versions return { "nodes": nodes, "links": links, "node_count": len(nodes), "link_count": len(links), } def get_identity_info() -> dict: """Return server identity information.""" if _identity is None: return {"identity_hash": "", "public_key_hex": ""} return { "identity_hash": _identity.hexhash, "public_key_hex": _identity.get_public_key().hex() if _identity.get_public_key() else "", } def announce(): """Announce this bridge on the network.""" if _destination is None: return {"announced": False, "identity_hash": ""} _destination.announce(app_data=b"rMesh Bridge") return { "announced": True, "identity_hash": _identity.hexhash if _identity else "", }