rmesh-reticulum/app/lxmf_bridge.py

205 lines
5.8 KiB
Python

"""
LXMF bridge — message send/receive via the LXMF protocol on Reticulum.
"""
import json
import time
import uuid
import threading
import logging
import os
import RNS
import LXMF
from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE
# Module-level logger used by every function in this bridge.
logger = logging.getLogger("rmesh.lxmf")
# LXMF router instance; stays None until init() creates it.
_router: LXMF.LXMRouter | None = None
# Value returned by the router's register_delivery_identity() call in init().
_local_delivery_destination: RNS.Destination | None = None
# Reticulum identity handed to init(); None before initialization.
_identity: RNS.Identity | None = None
# In-memory list of message records (dicts); written to disk by _save_messages().
_messages: list[dict] = []
# Held by the accessors and callbacks while they read or mutate _messages.
_lock = threading.Lock()
def init(identity: RNS.Identity):
    """Initialize the LXMF router with the given identity.

    Creates the LXMF storage directory, starts an ``LXMF.LXMRouter`` with
    propagation enabled, registers the local delivery destination and the
    inbound delivery callback, then loads previously persisted messages.

    Args:
        identity: Reticulum identity used for the router and for the local
            delivery destination.
    """
    global _router, _local_delivery_destination, _identity

    _identity = identity
    os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)

    _router = LXMF.LXMRouter(
        identity=_identity,
        storagepath=LXMF_STORAGE_DIR,
    )

    # Enable propagation node
    _router.enable_propagation()

    # Register local delivery destination for receiving messages
    _local_delivery_destination = _router.register_delivery_identity(
        _identity,
        display_name="rMesh Bridge",
    )
    _router.register_delivery_callback(_delivery_callback)

    # Load persisted messages
    _load_messages()

    # Report the hash of the delivery destination (the address peers send
    # to), not the hash of the underlying identity. Fall back to the identity
    # hash only if registration did not hand back a destination.
    if _local_delivery_destination is not None:
        delivery_hash = _local_delivery_destination.hexhash
    else:
        delivery_hash = _identity.hexhash
    logger.info("LXMF bridge ready — delivery hash: %s", delivery_hash)
def _as_text(value) -> str:
    """Return *value* as ``str``; bytes are decoded as UTF-8 with replacement."""
    if isinstance(value, bytes):
        # These bytes come from a remote peer: malformed UTF-8 must not raise,
        # otherwise the whole inbound message would go unrecorded.
        return value.decode("utf-8", errors="replace")
    return str(value or "")


def _delivery_callback(message):
    """Handle incoming LXMF message.

    Registered with the router in ``init()``. Builds an ``"inbound"`` record
    from the message's source hash, title and content, appends it to the
    in-memory store and persists the store.

    Args:
        message: Received LXMF message; ``source_hash``, ``title`` and
            ``content`` are read from it.
    """
    source_hash = message.source_hash
    msg_record = {
        "id": str(uuid.uuid4()),
        "direction": "inbound",
        "sender_hash": source_hash.hex() if source_hash else "",
        "recipient_hash": _identity.hexhash if _identity else "",
        "title": _as_text(message.title),
        "content": _as_text(message.content),
        "status": "delivered",
        # Time of receipt on this bridge, not the sender's timestamp.
        "timestamp": time.time(),
    }

    with _lock:
        _messages.append(msg_record)
        _save_messages()

    logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])
