rmesh-reticulum/app/meshcore_bridge.py

349 lines
11 KiB
Python

"""
MeshCore bridge — connects to a MeshCore Companion node via TCP or serial,
handles contacts, messaging, and device status.
"""
import asyncio
import json
import time
import uuid
import os
import logging
from typing import Optional
from .config import (
MESHCORE_HOST, MESHCORE_PORT, MESHCORE_SERIAL,
MESHCORE_DATA_DIR, MESHCORE_MESSAGES_FILE,
)
logger = logging.getLogger("rmesh.meshcore")
_mc = None # MeshCore instance
_connected = False
_device_info: dict = {}
_contacts: list[dict] = []
_messages: list[dict] = []
_event_loop: Optional[asyncio.AbstractEventLoop] = None
_listener_task: Optional[asyncio.Task] = None
async def init():
"""Connect to MeshCore companion node."""
global _mc, _connected, _event_loop
import meshcore as mc_lib
os.makedirs(MESHCORE_DATA_DIR, exist_ok=True)
_load_messages()
_event_loop = asyncio.get_event_loop()
try:
if MESHCORE_SERIAL:
logger.info("Connecting to MeshCore companion via serial: %s", MESHCORE_SERIAL)
_mc = await mc_lib.MeshCore.create_serial(
port=MESHCORE_SERIAL,
auto_reconnect=True,
max_reconnect_attempts=10,
)
elif MESHCORE_HOST:
logger.info("Connecting to MeshCore companion via TCP: %s:%d", MESHCORE_HOST, MESHCORE_PORT)
_mc = await mc_lib.MeshCore.create_tcp(
host=MESHCORE_HOST,
port=MESHCORE_PORT,
auto_reconnect=True,
max_reconnect_attempts=10,
)
else:
logger.warning("No MeshCore connection configured (set MESHCORE_HOST or MESHCORE_SERIAL)")
return
await _mc.connect()
_connected = True
# Subscribe to events
_mc.subscribe(mc_lib.EventType.CONTACT_MSG_RECV, _on_contact_message)
_mc.subscribe(mc_lib.EventType.CHANNEL_MSG_RECV, _on_channel_message)
_mc.subscribe(mc_lib.EventType.NEW_CONTACT, _on_new_contact)
_mc.subscribe(mc_lib.EventType.CONNECTED, _on_connected)
_mc.subscribe(mc_lib.EventType.DISCONNECTED, _on_disconnected)
# Fetch initial state
await _refresh_device_info()
await _refresh_contacts()
logger.info("MeshCore bridge ready — device: %s", _device_info.get("name", "unknown"))
except Exception as e:
logger.error("Failed to connect to MeshCore companion: %s", e)
_connected = False
async def _refresh_device_info():
"""Fetch device info from companion."""
global _device_info
if not _mc:
return
try:
info = await _mc.commands.device.send_device_query()
if info:
_device_info = {
"name": getattr(info, "name", ""),
"firmware": getattr(info, "firmware", ""),
"freq": getattr(info, "freq", 0),
"bw": getattr(info, "bw", 0),
"sf": getattr(info, "sf", 0),
"cr": getattr(info, "cr", 0),
"tx_power": getattr(info, "tx_power", 0),
}
except Exception as e:
logger.warning("Failed to query device info: %s", e)
async def _refresh_contacts():
"""Fetch contacts from companion."""
global _contacts
if not _mc:
return
try:
await _mc.get_contacts_async()
# meshcore_py stores contacts internally
contacts_raw = _mc._contacts if hasattr(_mc, '_contacts') else {}
_contacts = []
for key, contact in contacts_raw.items():
_contacts.append({
"key_prefix": key[:16] if isinstance(key, str) else "",
"name": getattr(contact, "name", "unknown"),
"type": getattr(contact, "type", 0),
"last_seen": getattr(contact, "last_seen", None),
"path_known": getattr(contact, "path_known", False),
"public_key": key if isinstance(key, str) else "",
})
except Exception as e:
logger.warning("Failed to fetch contacts: %s", e)
def _on_contact_message(event):
"""Handle incoming direct message."""
msg = {
"id": str(uuid.uuid4()),
"type": "direct",
"direction": "inbound",
"sender": getattr(event, "sender_name", "") or getattr(event, "sender", ""),
"sender_key": getattr(event, "sender_key", ""),
"content": getattr(event, "text", "") or getattr(event, "content", ""),
"channel": None,
"timestamp": time.time(),
"status": "delivered",
}
_messages.append(msg)
_save_messages()
logger.info("MeshCore DM from %s: %s", msg["sender"], msg["content"][:50])
def _on_channel_message(event):
"""Handle incoming channel message."""
msg = {
"id": str(uuid.uuid4()),
"type": "channel",
"direction": "inbound",
"sender": getattr(event, "sender_name", "") or getattr(event, "sender", ""),
"sender_key": getattr(event, "sender_key", ""),
"content": getattr(event, "text", "") or getattr(event, "content", ""),
"channel": getattr(event, "channel_name", "") or getattr(event, "channel", ""),
"timestamp": time.time(),
"status": "delivered",
}
_messages.append(msg)
_save_messages()
logger.info("MeshCore channel msg [%s] from %s", msg["channel"], msg["sender"])
def _on_new_contact(event):
"""Handle new contact discovery."""
logger.info("MeshCore new contact: %s", getattr(event, "name", "unknown"))
# Refresh contacts in background
if _event_loop:
asyncio.run_coroutine_threadsafe(_refresh_contacts(), _event_loop)
def _on_connected(event):
global _connected
_connected = True
logger.info("MeshCore companion connected")
def _on_disconnected(event):
global _connected
_connected = False
logger.warning("MeshCore companion disconnected")
# --- Public API ---
def get_status() -> dict:
"""Return MeshCore connection status."""
return {
"connected": _connected,
"device_info": _device_info,
"contact_count": len(_contacts),
"message_count": len(_messages),
}
def get_contacts() -> list[dict]:
"""Return known contacts."""
return list(_contacts)
async def refresh_contacts() -> list[dict]:
"""Force refresh contacts from companion."""
await _refresh_contacts()
return list(_contacts)
async def send_message(contact_name: str, content: str) -> dict:
"""Send a direct message to a contact by name."""
if not _mc or not _connected:
return {"id": "", "status": "failed", "error": "Not connected"}
try:
contact = _mc.get_contact_by_name(contact_name)
if not contact:
return {"id": "", "status": "failed", "error": f"Contact '{contact_name}' not found"}
await _mc.commands.messaging.send_msg(contact, content)
msg = {
"id": str(uuid.uuid4()),
"type": "direct",
"direction": "outbound",
"sender": _device_info.get("name", "self"),
"sender_key": "",
"content": content,
"channel": None,
"recipient": contact_name,
"timestamp": time.time(),
"status": "sent",
}
_messages.append(msg)
_save_messages()
return msg
except Exception as e:
logger.error("Failed to send message: %s", e)
return {"id": "", "status": "failed", "error": str(e)}
async def send_channel_message(channel_idx: int, content: str) -> dict:
"""Send a message to a channel by index."""
if not _mc or not _connected:
return {"id": "", "status": "failed", "error": "Not connected"}
try:
await _mc.commands.messaging.send_chan_msg(channel_idx, content)
msg = {
"id": str(uuid.uuid4()),
"type": "channel",
"direction": "outbound",
"sender": _device_info.get("name", "self"),
"sender_key": "",
"content": content,
"channel": str(channel_idx),
"timestamp": time.time(),
"status": "sent",
}
_messages.append(msg)
_save_messages()
return msg
except Exception as e:
logger.error("Failed to send channel message: %s", e)
return {"id": "", "status": "failed", "error": str(e)}
async def send_advert() -> dict:
"""Send an advertisement to the mesh."""
if not _mc or not _connected:
return {"advertised": False, "error": "Not connected"}
try:
await _mc.commands.device.send_advert()
return {"advertised": True, "name": _device_info.get("name", "")}
except Exception as e:
return {"advertised": False, "error": str(e)}
async def get_device_stats() -> dict:
"""Get radio and core stats from companion."""
if not _mc or not _connected:
return {}
stats = {}
try:
radio = await _mc.commands.device.get_stats_radio()
if radio:
stats["radio"] = {k: v for k, v in vars(radio).items() if not k.startswith("_")} if hasattr(radio, "__dict__") else str(radio)
except Exception:
pass
try:
core = await _mc.commands.device.get_stats_core()
if core:
stats["core"] = {k: v for k, v in vars(core).items() if not k.startswith("_")} if hasattr(core, "__dict__") else str(core)
except Exception:
pass
try:
bat = await _mc.commands.device.get_bat()
if bat is not None:
stats["battery"] = bat
except Exception:
pass
return stats
def get_messages(limit: int = 100, offset: int = 0) -> list[dict]:
"""Return stored messages."""
sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True)
return sorted_msgs[offset:offset + limit]
def get_message_count() -> int:
return len(_messages)
async def disconnect():
"""Disconnect from companion."""
global _connected
if _mc:
try:
await _mc.disconnect()
except Exception:
pass
_connected = False
# --- Persistence ---
def _load_messages():
global _messages
if os.path.exists(MESHCORE_MESSAGES_FILE):
try:
with open(MESHCORE_MESSAGES_FILE, "r") as f:
_messages = json.load(f)
logger.info("Loaded %d MeshCore messages", len(_messages))
except (json.JSONDecodeError, OSError) as e:
logger.warning("Could not load MeshCore messages: %s", e)
_messages = []
def _save_messages():
try:
os.makedirs(os.path.dirname(MESHCORE_MESSAGES_FILE), exist_ok=True)
with open(MESHCORE_MESSAGES_FILE, "w") as f:
json.dump(_messages, f)
except OSError as e:
logger.warning("Could not save MeshCore messages: %s", e)