205 lines
5.8 KiB
Python
205 lines
5.8 KiB
Python
"""
LXMF bridge — message send/receive via the LXMF protocol on Reticulum.
"""
|
|
|
|
import json
|
|
import time
|
|
import uuid
|
|
import threading
|
|
import logging
|
|
import os
|
|
|
|
import RNS
|
|
import LXMF
|
|
|
|
from .config import LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE
|
|
|
|
# Logger shared by every function in this module.
logger = logging.getLogger("rmesh.lxmf")

# Bridge state. Each of these starts as None and is assigned by init().
_identity: RNS.Identity | None = None
_router: LXMF.LXMRouter | None = None
_local_delivery_destination: RNS.Destination | None = None

# In-memory message log, plus the lock the accessors below take around it.
_lock = threading.Lock()
_messages: list[dict] = []
|
|
|
|
|
|
def init(identity: RNS.Identity):
    """Initialize the LXMF router with the given identity.

    Loads the persisted message log, creates the LXMF router backed by
    ``LXMF_STORAGE_DIR``, enables propagation, registers ``identity`` for
    delivery and installs ``_delivery_callback`` for inbound messages.

    Args:
        identity: The Reticulum identity the bridge sends and receives as.
    """
    global _router, _local_delivery_destination, _identity

    _identity = identity

    os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)

    # Load persisted messages BEFORE the router can hand anything to
    # _delivery_callback. That callback appends to _messages and rewrites the
    # messages file; if it ran first, the file would be overwritten with only
    # the new message and the subsequent load would then replace the
    # in-memory list, losing both the history and the new message.
    _load_messages()

    _router = LXMF.LXMRouter(
        identity=_identity,
        storagepath=LXMF_STORAGE_DIR,
    )

    # Enable propagation node
    _router.enable_propagation()

    # Register local delivery destination for receiving messages
    _local_delivery_destination = _router.register_delivery_identity(
        _identity,
        display_name="rMesh Bridge",
    )

    _router.register_delivery_callback(_delivery_callback)

    # NOTE(review): this logs the identity hash; the address peers send to is
    # presumably the delivery destination's hash — confirm which is intended.
    logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
|
|
|
|
|
|
def _decode_field(value) -> str:
    """Return a message field as text.

    ``bytes`` are decoded as UTF-8 with undecodable sequences replaced, so a
    malformed payload cannot raise. Anything else goes through ``str()``,
    with falsy values becoming the empty string.
    """
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value or "")


def _delivery_callback(message):
    """Handle incoming LXMF message.

    Builds an "inbound" record from the message, appends it to the in-memory
    log and persists the log.

    Args:
        message: The received message object; ``source_hash``, ``title`` and
            ``content`` are read from it.
    """
    msg_record = {
        "id": str(uuid.uuid4()),
        "direction": "inbound",
        "sender_hash": message.source_hash.hex() if message.source_hash else "",
        "recipient_hash": _identity.hexhash if _identity else "",
        # Title and content come from the network; decode leniently so that
        # invalid UTF-8 does not raise and drop the message.
        "title": _decode_field(message.title),
        "content": _decode_field(message.content),
        "status": "delivered",
        "timestamp": time.time(),
    }

    with _lock:
        _messages.append(msg_record)
        _save_messages()

    logger.info("Received LXMF message from %s", msg_record["sender_hash"][:16])
|
|
|
|
|
|
def _new_outbound_record(msg_id: str, destination_hash_hex: str, title: str, content: str) -> dict:
    """Build the stored record for an outbound message, in the "pending" state."""
    return {
        "id": msg_id,
        "direction": "outbound",
        "sender_hash": _identity.hexhash,
        "recipient_hash": destination_hash_hex,
        "title": title,
        "content": content,
        "status": "pending",
        "timestamp": time.time(),
    }


def _set_message_status(msg_id: str, status: str) -> None:
    """Set the status of the stored message with ID *msg_id* and persist the log."""
    with _lock:
        for record in _messages:
            if record["id"] == msg_id:
                record["status"] = status
                break
        _save_messages()


