# dlbGatekeeper/backend/mqtt_integration.py
#
# 183 lines
# 7.3 KiB
# Python

import os
import json
import asyncio
import logging
from gmqtt import Client as MQTTClient
from typing import Optional, Callable
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class HomeAssistantMQTT:
    """Expose the gate to Home Assistant as an MQTT ``cover`` entity.

    The instance owns one gmqtt client.  It publishes the Home Assistant
    discovery payload, availability and state messages, and forwards payloads
    received on the command topic to a user-supplied callback.

    Background work is scheduled with ``asyncio.create_task``, so
    ``enable(True)`` and the gmqtt callbacks (``on_connect``, ``on_message``,
    ``on_disconnect``) must be invoked while an event loop is running.
    """

    # Seconds between connection checks / reconnection attempts.
    RECONNECT_INTERVAL_SECONDS = 5

    def __init__(self):
        """Read broker settings from the environment and derive topic names.

        Environment variables (with defaults): ``MQTT_BROKER`` ("localhost"),
        ``MQTT_PORT`` ("1883"), ``MQTT_USERNAME`` / ``MQTT_PASSWORD`` (unset),
        ``MQTT_CLIENT_ID`` ("gatekeeper").

        Raises:
            ValueError: if ``MQTT_PORT`` is not an integer.
        """
        # MQTT configuration
        self.broker = os.getenv("MQTT_BROKER", "localhost")
        self.port = int(os.getenv("MQTT_PORT", "1883"))
        self.username = os.getenv("MQTT_USERNAME", None)
        self.password = os.getenv("MQTT_PASSWORD", None)
        self.client_id = os.getenv("MQTT_CLIENT_ID", "gatekeeper")

        # Home Assistant MQTT topics, all derived from the discovery prefix
        # so that changing the prefix keeps every topic consistent.
        self.node_id = "gatekeeper"
        self.object_id = "gate"
        self.discovery_prefix = "homeassistant"
        base_topic = (
            f"{self.discovery_prefix}/cover/{self.node_id}/{self.object_id}"
        )
        self.state_topic = f"{base_topic}/state"
        self.command_topic = f"{base_topic}/command"
        self.config_topic = f"{base_topic}/config"
        self.availability_topic = f"{base_topic}/availability"

        self.client: Optional[MQTTClient] = None
        self.command_callback: Optional[Callable] = None
        self._connected = False
        self._reconnect_task: Optional[asyncio.Task] = None
        self._enabled = False
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so untracked tasks may be collected
        # before they finish.
        self._background_tasks = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _spawn(self, coro) -> "asyncio.Task":
        """Schedule *coro* as a tracked background task and return the task."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task) -> None:
        """Forget a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Background MQTT task failed: %s", error)

    def _reconnect_active(self) -> bool:
        """Return True while a reconnect task exists and has not finished."""
        task = self._reconnect_task
        return task is not None and not task.done()

    @staticmethod
    async def _settle(result):
        """Await *result* only when it is a coroutine or future.

        gmqtt's ``publish`` and ``subscribe`` are regular (non-async) methods;
        awaiting their plain return value would raise ``TypeError``.
        """
        if asyncio.iscoroutine(result) or asyncio.isfuture(result):
            return await result
        return result

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def enable(self, enabled: bool = True) -> None:
        """Enable or disable MQTT integration.

        Enabling starts the reconnect loop unless the client is already
        connected or a loop is already running.  If no command callback has
        been set, ``main.handle_mqtt_command`` is installed as the default.
        Disabling cancels the reconnect loop and, when connected, schedules a
        disconnect.

        Args:
            enabled: True to enable the integration, False to disable it.
        """
        self._enabled = enabled

        if enabled:
            if self._connected or self._reconnect_active():
                return
            if self.command_callback is None:
                # Imported here to avoid a circular import with ``main``.
                from main import handle_mqtt_command
                self.set_command_callback(handle_mqtt_command)
            self._reconnect_task = asyncio.create_task(self._reconnect_loop())
            return

        if self._reconnect_task is not None:
            self._reconnect_task.cancel()
            self._reconnect_task = None
        if self.client and self._connected:
            self._spawn(self.disconnect())

    async def _reconnect_loop(self) -> None:
        """Keep trying to connect for as long as the integration is enabled.

        Connection errors are logged and retried after
        ``RECONNECT_INTERVAL_SECONDS``; cancellation is propagated.
        """
        while self._enabled:
            try:
                if not self._connected:
                    await self.connect()
            except asyncio.CancelledError:
                # Never swallow cancellation: let the task end as cancelled.
                raise
            except Exception as e:
                logger.error("Reconnection attempt failed: %s", e)
            # Wait before checking the connection (or retrying) again.
            await asyncio.sleep(self.RECONNECT_INTERVAL_SECONDS)

    async def connect(self) -> None:
        """Create a new client, register callbacks and connect to the broker.

        Raises:
            Exception: whatever the client raises while connecting; the error
                is logged and re-raised.
        """
        try:
            # NOTE(review): gmqtt clients retry lost connections on their own
            # by default; confirm that replacing the client here cannot leave
            # two sessions competing for the same client id.
            self.client = MQTTClient(self.client_id)

            # Set callbacks
            self.client.on_connect = self.on_connect
            self.client.on_message = self.on_message
            self.client.on_disconnect = self.on_disconnect

            # Set credentials if provided; a username without a password is
            # still sent (with no password) instead of being ignored.
            if self.username:
                self.client.set_auth_credentials(
                    self.username, self.password or None
                )

            logger.info("Initiating connection to MQTT broker")
            await self.client.connect(self.broker, self.port)
        except Exception as e:
            logger.error("Failed to connect to MQTT broker: %s", e)
            raise

    async def disconnect(self) -> None:
        """Publish the "offline" availability message and disconnect.

        Does nothing when there is no connected client.  Errors are logged,
        not raised; the connection flag is cleared either way.
        """
        if not (self.client and self._connected):
            return
        try:
            await self.publish(self.availability_topic, "offline", retain=True)
            await self.client.disconnect()
        except Exception as e:
            logger.error("Error during disconnect: %s", e)
        finally:
            # Do not rely solely on the on_disconnect callback to reset this.
            self._connected = False

    # ------------------------------------------------------------------
    # gmqtt callbacks
    # ------------------------------------------------------------------
    def on_connect(self, client, flags, rc, properties):
        """Callback for an established connection; schedules post-connect work."""
        logger.info("Connected to MQTT broker")
        self._connected = True
        self._spawn(self._post_connect())

    def on_message(self, client, topic, payload, qos, properties):
        """Callback for a received message.

        Messages on the command topic are decoded and handed to the command
        callback.  If the callback returns a coroutine it is scheduled as a
        background task.  Errors are logged, not raised.
        """
        if topic != self.command_topic or not self.command_callback:
            return
        try:
            if isinstance(payload, (bytes, bytearray)):
                command = payload.decode()
            else:
                command = str(payload)
            outcome = self.command_callback(command)
            if asyncio.iscoroutine(outcome):
                self._spawn(outcome)
        except Exception as e:
            logger.error("Error processing command: %s", e)

    def on_disconnect(self, client, packet, exc=None):
        """Callback for a lost connection; restarts the reconnect loop if needed."""
        logger.info("Disconnected from MQTT broker")
        self._connected = False
        # Start reconnection if enabled and no live loop is already running.
        if self._enabled and not self._reconnect_active():
            self._reconnect_task = asyncio.create_task(self._reconnect_loop())

    async def _post_connect(self) -> None:
        """Announce the device and subscribe to commands after connecting."""
        try:
            # Send Home Assistant discovery configuration
            await self.publish_discovery_config()
            # Publish availability status
            await self.publish(self.availability_topic, "online", retain=True)
            # Subscribe to command topic
            await self.subscribe(self.command_topic)
        except Exception as e:
            logger.error("Error in post-connect tasks: %s", e)

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------
    async def publish(self, topic: str, payload: str, retain: bool = False) -> None:
        """Publish *payload* to *topic*; a no-op while disconnected.

        Args:
            topic: Topic to publish to.
            payload: Message body.
            retain: Whether the broker should retain the message.

        Errors are logged, not raised.
        """
        if not (self.client and self._connected):
            return
        try:
            await self._settle(
                self.client.publish(topic, payload, retain=retain)
            )
        except Exception as e:
            logger.error("Failed to publish message: %s", e)

    async def subscribe(self, topic: str) -> None:
        """Subscribe to *topic* at QoS 0; a no-op while disconnected.

        Errors are logged, not raised.
        """
        if not (self.client and self._connected):
            return
        try:
            # gmqtt takes a topic string (or Subscription objects), not
            # paho-style ``(topic, qos)`` tuples.
            await self._settle(self.client.subscribe(topic, qos=0))
        except Exception as e:
            logger.error("Failed to subscribe to topic: %s", e)

    async def publish_discovery_config(self) -> None:
        """Publish the retained Home Assistant MQTT discovery configuration."""
        config = {
            "name": "Gate",
            "unique_id": f"{self.node_id}_{self.object_id}",
            "device_class": "gate",
            "command_topic": self.command_topic,
            "state_topic": self.state_topic,
            "availability_topic": self.availability_topic,
            "payload_available": "online",
            "payload_not_available": "offline",
            "payload_open": "OPEN",
            "payload_close": "CLOSE",
            "payload_stop": "STOP",
            "state_open": "open",
            "state_closed": "closed",
            "state_opening": "opening",
            "state_closing": "closing",
            "device": {
                "identifiers": [self.node_id],
                "name": "Gate Keeper",
                "model": "DLB Gate Controller",
                "manufacturer": "Athena Networks",
            },
        }
        try:
            await self.publish(self.config_topic, json.dumps(config), retain=True)
        except Exception as e:
            logger.error("Failed to publish discovery config: %s", e)

    def set_command_callback(self, callback: Callable) -> None:
        """Set the callable invoked with each decoded command payload."""
        self.command_callback = callback

    async def publish_state(self, state: str) -> None:
        """Publish the gate state to the state topic (not retained)."""
        await self.publish(self.state_topic, state)