265 lines
10 KiB
Python
265 lines
10 KiB
Python
import os
|
|
import json
|
|
import asyncio
|
|
from typing import Optional, Callable, Union
|
|
from gmqtt import Client as MQTTClient
|
|
import logging
|
|
|
|
# Get logger
|
|
logger = logging.getLogger("gatekeeper")
|
|
|
|
class HomeAssistantMQTT:
|
|
def __init__(self):
|
|
# MQTT Configuration - will be updated via update_settings
|
|
self.broker = "localhost"
|
|
self.port = 1883
|
|
self.username = None
|
|
self.password = None
|
|
self.client_id = "gatekeeper"
|
|
|
|
# Home Assistant MQTT topics
|
|
self.node_id = "gatekeeper"
|
|
self.object_id = "gate"
|
|
self.discovery_prefix = "homeassistant"
|
|
|
|
self.state_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/state"
|
|
self.command_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/command"
|
|
self.config_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/config"
|
|
self.availability_topic = f"homeassistant/cover/{self.node_id}/{self.object_id}/availability"
|
|
|
|
logger.debug(f"MQTT topics configured - State: {self.state_topic}, Command: {self.command_topic}")
|
|
|
|
self.client: Optional[MQTTClient] = None
|
|
self.command_callback: Optional[Callable] = None
|
|
self._connected = False
|
|
self._reconnect_task: Optional[asyncio.Task] = None
|
|
self._enabled = False
|
|
self._event_callback: Optional[Callable] = None
|
|
|
|
def set_event_callback(self, callback: Callable):
|
|
"""Set callback for logging events"""
|
|
self._event_callback = callback
|
|
|
|
async def _log_event(self, action: str, success: bool = True):
|
|
"""Log MQTT events if callback is set"""
|
|
if self._event_callback:
|
|
await self._event_callback(action, success)
|
|
|
|
def enable(self, enabled: bool = True):
|
|
"""Enable or disable MQTT integration"""
|
|
self._enabled = enabled
|
|
if enabled and not self._connected and not self._reconnect_task:
|
|
# Set up command handler if not already set
|
|
from main import handle_mqtt_command # Import here to avoid circular import
|
|
self.set_command_callback(handle_mqtt_command)
|
|
# Start reconnection
|
|
self._reconnect_task = asyncio.create_task(self._reconnect_loop())
|
|
asyncio.create_task(self._log_event("MQTT Enabled"))
|
|
elif not enabled and self._reconnect_task:
|
|
self._reconnect_task.cancel()
|
|
self._reconnect_task = None
|
|
asyncio.create_task(self.disconnect())
|
|
asyncio.create_task(self._log_event("MQTT Disabled"))
|
|
|
|
async def _reconnect_loop(self):
|
|
"""Loop to handle reconnection"""
|
|
while self._enabled:
|
|
try:
|
|
if not self._connected:
|
|
logger.info("Attempting to reconnect to MQTT broker...")
|
|
await self.connect()
|
|
await asyncio.sleep(5) # Wait before checking connection status
|
|
else:
|
|
await asyncio.sleep(1) # Regular check interval
|
|
except Exception as e:
|
|
logger.error(f"Error in reconnect loop: {e}")
|
|
await asyncio.sleep(5) # Wait before retry
|
|
|
|
def on_connect(self, client, flags, rc, properties):
|
|
"""Callback when connected to MQTT broker"""
|
|
logger.info(f"Connected to MQTT broker (rc: {rc})")
|
|
asyncio.create_task(self._post_connect())
|
|
|
|
def on_disconnect(self, client, packet, exc=None):
|
|
"""Callback when disconnected from MQTT broker"""
|
|
self._connected = False
|
|
if exc:
|
|
logger.error(f"Disconnected from MQTT broker due to error: {exc}")
|
|
else:
|
|
logger.warning("Disconnected from MQTT broker")
|
|
|
|
def on_message(self, client, topic, payload, qos, properties):
|
|
"""Callback when message received"""
|
|
try:
|
|
decoded_payload = payload.decode()
|
|
logger.info(f"MQTT message received - Topic: {topic}, Payload: {decoded_payload}, QoS: {qos}")
|
|
|
|
# Handle command messages
|
|
if topic == self.command_topic and self.command_callback:
|
|
command = decoded_payload.upper()
|
|
logger.debug(f"Processing command: {command}")
|
|
# Create task for async callback
|
|
asyncio.create_task(self.command_callback(command))
|
|
else:
|
|
logger.debug(f"Message received on non-command topic: {topic}")
|
|
except Exception as e:
|
|
logger.error(f"Error handling message: {e}", exc_info=True)
|
|
|
|
def on_subscribe(self, client, mid, qos, properties):
|
|
"""Callback when subscription confirmed"""
|
|
logger.debug(f"Subscription confirmed (mid: {mid}, qos: {qos})")
|
|
|
|
async def connect(self):
|
|
"""Connect to MQTT broker"""
|
|
if self.client and self._connected:
|
|
logger.debug("Already connected to MQTT broker")
|
|
return
|
|
|
|
try:
|
|
# Create new client if needed
|
|
if not self.client:
|
|
self.client = MQTTClient(self.client_id)
|
|
self.client.on_connect = self.on_connect
|
|
self.client.on_disconnect = self.on_disconnect
|
|
self.client.on_message = self.on_message
|
|
self.client.on_subscribe = self.on_subscribe
|
|
|
|
if self.username and self.password:
|
|
self.client.set_auth_credentials(self.username, self.password)
|
|
|
|
logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...")
|
|
await self.client.connect(self.broker, self.port)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to connect to MQTT broker: {e}")
|
|
if self.client:
|
|
await self.client.disconnect()
|
|
self.client = None
|
|
self._connected = False
|
|
|
|
async def _post_connect(self):
|
|
"""Post connection setup - subscribe to topics"""
|
|
logger.info("Connected to MQTT broker")
|
|
self._connected = True
|
|
try:
|
|
# Subscribe to command topic
|
|
self.client.subscribe(self.command_topic, qos=1)
|
|
logger.info(f"Subscribed to command topic: {self.command_topic}")
|
|
|
|
# Publish discovery config
|
|
config = {
|
|
"name": "Gate",
|
|
"unique_id": f"{self.node_id}_{self.object_id}",
|
|
"device_class": "gate",
|
|
"command_topic": self.command_topic,
|
|
"state_topic": self.state_topic,
|
|
"availability_topic": self.availability_topic,
|
|
"payload_open": "OPEN",
|
|
"payload_close": "CLOSE",
|
|
"state_open": "open",
|
|
"state_closed": "closed"
|
|
}
|
|
|
|
self.client.publish(
|
|
self.config_topic,
|
|
json.dumps(config),
|
|
qos=1,
|
|
retain=True
|
|
)
|
|
logger.info("Published Home Assistant discovery config")
|
|
|
|
# Publish initial availability and state
|
|
self.client.publish(
|
|
self.availability_topic,
|
|
"online",
|
|
qos=1,
|
|
retain=True
|
|
)
|
|
logger.info("Published availability: online")
|
|
|
|
await self.publish_state("closed")
|
|
logger.info("Published initial state: closed")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed in post-connect setup: {e}", exc_info=True)
|
|
await self.client.disconnect()
|
|
|
|
async def disconnect(self):
|
|
"""Disconnect from MQTT broker"""
|
|
if self.client and self._connected:
|
|
try:
|
|
await self.client.publish(
|
|
self.availability_topic,
|
|
"offline",
|
|
qos=1,
|
|
retain=True
|
|
)
|
|
await self.client.disconnect()
|
|
except Exception as e:
|
|
logger.error(f"Error during disconnect: {e}")
|
|
|
|
async def publish(self, topic: str, payload: str, retain: bool = False):
|
|
"""Publish a message to a topic"""
|
|
if self.client and self._connected:
|
|
try:
|
|
self.client.publish(topic, payload, retain=retain)
|
|
except Exception as e:
|
|
logger.error(f"Failed to publish message: {e}")
|
|
|
|
async def subscribe(self, topic: str):
|
|
"""Subscribe to a topic"""
|
|
if self.client and self._connected:
|
|
try:
|
|
self.client.subscribe(topic, qos=1)
|
|
except Exception as e:
|
|
logger.error(f"Failed to subscribe to topic: {e}")
|
|
|
|
async def publish_state(self, state: Union[bool, str]):
|
|
"""Publish state to MQTT"""
|
|
if not self._connected:
|
|
logger.warning("Cannot publish state - not connected to MQTT broker")
|
|
return
|
|
|
|
try:
|
|
if isinstance(state, bool):
|
|
state_str = "open" if state else "closed"
|
|
else:
|
|
state_str = state.lower()
|
|
|
|
self.client.publish(
|
|
self.state_topic,
|
|
state_str,
|
|
qos=1,
|
|
retain=True
|
|
)
|
|
logger.debug(f"Published state: {state_str}")
|
|
|
|
if self._event_callback:
|
|
self._event_callback(f"Published state: {state_str}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to publish state: {e}")
|
|
|
|
def set_command_callback(self, callback: Callable):
|
|
"""Set callback for handling commands"""
|
|
self.command_callback = callback
|
|
|
|
def is_connected(self) -> bool:
|
|
"""Return current connection status"""
|
|
return self._connected
|
|
|
|
def update_settings(self, settings):
|
|
"""Update MQTT settings"""
|
|
logger.info(f"Updating MQTT settings - Broker: {settings.broker}:{settings.port}")
|
|
self.broker = settings.broker
|
|
self.port = int(settings.port)
|
|
self.username = settings.username if settings.username else None
|
|
self.password = settings.password if settings.password else None
|
|
self.client_id = settings.clientId
|
|
|
|
# If we're connected, disconnect and reconnect with new settings
|
|
if self._connected:
|
|
logger.info("Reconnecting with new settings...")
|
|
asyncio.create_task(self.disconnect())
|
|
# Connection will be re-established by reconnect loop if enabled
|