def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
    """Send an LXMF message to a destination hash.

    Args:
        destination_hash_hex: Hex-encoded destination hash of the recipient.
        content: Message body.
        title: Optional message title.

    Returns:
        The stored message record. Its ``status`` starts as ``"pending"`` and
        is updated in place to ``"delivered"`` or ``"failed"`` by the LXMF
        callbacks. ``{"id": "", "status": "failed"}`` is returned (and nothing
        is stored) when the bridge is not initialized or the hash is not
        valid hex.

    Raises:
        Exception: Whatever the router raises while accepting the message;
            the stored record is marked ``"failed"`` before re-raising.
    """
    if _router is None or _identity is None or _local_delivery_destination is None:
        return {"id": "", "status": "failed"}

    try:
        dest_hash = bytes.fromhex(destination_hash_hex)
    except (ValueError, TypeError):
        # TypeError covers a non-string argument (e.g. None from a JSON body).
        return {"id": "", "status": "failed"}

    # Look up identity for destination
    dest_identity = RNS.Identity.recall(dest_hash)

    msg_record = {
        "id": str(uuid.uuid4()),
        "direction": "outbound",
        "sender_hash": _identity.hexhash,
        "recipient_hash": destination_hash_hex,
        "title": title,
        "content": content,
        "status": "pending",
        "timestamp": time.time(),
    }

    def _store_record() -> None:
        with _lock:
            _messages.append(msg_record)
            _save_messages()

    def _set_status(status: str) -> None:
        # msg_record is the very dict held in _messages, so updating it here
        # updates the store without searching the list by id.
        with _lock:
            msg_record["status"] = status
            _save_messages()

    if dest_identity is None:
        # Request path first, the message may be deliverable later.
        # NOTE(review): nothing in this function re-sends the message once
        # the path is known; the record stays "pending" unless other code
        # retries it.
        RNS.Transport.request_path(dest_hash)
        _store_record()
        return msg_record

    # Create destination for the recipient
    dest = RNS.Destination(
        dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
        "lxmf", "delivery",
    )

    lxm = LXMF.LXMessage(
        dest,
        # The destination returned by register_delivery_identity() in init()
        # is the message source.
        _local_delivery_destination,
        content.encode("utf-8") if isinstance(content, str) else content,
        title=title.encode("utf-8") if isinstance(title, str) else title,
        desired_method=LXMF.LXMessage.DIRECT,
    )

    # Use LXMessage's registration methods instead of assigning attributes
    # directly, so the callbacks land where LXMF looks for them.
    # NOTE(review): confirm against the installed LXMF version.
    lxm.register_delivery_callback(lambda _message: _set_status("delivered"))
    lxm.register_failed_callback(lambda _message: _set_status("failed"))

    # Store the record BEFORE handing the message to the router: a callback
    # that fires during or right after handle_outbound() must find it.
    _store_record()

    try:
        _router.handle_outbound(lxm)
    except Exception:
        # Do not leave a record stuck in "pending" for a message the router
        # never accepted; the caller still sees the original exception.
        _set_status("failed")
        raise

    return msg_record
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
    """Return stored messages, newest first.

    Args:
        limit: Maximum number of records to return. Negative values are
            treated as 0 instead of being interpreted as a negative slice.
        offset: Number of newest records to skip. Negative values are
            treated as 0.

    Returns:
        A new list holding the selected records, sorted by ``timestamp``
        in descending order.
    """
    start = max(offset, 0)
    count = max(limit, 0)

    with _lock:
        newest_first = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)

    # sorted() produced a fresh list, so slicing needs no lock.
    return newest_first[start:start + count]
def get_message(msg_id: str) -> dict | None:
    """Look up one stored message record by its ``id`` field.

    Returns the first matching record, or ``None`` when no record has
    that id.
    """
    with _lock:
        return next((record for record in _messages if record["id"] == msg_id), None)
def get_total_count() -> int:
    """Report how many message records are currently held in memory."""
    with _lock:
        total = len(_messages)
    return total
def _load_messages():
    """Load messages from persistent storage.

    Replaces the module-level ``_messages`` list with the contents of
    ``LXMF_MESSAGES_FILE``. A missing file leaves the current list untouched.
    A file that cannot be read, is not valid JSON/UTF-8, or does not contain
    a JSON list resets the store to an empty list and logs a warning.
    """
    global _messages

    try:
        with open(LXMF_MESSAGES_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # Nothing persisted yet.
        return
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not load messages: %s", e)
        _messages = []
        return

    if not isinstance(loaded, list):
        # Valid JSON of the wrong shape would break every later
        # append/sort on the store.
        logger.warning(
            "Could not load messages: %s",
            f"expected a JSON list, got {type(loaded).__name__}",
        )
        _messages = []
        return

    records = [entry for entry in loaded if isinstance(entry, dict)]
    skipped = len(loaded) - len(records)
    if skipped:
        logger.warning("Skipped %d persisted entries that are not objects", skipped)

    _messages = records
    logger.info("Loaded %d persisted messages", len(_messages))
def _save_messages():
    """Persist messages to storage (call with _lock held).

    The JSON is written to a temporary sibling file and then moved over
    ``LXMF_MESSAGES_FILE`` with ``os.replace``, so an interrupted write
    cannot leave a truncated message file behind. ``OSError`` is logged as
    a warning and not raised.
    """
    tmp_path = f"{LXMF_MESSAGES_FILE}.tmp"
    try:
        parent = os.path.dirname(LXMF_MESSAGES_FILE)
        if parent:
            # dirname is "" for a bare filename, and os.makedirs("") raises.
            os.makedirs(parent, exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(_messages, f)
        os.replace(tmp_path, LXMF_MESSAGES_FILE)
    except OSError as e:
        logger.warning("Could not save messages: %s", e)