rmesh-reticulum/app/lxmf_bridge.py

534 lines
20 KiB
Python

"""
LXMF bridge — rich message handling, propagation node management,
auto-resend on announce, file/image/audio attachments.
"""
import base64
import json
import time
import uuid
import threading
import logging
import os
import RNS
import LXMF
from .config import (
LXMF_STORAGE_DIR, LXMF_MESSAGES_FILE, LXMF_ATTACHMENTS_DIR,
AUTO_RESEND_ON_ANNOUNCE, LOCAL_PROPAGATION_NODE,
PREFERRED_PROPAGATION_NODE, PROPAGATION_SYNC_INTERVAL,
)
logger = logging.getLogger("rmesh.lxmf")
_router: LXMF.LXMRouter | None = None
_local_delivery_destination: RNS.Destination | None = None
_identity: RNS.Identity | None = None
_messages: list[dict] = []
_lock = threading.Lock()
_event_listeners: list = [] # WebSocket broadcast callbacks
_propagation_nodes: dict[str, dict] = {}
_last_propagation_sync: float = 0
def init(identity: RNS.Identity):
"""Initialize the LXMF router with the given identity."""
global _router, _local_delivery_destination, _identity
_identity = identity
os.makedirs(LXMF_STORAGE_DIR, exist_ok=True)
os.makedirs(LXMF_ATTACHMENTS_DIR, exist_ok=True)
_router = LXMF.LXMRouter(
identity=_identity,
storagepath=LXMF_STORAGE_DIR,
)
# Increase transfer limit for attachments (10 MB)
_router.delivery_per_transfer_limit = 10000
# Enable local propagation node
if LOCAL_PROPAGATION_NODE:
_router.enable_propagation()
logger.info("Local LXMF propagation node enabled")
# Set preferred propagation node
if PREFERRED_PROPAGATION_NODE:
try:
_router.set_outbound_propagation_node(bytes.fromhex(PREFERRED_PROPAGATION_NODE))
logger.info("Set preferred propagation node: %s", PREFERRED_PROPAGATION_NODE[:16])
except Exception as e:
logger.warning("Failed to set propagation node: %s", e)
# Register local delivery
_local_delivery_destination = _router.register_delivery_identity(
_identity,
display_name="rMesh Bridge",
)
_router.register_delivery_callback(_delivery_callback)
# Register announce handlers for auto-resend and propagation tracking
RNS.Transport.register_announce_handler(_LxmfAnnounceHandler())
RNS.Transport.register_announce_handler(_PropagationAnnounceHandler())
_load_messages()
# Start propagation sync thread
if PROPAGATION_SYNC_INTERVAL > 0:
sync_thread = threading.Thread(target=_propagation_sync_loop, daemon=True)
sync_thread.start()
logger.info("LXMF bridge ready — delivery hash: %s", _identity.hexhash)
def register_event_listener(callback):
"""Register a callback for real-time events (WebSocket broadcast)."""
_event_listeners.append(callback)
def _emit_event(event_type: str, data: dict):
"""Emit event to all registered listeners."""
event = {"type": event_type, **data}
for listener in _event_listeners:
try:
listener(event)
except Exception:
pass
# ═══════════════════════════════════════════════════════════════════
# LXMF Rich Field Parsing
# ═══════════════════════════════════════════════════════════════════
def _parse_lxmf_fields(lxmf_message) -> dict:
"""Extract all LXMF fields from a message into a serializable dict."""
fields = {}
try:
lxmf_fields = lxmf_message.get_fields()
except Exception:
return fields
# Image attachment
if LXMF.FIELD_IMAGE in lxmf_fields:
try:
image_data = lxmf_fields[LXMF.FIELD_IMAGE]
image_type = image_data[0] if len(image_data) > 0 else "unknown"
image_bytes = image_data[1] if len(image_data) > 1 else b""
if image_bytes:
attachment_id = str(uuid.uuid4())
ext = "jpg" if "jpeg" in str(image_type).lower() or "jpg" in str(image_type).lower() else "png"
filename = f"{attachment_id}.{ext}"
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
with open(filepath, "wb") as f:
f.write(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes))
fields["image"] = {
"type": str(image_type),
"filename": filename,
"size": len(image_bytes),
"base64": base64.b64encode(image_bytes if isinstance(image_bytes, bytes) else bytes(image_bytes)).decode("ascii")[:200] + "...",
}
except Exception as e:
logger.warning("Failed to parse image field: %s", e)
# File attachments
if LXMF.FIELD_FILE_ATTACHMENTS in lxmf_fields:
try:
file_list = lxmf_fields[LXMF.FIELD_FILE_ATTACHMENTS]
attachments = []
for file_data in file_list:
file_name = file_data[0] if len(file_data) > 0 else "unknown"
file_bytes = file_data[1] if len(file_data) > 1 else b""
if file_bytes:
safe_name = "".join(c for c in str(file_name) if c.isalnum() or c in ".-_")
attachment_id = str(uuid.uuid4())
stored_name = f"{attachment_id}_{safe_name}"
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, stored_name)
with open(filepath, "wb") as f:
f.write(file_bytes if isinstance(file_bytes, bytes) else bytes(file_bytes))
attachments.append({
"name": str(file_name),
"stored_name": stored_name,
"size": len(file_bytes),
})
if attachments:
fields["file_attachments"] = attachments
except Exception as e:
logger.warning("Failed to parse file attachments: %s", e)
# Audio message (Codec2)
if LXMF.FIELD_AUDIO in lxmf_fields:
try:
audio_data = lxmf_fields[LXMF.FIELD_AUDIO]
audio_mode = audio_data[0] if len(audio_data) > 0 else 0
audio_bytes = audio_data[1] if len(audio_data) > 1 else b""
if audio_bytes:
attachment_id = str(uuid.uuid4())
filename = f"{attachment_id}.c2"
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
with open(filepath, "wb") as f:
f.write(audio_bytes if isinstance(audio_bytes, bytes) else bytes(audio_bytes))
fields["audio"] = {
"codec": "codec2",
"mode": audio_mode,
"filename": filename,
"size": len(audio_bytes),
}
except Exception as e:
logger.warning("Failed to parse audio field: %s", e)
# Icon appearance
if LXMF.FIELD_ICON_APPEARANCE in lxmf_fields:
try:
icon_data = lxmf_fields[LXMF.FIELD_ICON_APPEARANCE]
fields["icon"] = {
"name": str(icon_data[0]) if len(icon_data) > 0 else "",
"foreground": "#" + icon_data[1].hex() if len(icon_data) > 1 and hasattr(icon_data[1], "hex") else "",
"background": "#" + icon_data[2].hex() if len(icon_data) > 2 and hasattr(icon_data[2], "hex") else "",
}
except Exception as e:
logger.warning("Failed to parse icon field: %s", e)
# Commands
if LXMF.FIELD_COMMANDS in lxmf_fields:
try:
fields["commands"] = [str(cmd) for cmd in lxmf_fields[LXMF.FIELD_COMMANDS]]
except Exception:
pass
return fields
# ═══════════════════════════════════════════════════════════════════
# Message Handling
# ═══════════════════════════════════════════════════════════════════
def _delivery_callback(message):
"""Handle incoming LXMF message with rich field extraction."""
# Parse rich fields
fields = _parse_lxmf_fields(message)
# Get signal quality if available
rssi = getattr(message, "rssi", None)
snr = getattr(message, "snr", None)
quality = getattr(message, "quality", None)
msg_record = {
"id": str(uuid.uuid4()),
"direction": "inbound",
"sender_hash": message.source_hash.hex() if message.source_hash else "",
"recipient_hash": _identity.hexhash if _identity else "",
"title": message.title.decode("utf-8") if isinstance(message.title, bytes) else str(message.title or ""),
"content": message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content or ""),
"fields": fields,
"rssi": rssi,
"snr": snr,
"quality": quality,
"status": "delivered",
"timestamp": time.time(),
}
with _lock:
_messages.append(msg_record)
_save_messages()
logger.info("Received LXMF message from %s (fields: %s)",
msg_record["sender_hash"][:16],
list(fields.keys()) if fields else "none")
_emit_event("lxmf.delivery", {"message": msg_record})
def send_message(destination_hash_hex: str, content: str, title: str = "",
image_bytes: bytes = None, image_type: str = "image/jpeg",
file_attachments: list = None,
audio_bytes: bytes = None, audio_mode: int = 0,
try_propagation_on_fail: bool = True) -> dict:
"""Send an LXMF message with optional attachments."""
if _router is None or _identity is None:
return {"id": "", "status": "failed"}
try:
dest_hash = bytes.fromhex(destination_hash_hex)
except ValueError:
return {"id": "", "status": "failed"}
dest_identity = RNS.Identity.recall(dest_hash)
msg_id = str(uuid.uuid4())
if dest_identity is None:
RNS.Transport.request_path(dest_hash)
msg_record = {
"id": msg_id, "direction": "outbound",
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
"title": title, "content": content, "fields": {},
"status": "pending", "timestamp": time.time(),
}
with _lock:
_messages.append(msg_record)
_save_messages()
return msg_record
dest = RNS.Destination(
dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"lxmf", "delivery",
)
lxm = LXMF.LXMessage(
dest, _router.get_delivery_destination(),
content.encode("utf-8") if isinstance(content, str) else content,
title=title.encode("utf-8") if isinstance(title, str) else title,
desired_method=LXMF.LXMessage.DIRECT,
)
# Add rich fields
if image_bytes:
lxm.fields[LXMF.FIELD_IMAGE] = [image_type, image_bytes]
if file_attachments:
lxm.fields[LXMF.FIELD_FILE_ATTACHMENTS] = [
[att["name"], att["bytes"]] for att in file_attachments
]
if audio_bytes:
lxm.fields[LXMF.FIELD_AUDIO] = [audio_mode, audio_bytes]
# Enable propagation fallback
lxm.try_propagation_on_fail = try_propagation_on_fail
msg_record = {
"id": msg_id, "direction": "outbound",
"sender_hash": _identity.hexhash, "recipient_hash": destination_hash_hex,
"title": title, "content": content, "fields": {},
"status": "pending", "timestamp": time.time(),
}
def _on_delivered(message):
with _lock:
for m in _messages:
if m["id"] == msg_id:
m["status"] = "delivered"
break
_save_messages()
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "delivered"})
def _on_failed(message):
# Try propagation node if direct failed
if getattr(message, "try_propagation_on_fail", False) and _router.outbound_propagation_node:
logger.info("Direct delivery failed, trying propagation node for %s", destination_hash_hex[:16])
message.desired_method = LXMF.LXMessage.PROPAGATED
message.try_propagation_on_fail = False
_router.handle_outbound(message)
return
with _lock:
for m in _messages:
if m["id"] == msg_id:
m["status"] = "failed"
break
_save_messages()
_emit_event("lxmf.state_updated", {"message_id": msg_id, "status": "failed"})
lxm.delivery_callback = _on_delivered
lxm.failed_callback = _on_failed
_router.handle_outbound(lxm)
with _lock:
_messages.append(msg_record)
_save_messages()
_emit_event("lxmf.created", {"message": msg_record})
return msg_record
# ═══════════════════════════════════════════════════════════════════
# Auto-Resend Failed Messages on Announce
# ═══════════════════════════════════════════════════════════════════
class _LxmfAnnounceHandler:
"""Re-attempt failed messages when a peer announces."""
aspect_filter = "lxmf.delivery"
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
if not AUTO_RESEND_ON_ANNOUNCE:
return
dest_hex = destination_hash.hex()
resend_count = 0
with _lock:
for msg in _messages:
if (msg["status"] == "failed" and
msg["direction"] == "outbound" and
msg["recipient_hash"] == dest_hex):
msg["status"] = "pending"
resend_count += 1
# Re-send in background
threading.Thread(
target=_resend_message, args=(msg,), daemon=True
).start()
if resend_count > 0:
_save_messages()
if resend_count > 0:
logger.info("Auto-resending %d failed messages to %s", resend_count, dest_hex[:16])
def _resend_message(msg_record: dict):
"""Resend a previously failed message."""
time.sleep(1) # Brief delay to let path establish
result = send_message(
msg_record["recipient_hash"],
msg_record["content"],
msg_record.get("title", ""),
)
if result.get("status") == "pending":
# Remove the duplicate — original is being retried
with _lock:
_messages[:] = [m for m in _messages if m["id"] != result["id"]]
_save_messages()
# ═══════════════════════════════════════════════════════════════════
# Propagation Node Management
# ═══════════════════════════════════════════════════════════════════
class _PropagationAnnounceHandler:
"""Track LXMF propagation node announces."""
aspect_filter = "lxmf.propagation"
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash=None):
dest_hex = destination_hash.hex()
node_info = {
"destination_hash": dest_hex,
"identity_hash": announced_identity.hexhash if announced_identity else "",
"last_heard": time.time(),
"enabled": True,
"per_transfer_limit": 0,
}
# Parse propagation node app data
if app_data:
try:
import RNS.vendor.umsgpack as msgpack
data = msgpack.unpackb(app_data)
if isinstance(data, dict):
node_info["enabled"] = data.get("enabled", True)
node_info["per_transfer_limit"] = data.get("per_transfer_limit", 0)
except Exception:
pass
_propagation_nodes[dest_hex] = node_info
_emit_event("propagation.announce", {"node": node_info})
def get_propagation_nodes() -> list[dict]:
"""Return list of known propagation nodes."""
return list(_propagation_nodes.values())
def set_preferred_propagation_node(destination_hash_hex: str) -> bool:
"""Set the preferred outbound propagation node."""
if _router is None:
return False
try:
if destination_hash_hex:
_router.set_outbound_propagation_node(bytes.fromhex(destination_hash_hex))
else:
_router.outbound_propagation_node = None
return True
except Exception as e:
logger.warning("Failed to set propagation node: %s", e)
return False
def get_propagation_status() -> dict:
"""Return propagation node status."""
if _router is None:
return {"enabled": False, "outbound_node": None, "transfer_state": "idle"}
outbound = _router.get_outbound_propagation_node()
return {
"local_enabled": LOCAL_PROPAGATION_NODE,
"outbound_node": outbound.hex() if outbound else None,
"transfer_state": str(getattr(_router, "propagation_transfer_state", "unknown")),
"known_nodes": len(_propagation_nodes),
"last_sync": _last_propagation_sync,
}
def sync_propagation_messages():
"""Request messages from the propagation node."""
global _last_propagation_sync
if _router is None or _identity is None:
return
try:
_router.request_messages_from_propagation_node(_identity)
_last_propagation_sync = time.time()
logger.info("Syncing messages from propagation node")
except Exception as e:
logger.warning("Propagation sync failed: %s", e)
def _propagation_sync_loop():
"""Background thread to periodically sync propagation messages."""
while True:
time.sleep(PROPAGATION_SYNC_INTERVAL)
if _router and _router.get_outbound_propagation_node():
sync_propagation_messages()
# ═══════════════════════════════════════════════════════════════════
# Query / Persistence
# ═══════════════════════════════════════════════════════════════════
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
with _lock:
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
return sorted_msgs[offset:offset + limit]
def get_message(msg_id: str) -> dict | None:
with _lock:
for m in _messages:
if m["id"] == msg_id:
return m
return None
def get_total_count() -> int:
with _lock:
return len(_messages)
def get_attachment_path(filename: str) -> str | None:
"""Return full path to an attachment file if it exists."""
filepath = os.path.join(LXMF_ATTACHMENTS_DIR, filename)
if os.path.exists(filepath):
return filepath
return None
def _load_messages():
global _messages
if os.path.exists(LXMF_MESSAGES_FILE):
try:
with open(LXMF_MESSAGES_FILE, "r") as f:
_messages = json.load(f)
logger.info("Loaded %d persisted messages", len(_messages))
except (json.JSONDecodeError, OSError) as e:
logger.warning("Could not load messages: %s", e)
_messages = []
def _save_messages():
try:
os.makedirs(os.path.dirname(LXMF_MESSAGES_FILE), exist_ok=True)
with open(LXMF_MESSAGES_FILE, "w") as f:
json.dump(_messages, f)
except OSError as e:
logger.warning("Could not save messages: %s", e)