diff --git a/backend/mqtt_integration.py b/backend/mqtt_integration.py index e3b84a6..8635170 100644 --- a/backend/mqtt_integration.py +++ b/backend/mqtt_integration.py @@ -62,40 +62,74 @@ class HomeAssistantMQTT: asyncio.create_task(self._log_event("MQTT Disabled")) async def _reconnect_loop(self): - """Continuously try to reconnect when connection is lost""" + """Loop to handle reconnection""" while self._enabled: try: if not self._connected: logger.info("Attempting to reconnect to MQTT broker...") await self.connect() - await asyncio.sleep(5) + await asyncio.sleep(5) # Wait before checking connection status + else: + await asyncio.sleep(1) # Regular check interval except Exception as e: - logger.error(f"Reconnection attempt failed: {e}") - await asyncio.sleep(5) + logger.error(f"Error in reconnect loop: {e}") + await asyncio.sleep(5) # Wait before retry def on_connect(self, client, flags, rc, properties): - """Callback for when connection is established""" + """Callback when connected to MQTT broker""" logger.info(f"Connected to MQTT broker (rc: {rc})") - self._connected = True asyncio.create_task(self._post_connect()) - def on_message(self, client, topic, payload, qos, properties): - """Callback for when a message is received""" - try: - logger.debug(f"MQTT message received - Topic: {topic}, Payload: {payload}") - if topic == self.command_topic and self.command_callback: - command = payload.decode() - logger.info(f"MQTT command received: {command}") - asyncio.create_task(self.command_callback(command)) - except Exception as e: - logger.error(f"Error processing MQTT message: {e}", exc_info=True) - def on_disconnect(self, client, packet, exc=None): - """Callback for when connection is lost""" - logger.warning(f"Disconnected from MQTT broker{f': {exc}' if exc else ''}") + """Callback when disconnected from MQTT broker""" self._connected = False - if self._enabled and not self._reconnect_task: - self._reconnect_task = asyncio.create_task(self._reconnect_loop()) + if exc: + logger.error(f"Disconnected from MQTT broker due to error: {exc}") + else: + logger.warning("Disconnected from MQTT broker") + + def on_message(self, client, topic, payload, qos, properties): + """Callback when message received""" + try: + # Handle command messages + if topic == self.command_topic and self.command_callback: + command = payload.decode().upper() + logger.debug(f"Received command: {command}") + self.command_callback(command) + except Exception as e: + logger.error(f"Error handling message: {e}") + + def on_subscribe(self, client, mid, qos, properties): + """Callback when subscription confirmed""" + logger.debug(f"Subscription confirmed (mid: {mid}, qos: {qos})") + + async def connect(self): + """Connect to MQTT broker""" + if self.client and self._connected: + logger.debug("Already connected to MQTT broker") + return + + try: + # Create new client if needed + if not self.client: + self.client = MQTTClient(self.client_id) + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.on_message = self.on_message + self.client.on_subscribe = self.on_subscribe + + if self.username and self.password: + self.client.set_auth_credentials(self.username, self.password) + + logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...") + await self.client.connect(self.broker, self.port) + + except Exception as e: + logger.error(f"Failed to connect to MQTT broker: {e}") + if self.client: + await self.client.disconnect() + self.client = None + self._connected = False async def _post_connect(self): """Post connection setup - subscribe to topics""" @@ -144,28 +178,6 @@ class HomeAssistantMQTT: logger.error(f"Failed in post-connect setup: {e}", exc_info=True) await self.client.disconnect() - async def connect(self): - """Connect to MQTT broker""" - try: - self.client = MQTTClient(self.client_id) - - # Set callbacks - self.client.on_connect = self.on_connect - self.client.on_message = self.on_message - self.client.on_disconnect = self.on_disconnect - - # Set credentials if provided - if self.username and self.password: - self.client.set_auth_credentials(self.username, self.password) - - logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...") - await self.client.connect(self.broker, self.port) - logger.info("Initiating connection to MQTT broker") - except Exception as e: - logger.error(f"Failed to connect to MQTT broker: {e}") - await self._log_event("MQTT Connection Failed", success=False) - raise - async def disconnect(self): """Disconnect from MQTT broker""" if self.client and self._connected: