"""
LXMF bridge — message send/receive via the LXMF protocol on Reticulum.

Messages (inbound and outbound) are kept in an in-memory list guarded by a
module-level lock and mirrored to ``LXMF_MESSAGES_FILE`` as JSON after every
change.
"""

import json
import time
import uuid
import threading
import logging
import os

import RNS
import LXMF

from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE

logger = logging.getLogger("rmesh.lxmf")

_router: LXMF.LXMRouter | None = None
_local_delivery_destination: RNS.Destination | None = None
_identity: RNS.Identity | None = None
_messages: list[dict] = []
_lock = threading.Lock()


def init(identity: RNS.Identity):
    """Initialize the LXMF router with the given identity.

    Creates the storage directory, builds the router, enables propagation,
    registers the local delivery destination and inbound callback, and loads
    any previously persisted messages.
    """
    global _router, _local_delivery_destination, _identity
    _identity = identity

    os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)

    _router = LXMF.LXMRouter(
        identity=_identity,
        storagepath=LXMF_STORAGE_DIR,
    )

    # Enable propagation node
    _router.enable_propagation()

    # Register local delivery destination for receiving messages; the returned
    # destination is also used as the source of outbound messages.
    _local_delivery_destination = _router.register_delivery_identity(
        _identity,
        display_name="rMesh Bridge",
    )
    _router.register_delivery_callback(_delivery_callback)

    # Load persisted messages
    _load_messages()

    logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)


def _decode_text(value) -> str:
    """Return *value* as text; bytes are decoded as UTF-8, falsy values become ""."""
    if isinstance(value, bytes):
        # Inbound data is untrusted: malformed bytes must not raise inside a
        # router callback, so undecodable sequences are replaced.
        return value.decode("utf-8", errors="replace")
    return str(value or "")


def _store_record(record: dict) -> None:
    """Append *record* to the message list and persist the list."""
    with _lock:
        _messages.append(record)
        _save_messages()


def _set_status(msg_id: str, status: str) -> None:
    """Set the status of the stored message with *msg_id* and persist the list."""
    with _lock:
        for m in _messages:
            if m["id"] == msg_id:
                m["status"] = status
                break
        _save_messages()


def _delivery_callback(message):
    """Handle incoming LXMF message."""
    msg_record = {
        "id": str(uuid.uuid4()),
        "direction": "inbound",
        "sender_hash": message.source_hash.hex() if message.source_hash else "",
        "recipient_hash": _identity.hexhash if _identity else "",
        "title": _decode_text(message.title),
        "content": _decode_text(message.content),
        "status": "delivered",
        "timestamp": time.time(),
    }
    _store_record(msg_record)
    logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])


def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
    """Send an LXMF message to a destination hash.

    Args:
        destination_hash_hex: Hex-encoded destination hash of the recipient.
        content: Message body.
        title: Optional message title.

    Returns:
        The stored message record (status "pending" at return time), or
        ``{"id": "", "status": "failed"}`` when the bridge is not initialised
        or the hash is not valid hex.

    Raises:
        Whatever the router's ``handle_outbound`` raises; the stored record is
        marked "failed" before the exception is re-raised.
    """
    failed = {"id": "", "status": "failed"}
    if _router is None or _identity is None or _local_delivery_destination is None:
        return failed

    try:
        dest_hash = bytes.fromhex(destination_hash_hex)
    except ValueError:
        return failed

    msg_id = str(uuid.uuid4())
    msg_record = {
        "id": msg_id,
        "direction": "outbound",
        "sender_hash": _identity.hexhash,
        "recipient_hash": destination_hash_hex,
        # Stored as text so the record stays JSON-serialisable even if a
        # caller passes bytes.
        "title": _decode_text(title),
        "content": _decode_text(content),
        "status": "pending",
        "timestamp": time.time(),
    }

    # Look up identity for destination
    dest_identity = RNS.Identity.recall(dest_hash)
    if dest_identity is None:
        # Request path first, the message may be deliverable later.
        # NOTE: nothing in this module re-sends the message once the path is
        # known; the record is only stored as "pending".
        RNS.Transport.request_path(dest_hash)
        _store_record(msg_record)
        return msg_record

    # Create destination for the recipient
    dest = RNS.Destination(
        dest_identity,
        RNS.Destination.OUT,
        RNS.Destination.SINGLE,
        "lxmf",
        "delivery",
    )

    lxm = LXMF.LXMessage(
        dest,
        _local_delivery_destination,
        content.encode("utf-8") if isinstance(content, str) else content,
        title=title.encode("utf-8") if isinstance(title, str) else title,
        desired_method=LXMF.LXMessage.DIRECT,
    )

    def _delivery_callback_outbound(message):
        _set_status(msg_id, "delivered")

    def _failed_callback(message):
        _set_status(msg_id, "failed")

    # Callbacks are attached through LXMessage's registration methods rather
    # than by assigning attributes on the message object.
    lxm.register_delivery_callback(_delivery_callback_outbound)
    lxm.register_failed_callback(_failed_callback)

    # Store the record before handing the message to the router so that a
    # callback firing early still finds the record it has to update.
    _store_record(msg_record)

    try:
        _router.handle_outbound(lxm)
    except Exception:
        # Do not leave a record stuck in "pending" for a message that was
        # never queued; the caller still sees the original exception.
        _set_status(msg_id, "failed")
        raise

    return msg_record


def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
    """Return stored messages, newest first, sliced by *offset* and *limit*."""
    with _lock:
        sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
    return sorted_msgs[offset:offset + limit]


def get_message(msg_id: str) -> dict | None:
    """Return a single message by ID, or None when no message has that ID."""
    with _lock:
        for m in _messages:
            if m["id"] == msg_id:
                return m
    return None


def get_total_count() -> int:
    """Return total message count."""
    with _lock:
        return len(_messages)


def _load_messages():
    """Load messages from persistent storage.

    A missing file leaves the current list untouched. An unreadable file, or
    one whose JSON payload is not a list, resets the list to empty; list
    entries that are not JSON objects are dropped.
    """
    global _messages
    if not os.path.exists(LXMF_MESSAGES_FILE):
        return

    try:
        with open(LXMF_MESSAGES_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not load messages: %s", e)
        loaded = []
    else:
        if not isinstance(loaded, list):
            logger.warning(
                "Could not load messages: %s",
                "expected a JSON list, got %s" % type(loaded).__name__,
            )
            loaded = []
        else:
            records = [m for m in loaded if isinstance(m, dict)]
            if len(records) != len(loaded):
                logger.warning(
                    "Ignored %d malformed persisted messages",
                    len(loaded) - len(records),
                )
            loaded = records
            logger.info("Loaded %d persisted messages", len(loaded))

    with _lock:
        _messages = loaded


def _save_messages():
    """Persist messages to storage (call with _lock held).

    The list is written to a temporary sibling file and then moved over the
    target, so an interrupted write cannot truncate the existing store.
    Failures are logged and otherwise ignored.
    """
    tmp_path = f"{LXMF_MESSAGES_FILE}.tmp"
    try:
        # A bare filename has an empty dirname, which os.makedirs rejects.
        parent = os.path.dirname(LXMF_MESSAGES_FILE)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(_messages, f)
        os.replace(tmp_path, LXMF_MESSAGES_FILE)
    except (OSError, TypeError, ValueError) as e:
        logger.warning("Could not save messages: %s", e)