""" MeshCore bridge — connects to a MeshCore Companion node via TCP or serial, handles contacts, messaging, and device status. """ import asyncio import json import time import uuid import os import logging from typing import Optional from .config import ( MESHCORE_HOST, MESHCORE_PORT, MESHCORE_SERIAL, MESHCORE_DATA_DIR, MESHCORE_MESSAGES_FILE, ) logger = logging.getLogger("rmesh.meshcore") _mc = None # MeshCore instance _connected = False _device_info: dict = {} _contacts: list[dict] = [] _messages: list[dict] = [] _event_loop: Optional[asyncio.AbstractEventLoop] = None _listener_task: Optional[asyncio.Task] = None async def init(): """Connect to MeshCore companion node.""" global _mc, _connected, _event_loop import meshcore as mc_lib os.makedirs(MESHCORE_DATA_DIR, exist_ok=True) _load_messages() _event_loop = asyncio.get_event_loop() try: if MESHCORE_SERIAL: logger.info("Connecting to MeshCore companion via serial: %s", MESHCORE_SERIAL) _mc = await mc_lib.MeshCore.create_serial( port=MESHCORE_SERIAL, auto_reconnect=True, max_reconnect_attempts=10, ) elif MESHCORE_HOST: logger.info("Connecting to MeshCore companion via TCP: %s:%d", MESHCORE_HOST, MESHCORE_PORT) _mc = await mc_lib.MeshCore.create_tcp( host=MESHCORE_HOST, port=MESHCORE_PORT, auto_reconnect=True, max_reconnect_attempts=10, ) else: logger.warning("No MeshCore connection configured (set MESHCORE_HOST or MESHCORE_SERIAL)") return await _mc.connect() _connected = True # Subscribe to events _mc.subscribe(mc_lib.EventType.CONTACT_MSG_RECV, _on_contact_message) _mc.subscribe(mc_lib.EventType.CHANNEL_MSG_RECV, _on_channel_message) _mc.subscribe(mc_lib.EventType.NEW_CONTACT, _on_new_contact) _mc.subscribe(mc_lib.EventType.CONNECTED, _on_connected) _mc.subscribe(mc_lib.EventType.DISCONNECTED, _on_disconnected) # Fetch initial state await _refresh_device_info() await _refresh_contacts() logger.info("MeshCore bridge ready — device: %s", _device_info.get("name", "unknown")) except Exception as e: logger.error("Failed to connect to MeshCore companion: %s", e) _connected = False async def _refresh_device_info(): """Fetch device info from companion.""" global _device_info if not _mc: return try: info = await _mc.commands.device.send_device_query() if info: _device_info = { "name": getattr(info, "name", ""), "firmware": getattr(info, "firmware", ""), "freq": getattr(info, "freq", 0), "bw": getattr(info, "bw", 0), "sf": getattr(info, "sf", 0), "cr": getattr(info, "cr", 0), "tx_power": getattr(info, "tx_power", 0), } except Exception as e: logger.warning("Failed to query device info: %s", e) async def _refresh_contacts(): """Fetch contacts from companion.""" global _contacts if not _mc: return try: await _mc.get_contacts_async() # meshcore_py stores contacts internally contacts_raw = _mc._contacts if hasattr(_mc, '_contacts') else {} _contacts = [] for key, contact in contacts_raw.items(): _contacts.append({ "key_prefix": key[:16] if isinstance(key, str) else "", "name": getattr(contact, "name", "unknown"), "type": getattr(contact, "type", 0), "last_seen": getattr(contact, "last_seen", None), "path_known": getattr(contact, "path_known", False), "public_key": key if isinstance(key, str) else "", }) except Exception as e: logger.warning("Failed to fetch contacts: %s", e) def _on_contact_message(event): """Handle incoming direct message.""" msg = { "id": str(uuid.uuid4()), "type": "direct", "direction": "inbound", "sender": getattr(event, "sender_name", "") or getattr(event, "sender", ""), "sender_key": getattr(event, "sender_key", ""), "content": getattr(event, "text", "") or getattr(event, "content", ""), "channel": None, "timestamp": time.time(), "status": "delivered", } _messages.append(msg) _save_messages() logger.info("MeshCore DM from %s: %s", msg["sender"], msg["content"][:50]) def _on_channel_message(event): """Handle incoming channel message.""" msg = { "id": str(uuid.uuid4()), "type": "channel", "direction": "inbound", "sender": getattr(event, "sender_name", "") or getattr(event, "sender", ""), "sender_key": getattr(event, "sender_key", ""), "content": getattr(event, "text", "") or getattr(event, "content", ""), "channel": getattr(event, "channel_name", "") or getattr(event, "channel", ""), "timestamp": time.time(), "status": "delivered", } _messages.append(msg) _save_messages() logger.info("MeshCore channel msg [%s] from %s", msg["channel"], msg["sender"]) def _on_new_contact(event): """Handle new contact discovery.""" logger.info("MeshCore new contact: %s", getattr(event, "name", "unknown")) # Refresh contacts in background if _event_loop: asyncio.run_coroutine_threadsafe(_refresh_contacts(), _event_loop) def _on_connected(event): global _connected _connected = True logger.info("MeshCore companion connected") def _on_disconnected(event): global _connected _connected = False logger.warning("MeshCore companion disconnected") # --- Public API --- def get_status() -> dict: """Return MeshCore connection status.""" return { "connected": _connected, "device_info": _device_info, "contact_count": len(_contacts), "message_count": len(_messages), } def get_contacts() -> list[dict]: """Return known contacts.""" return list(_contacts) async def refresh_contacts() -> list[dict]: """Force refresh contacts from companion.""" await _refresh_contacts() return list(_contacts) async def send_message(contact_name: str, content: str) -> dict: """Send a direct message to a contact by name.""" if not _mc or not _connected: return {"id": "", "status": "failed", "error": "Not connected"} try: contact = _mc.get_contact_by_name(contact_name) if not contact: return {"id": "", "status": "failed", "error": f"Contact '{contact_name}' not found"} await _mc.commands.messaging.send_msg(contact, content) msg = { "id": str(uuid.uuid4()), "type": "direct", "direction": "outbound", "sender": _device_info.get("name", "self"), "sender_key": "", "content": content, "channel": None, "recipient": contact_name, "timestamp": time.time(), "status": "sent", } _messages.append(msg) _save_messages() return msg except Exception as e: logger.error("Failed to send message: %s", e) return {"id": "", "status": "failed", "error": str(e)} async def send_channel_message(channel_idx: int, content: str) -> dict: """Send a message to a channel by index.""" if not _mc or not _connected: return {"id": "", "status": "failed", "error": "Not connected"} try: await _mc.commands.messaging.send_chan_msg(channel_idx, content) msg = { "id": str(uuid.uuid4()), "type": "channel", "direction": "outbound", "sender": _device_info.get("name", "self"), "sender_key": "", "content": content, "channel": str(channel_idx), "timestamp": time.time(), "status": "sent", } _messages.append(msg) _save_messages() return msg except Exception as e: logger.error("Failed to send channel message: %s", e) return {"id": "", "status": "failed", "error": str(e)} async def send_advert() -> dict: """Send an advertisement to the mesh.""" if not _mc or not _connected: return {"advertised": False, "error": "Not connected"} try: await _mc.commands.device.send_advert() return {"advertised": True, "name": _device_info.get("name", "")} except Exception as e: return {"advertised": False, "error": str(e)} async def get_device_stats() -> dict: """Get radio and core stats from companion.""" if not _mc or not _connected: return {} stats = {} try: radio = await _mc.commands.device.get_stats_radio() if radio: stats["radio"] = {k: v for k, v in vars(radio).items() if not k.startswith("_")} if hasattr(radio, "__dict__") else str(radio) except Exception: pass try: core = await _mc.commands.device.get_stats_core() if core: stats["core"] = {k: v for k, v in vars(core).items() if not k.startswith("_")} if hasattr(core, "__dict__") else str(core) except Exception: pass try: bat = await _mc.commands.device.get_bat() if bat is not None: stats["battery"] = bat except Exception: pass return stats def get_messages(limit: int = 100, offset: int = 0) -> list[dict]: """Return stored messages.""" sorted_msgs = sorted(_messages, key=lambda m: m["timestamp"], reverse=True) return sorted_msgs[offset:offset + limit] def get_message_count() -> int: return len(_messages) async def disconnect(): """Disconnect from companion.""" global _connected if _mc: try: await _mc.disconnect() except Exception: pass _connected = False # --- Persistence --- def _load_messages(): global _messages if os.path.exists(MESHCORE_MESSAGES_FILE): try: with open(MESHCORE_MESSAGES_FILE, "r") as f: _messages = json.load(f) logger.info("Loaded %d MeshCore messages", len(_messages)) except (json.JSONDecodeError, OSError) as e: logger.warning("Could not load MeshCore messages: %s", e) _messages = [] def _save_messages(): try: os.makedirs(os.path.dirname(MESHCORE_MESSAGES_FILE), exist_ok=True) with open(MESHCORE_MESSAGES_FILE, "w") as f: json.dump(_messages, f) except OSError as e: logger.warning("Could not save MeshCore messages: %s", e)