fix: mqtt connection handling

- Added proper MQTT callback handlers
- Made on_connect trigger _post_connect asynchronously
- Improved connection error handling
- Added sleep after successful connection
- Added sleep between retries
This commit is contained in:
Josh Finlay 2025-01-08 10:18:42 +10:00
parent a065964bd1
commit 597d0b9204
1 changed files with 55 additions and 43 deletions

View File

@ -62,40 +62,74 @@ class HomeAssistantMQTT:
asyncio.create_task(self._log_event("MQTT Disabled")) asyncio.create_task(self._log_event("MQTT Disabled"))
async def _reconnect_loop(self): async def _reconnect_loop(self):
"""Continuously try to reconnect when connection is lost""" """Loop to handle reconnection"""
while self._enabled: while self._enabled:
try: try:
if not self._connected: if not self._connected:
logger.info("Attempting to reconnect to MQTT broker...") logger.info("Attempting to reconnect to MQTT broker...")
await self.connect() await self.connect()
await asyncio.sleep(5) await asyncio.sleep(5) # Wait before checking connection status
else:
await asyncio.sleep(1) # Regular check interval
except Exception as e: except Exception as e:
logger.error(f"Reconnection attempt failed: {e}") logger.error(f"Error in reconnect loop: {e}")
await asyncio.sleep(5) await asyncio.sleep(5) # Wait before retry
def on_connect(self, client, flags, rc, properties): def on_connect(self, client, flags, rc, properties):
"""Callback for when connection is established""" """Callback when connected to MQTT broker"""
logger.info(f"Connected to MQTT broker (rc: {rc})") logger.info(f"Connected to MQTT broker (rc: {rc})")
self._connected = True
asyncio.create_task(self._post_connect()) asyncio.create_task(self._post_connect())
def on_message(self, client, topic, payload, qos, properties):
"""Callback for when a message is received"""
try:
logger.debug(f"MQTT message received - Topic: {topic}, Payload: {payload}")
if topic == self.command_topic and self.command_callback:
command = payload.decode()
logger.info(f"MQTT command received: {command}")
asyncio.create_task(self.command_callback(command))
except Exception as e:
logger.error(f"Error processing MQTT message: {e}", exc_info=True)
def on_disconnect(self, client, packet, exc=None): def on_disconnect(self, client, packet, exc=None):
"""Callback for when connection is lost""" """Callback when disconnected from MQTT broker"""
logger.warning(f"Disconnected from MQTT broker{f': {exc}' if exc else ''}")
self._connected = False self._connected = False
if self._enabled and not self._reconnect_task: if exc:
self._reconnect_task = asyncio.create_task(self._reconnect_loop()) logger.error(f"Disconnected from MQTT broker due to error: {exc}")
else:
logger.warning("Disconnected from MQTT broker")
def on_message(self, client, topic, payload, qos, properties):
"""Callback when message received"""
try:
# Handle command messages
if topic == self.command_topic and self.command_callback:
command = payload.decode().upper()
logger.debug(f"Received command: {command}")
self.command_callback(command)
except Exception as e:
logger.error(f"Error handling message: {e}")
def on_subscribe(self, client, mid, qos, properties):
"""Callback when subscription confirmed"""
logger.debug(f"Subscription confirmed (mid: {mid}, qos: {qos})")
async def connect(self):
"""Connect to MQTT broker"""
if self.client and self._connected:
logger.debug("Already connected to MQTT broker")
return
try:
# Create new client if needed
if not self.client:
self.client = MQTTClient(self.client_id)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.client.on_subscribe = self.on_subscribe
if self.username and self.password:
self.client.set_auth_credentials(self.username, self.password)
logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...")
await self.client.connect(self.broker, self.port)
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
if self.client:
await self.client.disconnect()
self.client = None
self._connected = False
async def _post_connect(self): async def _post_connect(self):
"""Post connection setup - subscribe to topics""" """Post connection setup - subscribe to topics"""
@ -144,28 +178,6 @@ class HomeAssistantMQTT:
logger.error(f"Failed in post-connect setup: {e}", exc_info=True) logger.error(f"Failed in post-connect setup: {e}", exc_info=True)
await self.client.disconnect() await self.client.disconnect()
async def connect(self):
"""Connect to MQTT broker"""
try:
self.client = MQTTClient(self.client_id)
# Set callbacks
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
# Set credentials if provided
if self.username and self.password:
self.client.set_auth_credentials(self.username, self.password)
logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...")
await self.client.connect(self.broker, self.port)
logger.info("Initiating connection to MQTT broker")
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
await self._log_event("MQTT Connection Failed", success=False)
raise
async def disconnect(self): async def disconnect(self):
"""Disconnect from MQTT broker""" """Disconnect from MQTT broker"""
if self.client and self._connected: if self.client and self._connected: