"""Home Assistant MQTT cover integration (discovery, state, commands) on gmqtt."""
import asyncio
import inspect
import json
import logging
import os
from typing import Any, Callable, Coroutine, Optional, Set

from gmqtt import Client as MQTTClient

logger = logging.getLogger(__name__)


class HomeAssistantMQTT:
    """Expose a gate as a Home Assistant MQTT ``cover`` entity.

    The instance owns a gmqtt client, a background loop that (re)connects
    while the integration is enabled, and the set of topics used for
    discovery, state, commands and availability.

    Broker settings are read from the environment at construction time:
    ``MQTT_BROKER`` (default ``localhost``), ``MQTT_PORT`` (default ``1883``),
    ``MQTT_USERNAME``, ``MQTT_PASSWORD`` and ``MQTT_CLIENT_ID``
    (default ``gatekeeper``).
    """

    def __init__(self):
        # MQTT configuration
        self.broker = os.getenv("MQTT_BROKER", "localhost")
        self.port = int(os.getenv("MQTT_PORT", "1883"))
        self.username = os.getenv("MQTT_USERNAME", None)
        self.password = os.getenv("MQTT_PASSWORD", None)
        self.client_id = os.getenv("MQTT_CLIENT_ID", "gatekeeper")

        # Home Assistant MQTT topics, all derived from the discovery prefix
        self.node_id = "gatekeeper"
        self.object_id = "gate"
        self.discovery_prefix = "homeassistant"
        base_topic = (
            f"{self.discovery_prefix}/cover/{self.node_id}/{self.object_id}"
        )
        self.state_topic = f"{base_topic}/state"
        self.command_topic = f"{base_topic}/command"
        self.config_topic = f"{base_topic}/config"
        self.availability_topic = f"{base_topic}/availability"

        self.client: Optional[MQTTClient] = None
        self.command_callback: Optional[Callable] = None
        self._connected = False
        self._reconnect_task: Optional[asyncio.Task] = None
        self._enabled = False
        self._event_callback: Optional[Callable] = None
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected
        # before they finish.
        self._background_tasks: Set[asyncio.Task] = set()

    def _spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Schedule *coro* as a background task and keep it referenced.

        Requires a running event loop. The task is dropped from the internal
        set when it finishes, and any exception it raised is logged.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task: asyncio.Task) -> None:
        """Release a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Background MQTT task failed: %s", error)

    @staticmethod
    async def _await_if_needed(result: Any) -> Any:
        """Await *result* only when it is awaitable; otherwise return it.

        gmqtt's ``publish``/``subscribe`` are plain (non-coroutine) calls;
        awaiting their return value directly raises ``TypeError``.
        """
        if inspect.isawaitable(result):
            return await result
        return result

    def set_event_callback(self, callback: Callable):
        """Set the async callback used to log events.

        The callback is awaited as ``callback(action, success)``.
        """
        self._event_callback = callback

    async def _log_event(self, action: str, success: bool = True):
        """Forward an event to the event callback, if one is set."""
        if self._event_callback:
            await self._event_callback(action, success)

    def enable(self, enabled: bool = True):
        """Enable or disable the MQTT integration.

        Enabling starts the reconnect loop (when not already connected or
        running). Disabling cancels the loop and schedules a disconnect.
        Must be called while an event loop is running.
        """
        self._enabled = enabled

        if enabled and not self._connected and not self._reconnect_task:
            # Install the default command handler only if the caller has not
            # provided one already.
            if self.command_callback is None:
                # Imported here to avoid a circular import.
                from main import handle_mqtt_command

                self.set_command_callback(handle_mqtt_command)

            self._reconnect_task = asyncio.create_task(self._reconnect_loop())
            self._spawn(self._log_event("MQTT Enabled"))
        elif not enabled and self._reconnect_task:
            self._reconnect_task.cancel()
            self._reconnect_task = None
            self._spawn(self.disconnect())
            self._spawn(self._log_event("MQTT Disabled"))

    async def _reconnect_loop(self):
        """Try to connect every 5 seconds for as long as MQTT is enabled."""
        while self._enabled:
            try:
                if not self._connected:
                    await self.connect()
                await asyncio.sleep(5)  # Wait before checking connection again
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Reconnection attempt failed: %s", e)
                await asyncio.sleep(5)  # Wait before retrying

    def on_connect(self, client, flags, rc, properties):
        """Handle an established connection: mark connected, run setup."""
        logger.info("Connected to MQTT broker")
        self._connected = True
        self._spawn(self._log_event("MQTT Connected"))
        self._spawn(self._post_connect())

    def on_message(self, client, topic, payload, qos, properties):
        """Dispatch a message on the command topic to the command callback."""
        if topic == self.command_topic and self.command_callback:
            try:
                command = payload.decode()
                self._spawn(self.command_callback(command))
            except Exception as e:
                logger.error("Error processing command: %s", e)

    def on_disconnect(self, client, packet, exc=None):
        """Handle a lost connection and restart the reconnect loop if needed."""
        logger.info("Disconnected from MQTT broker")
        self._connected = False

        # Log disconnect event
        self._spawn(self._log_event("MQTT Disconnected"))

        # Start reconnection if enabled
        if self._enabled and not self._reconnect_task:
            self._reconnect_task = asyncio.create_task(self._reconnect_loop())

    async def _post_connect(self):
        """Publish discovery config and availability, then subscribe."""
        try:
            # Send Home Assistant discovery configuration
            await self.publish_discovery_config()

            # Publish availability status
            await self.publish(self.availability_topic, "online", retain=True)

            # Subscribe to command topic
            await self.subscribe(self.command_topic)
        except Exception as e:
            logger.error("Error in post-connect tasks: %s", e)

    async def connect(self):
        """Create a new client and connect it to the broker.

        Raises:
            Exception: whatever the client raised while connecting; the
                failure is logged and reported to the event callback first.
        """
        try:
            # NOTE(review): a fresh client is created on every attempt and the
            # previous one is not closed; gmqtt may also retry on its own -
            # confirm the two mechanisms do not compete for the client id.
            self.client = MQTTClient(self.client_id)

            # Set callbacks
            self.client.on_connect = self.on_connect
            self.client.on_message = self.on_message
            self.client.on_disconnect = self.on_disconnect

            # Set credentials if provided
            if self.username and self.password:
                self.client.set_auth_credentials(self.username, self.password)

            # Connect to broker
            await self.client.connect(self.broker, self.port)
            logger.info("Initiating connection to MQTT broker")
        except Exception as e:
            logger.error("Failed to connect to MQTT broker: %s", e)
            await self._log_event("MQTT Connection Failed", success=False)
            raise

    async def disconnect(self):
        """Publish ``offline`` availability and disconnect, if connected."""
        if self.client and self._connected:
            try:
                await self.publish(
                    self.availability_topic, "offline", retain=True
                )
                await self.client.disconnect()
            except Exception as e:
                logger.error("Error during disconnect: %s", e)

    async def publish(self, topic: str, payload: str, retain: bool = False):
        """Publish *payload* to *topic*; a no-op when not connected.

        Errors are logged rather than raised.
        """
        if self.client and self._connected:
            try:
                await self._await_if_needed(
                    self.client.publish(topic, payload, retain=retain)
                )
            except Exception as e:
                logger.error("Failed to publish message: %s", e)

    async def subscribe(self, topic: str):
        """Subscribe to *topic* at QoS 0; a no-op when not connected.

        Errors are logged rather than raised.
        """
        if self.client and self._connected:
            try:
                # gmqtt takes a topic string (or Subscription objects), not a
                # list of (topic, qos) tuples.
                await self._await_if_needed(
                    self.client.subscribe(topic, qos=0)
                )
            except Exception as e:
                logger.error("Failed to subscribe to topic: %s", e)

    async def publish_discovery_config(self):
        """Publish the retained Home Assistant discovery config for the gate."""
        config = {
            "name": "Gate",
            "unique_id": f"{self.node_id}_{self.object_id}",
            "device_class": "gate",
            "command_topic": self.command_topic,
            "state_topic": self.state_topic,
            "availability_topic": self.availability_topic,
            "payload_available": "online",
            "payload_not_available": "offline",
            "payload_open": "OPEN",
            "payload_close": "CLOSE",
            "payload_stop": "STOP",
            "state_open": "open",
            "state_closed": "closed",
            "state_opening": "opening",
            "state_closing": "closing",
            "device": {
                "identifiers": [self.node_id],
                "name": "Gate Keeper",
                "model": "DLB Gate Controller",
                "manufacturer": "Athena Networks",
            },
        }

        try:
            await self.publish(
                self.config_topic, json.dumps(config), retain=True
            )
        except Exception as e:
            logger.error("Failed to publish discovery config: %s", e)

    def set_command_callback(self, callback: Callable):
        """Set the async callback invoked with each decoded command string."""
        self.command_callback = callback

    async def publish_state(self, state: str):
        """Publish the gate state to the state topic."""
        await self.publish(self.state_topic, state)