def send_message(destination_hash_hex: str, content: str, title: str = "") -> dict:
    """Send an LXMF message to a destination hash.

    Args:
        destination_hash_hex: Hex string of the recipient's destination hash.
        content: Message body.
        title: Optional message title.

    Returns:
        The stored message record (initially with status "pending"), or
        ``{"id": "", "status": "failed"}`` when the bridge has not been
        initialized or the hash is not valid hex. When the recipient's
        identity is not known yet, a path request is issued and a "pending"
        record is stored and returned; this function does not queue the
        message for a later attempt.

    Raises:
        Exception: Whatever the router raises while accepting the message;
            the stored record is marked "failed" before re-raising.
    """
    if _router is None or _identity is None or _local_delivery_destination is None:
        return {"id": "", "status": "failed"}

    try:
        dest_hash = bytes.fromhex(destination_hash_hex)
    except (TypeError, ValueError):
        # ValueError: not valid hex; TypeError: not a string at all.
        return {"id": "", "status": "failed"}

    # Look up identity for destination
    dest_identity = RNS.Identity.recall(dest_hash)

    msg_id = str(uuid.uuid4())
    msg_record = _new_outbound_record(msg_id, destination_hash_hex, title, content)

    if dest_identity is None:
        # Request path first, the message may be deliverable later
        RNS.Transport.request_path(dest_hash)
        with _lock:
            _messages.append(msg_record)
            _save_messages()
        return msg_record

    # Create destination for the recipient
    dest = RNS.Destination(
        dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
        "lxmf", "delivery",
    )

    lxm = LXMF.LXMessage(
        dest,
        # Source is the delivery destination that init() registered.
        _local_delivery_destination,
        content.encode("utf-8") if isinstance(content, str) else content,
        title=title.encode("utf-8") if isinstance(title, str) else title,
        desired_method=LXMF.LXMessage.DIRECT,
    )

    # Register through the message's registration methods rather than by
    # assigning attributes, so the delivery callback is actually invoked.
    lxm.register_delivery_callback(
        lambda _message: _set_message_status(msg_id, "delivered")
    )
    lxm.register_failed_callback(
        lambda _message: _set_message_status(msg_id, "failed")
    )

    # Store the record before handing the message to the router, so a
    # callback that fires immediately finds a record to update.
    with _lock:
        _messages.append(msg_record)
        _save_messages()

    try:
        _router.handle_outbound(lxm)
    except Exception:
        # The record is already stored; do not leave it "pending" forever.
        _set_message_status(msg_id, "failed")
        raise

    return msg_record
|
|
|
|
|
|
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
    """Return one page of stored messages, ordered newest first.

    ``offset`` is where the page starts in the timestamp-descending order and
    ``limit`` is the maximum number of records in the page.
    """
    with _lock:
        newest_first = list(_messages)
        newest_first.sort(key=lambda record: record["timestamp"], reverse=True)
        page_end = offset + limit
        return newest_first[offset:page_end]
|
|
|
|
|
|
def get_message(msg_id: str) -> dict | None:
    """Look up a stored message by its ID; None when nothing matches."""
    with _lock:
        matches = (record for record in _messages if record["id"] == msg_id)
        return next(matches, None)
|
|
|
|
|
|
def get_total_count() -> int:
    """Report how many messages are currently held in the log."""
    with _lock:
        total = len(_messages)
    return total
|
|
|
|
|
|
def _load_messages():
    """Load messages from persistent storage.

    Replaces the in-memory log with the JSON list stored in
    ``LXMF_MESSAGES_FILE``. A missing file leaves the log untouched. A file
    that cannot be read, is not valid JSON/UTF-8, or does not contain a JSON
    list resets the log to empty and logs a warning.
    """
    global _messages

    try:
        with open(LXMF_MESSAGES_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # Nothing persisted yet.
        return
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not load messages: %s", e)
        _messages = []
        return

    if not isinstance(loaded, list):
        # The rest of the module appends to and iterates over a list.
        logger.warning(
            "Could not load messages: expected a JSON list, got %s",
            type(loaded).__name__,
        )
        _messages = []
        return

    _messages = loaded
    logger.info("Loaded %d persisted messages", len(_messages))
|
|
|
|
|
|
def _save_messages():
    """Persist messages to storage (call with _lock held).

    Writes the log as JSON to a temporary file next to
    ``LXMF_MESSAGES_FILE`` and then renames it into place, so an interrupted
    write does not leave a truncated store behind. I/O errors are logged as
    warnings and not raised.
    """
    # A fixed temporary name is fine because callers hold _lock.
    tmp_path = f"{LXMF_MESSAGES_FILE}.tmp"
    try:
        parent_dir = os.path.dirname(LXMF_MESSAGES_FILE)
        if parent_dir:
            # os.makedirs("") raises, so skip it for a bare file name.
            os.makedirs(parent_dir, exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(_messages, f)
        os.replace(tmp_path, LXMF_MESSAGES_FILE)
    except OSError as e:
        logger.warning("Could not save messages: %s", e)
|