import os import json import asyncio from typing import Optional, Callable, Union from gmqtt import Client as MQTTClient import logging # Get logger logger = logging.getLogger("gatekeeper") class HomeAssistantMQTT: def __init__(self): # MQTT Configuration - will be updated via update_settings self.broker = "localhost" self.port = 1883 self.username = None self.password = None self.client_id = "gatekeeper" # Home Assistant MQTT topics self.node_id = "gatekeeper" self.object_id = "gate" self.discovery_prefix = "homeassistant" self.state_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/state" self.command_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/command" self.config_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/config" self.availability_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/availability" logger.debug(f"MQTT topics configured - State: {self.state_topic}, Command: {self.command_topic}") self.client: Optional[MQTTClient] = None self.command_callback: Optional[Callable] = None self._connected = False self._reconnect_task: Optional[asyncio.Task] = None self._enabled = False self._event_callback: Optional[Callable] = None def set_event_callback(self, callback: Callable): """Set callback for logging events""" self._event_callback = callback async def _log_event(self, action: str, success: bool = True): """Log MQTT events if callback is set""" if self._event_callback: await self._event_callback(action, success) def enable(self, enabled: bool = True): """Enable or disable MQTT integration""" self._enabled = enabled if enabled and not self._connected and not self._reconnect_task: # Set up command handler if not already set from main import handle_mqtt_command # Import here to avoid circular import self.set_command_callback(handle_mqtt_command) # Start reconnection self._reconnect_task = asyncio.create_task(self._reconnect_loop()) asyncio.create_task(self._log_event("MQTT Enabled")) elif not enabled and self._reconnect_task: self._reconnect_task.cancel() self._reconnect_task = None asyncio.create_task(self.disconnect()) asyncio.create_task(self._log_event("MQTT Disabled")) async def _reconnect_loop(self): """Continuously try to reconnect when connection is lost""" while self._enabled: try: if not self._connected: logger.info("Attempting to reconnect to MQTT broker...") await self.connect() await asyncio.sleep(5) except Exception as e: logger.error(f"Reconnection attempt failed: {e}") await asyncio.sleep(5) def on_connect(self, client, flags, rc, properties): """Callback for when connection is established""" logger.info(f"Connected to MQTT broker (rc: {rc})") self._connected = True asyncio.create_task(self._post_connect()) def on_message(self, client, topic, payload, qos, properties): """Callback for when a message is received""" try: logger.debug(f"MQTT message received - Topic: {topic}, Payload: {payload}") if topic == self.command_topic and self.command_callback: command = payload.decode() logger.info(f"MQTT command received: {command}") asyncio.create_task(self.command_callback(command)) except Exception as e: logger.error(f"Error processing MQTT message: {e}", exc_info=True) def on_disconnect(self, client, packet, exc=None): """Callback for when connection is lost""" logger.warning(f"Disconnected from MQTT broker{f': {exc}' if exc else ''}") self._connected = False if self._enabled and not self._reconnect_task: self._reconnect_task = asyncio.create_task(self._reconnect_loop()) async def _post_connect(self): """Post connection setup - subscribe to topics""" logger.info("Connected to MQTT broker") self._connected = True try: # Subscribe to command topic self.client.subscribe(self.command_topic, qos=1) logger.info(f"Subscribed to command topic: {self.command_topic}") # Publish discovery config config = { "name": "Gate", "unique_id": f"{self.node_id}_{self.object_id}", "device_class": "gate", "command_topic": self.command_topic, "state_topic": self.state_topic, "availability_topic": self.availability_topic, "payload_open": "OPEN", "payload_close": "CLOSE", "state_open": "open", "state_closed": "closed" } self.client.publish( self.config_topic, json.dumps(config), qos=1, retain=True ) logger.info("Published Home Assistant discovery config") # Publish initial availability and state self.client.publish( self.availability_topic, "online", qos=1, retain=True ) logger.info("Published availability: online") await self.publish_state("closed") logger.info("Published initial state: closed") except Exception as e: logger.error(f"Failed in post-connect setup: {e}", exc_info=True) await self.client.disconnect() async def connect(self): """Connect to MQTT broker""" try: self.client = MQTTClient(self.client_id) # Set callbacks self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.on_disconnect = self.on_disconnect # Set credentials if provided if self.username and self.password: self.client.set_auth_credentials(self.username, self.password) logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...") await self.client.connect(self.broker, self.port) logger.info("Initiating connection to MQTT broker") except Exception as e: logger.error(f"Failed to connect to MQTT broker: {e}") await self._log_event("MQTT Connection Failed", success=False) raise async def disconnect(self): """Disconnect from MQTT broker""" if self.client and self._connected: try: await self.client.publish( self.availability_topic, "offline", qos=1, retain=True ) await self.client.disconnect() except Exception as e: logger.error(f"Error during disconnect: {e}") async def publish(self, topic: str, payload: str, retain: bool = False): """Publish a message to a topic""" if self.client and self._connected: try: self.client.publish(topic, payload, retain=retain) except Exception as e: logger.error(f"Failed to publish message: {e}") async def subscribe(self, topic: str): """Subscribe to a topic""" if self.client and self._connected: try: self.client.subscribe(topic, qos=1) except Exception as e: logger.error(f"Failed to subscribe to topic: {e}") async def publish_state(self, state: Union[bool, str]): """Publish state to MQTT""" if not self._connected: logger.warning("Cannot publish state - not connected to MQTT broker") return try: if isinstance(state, bool): state_str = "open" if state else "closed" else: state_str = state.lower() self.client.publish( self.state_topic, state_str, qos=1, retain=True ) logger.debug(f"Published state: {state_str}") if self._event_callback: self._event_callback(f"Published state: {state_str}") except Exception as e: logger.error(f"Failed to publish state: {e}") def set_command_callback(self, callback: Callable): """Set callback for handling commands""" self.command_callback = callback def is_connected(self) -> bool: """Return current connection status""" return self._connected def update_settings(self, settings): """Update MQTT settings""" logger.info(f"Updating MQTT settings - Broker: {settings.broker}:{settings.port}") self.broker = settings.broker self.port = int(settings.port) self.username = settings.username if settings.username else None self.password = settings.password if settings.password else None self.client_id = settings.clientId # If we're connected, disconnect and reconnect with new settings if self._connected: logger.info("Reconnecting with new settings...") asyncio.create_task(self.disconnect()) # Connection will be re-established by reconnect loop if enabled