From 9046cbca1d1adb595890a478416ac4c933f9d683 Mon Sep 17 00:00:00 2001 From: Josh Finlay Date: Wed, 8 Jan 2025 09:11:39 +1000 Subject: [PATCH] refactor: improve database operations and settings management - Added dedicated database operation functions - Improved settings validation and updates - Added proper state machine for gate status - Added MQTT error handling - Added startup state validation - Fixed partial settings updates --- backend/main.py | 597 +++++++++++++++++++++++++++++------------------- 1 file changed, 368 insertions(+), 229 deletions(-) diff --git a/backend/main.py b/backend/main.py index 10da112..2437c46 100644 --- a/backend/main.py +++ b/backend/main.py @@ -13,6 +13,7 @@ from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional, Dict, Any, List, Union import RPi.GPIO as GPIO +from enum import Enum, auto from mqtt_integration import HomeAssistantMQTT @@ -37,8 +38,8 @@ class MQTTSettings(BaseModel): enabled: bool = False class GPIOSettings(BaseModel): - gatePin: int = 17 # Default GPIO pin for gate control - statusPin: int = 27 # Default GPIO pin for gate status + gatePin: int = 15 # Relay control pin + statusPin: int = 7 # Gate open status pin class LoggingSettings(BaseModel): level: str = "WARNING" # Default to WARNING level @@ -52,9 +53,43 @@ class Settings(BaseModel): gpio: GPIOSettings = GPIOSettings() logging: LoggingSettings = LoggingSettings() -class GateStatus(BaseModel): - isOpen: bool - lastChanged: str +class GateState(Enum): + """Gate state enumeration""" + UNKNOWN = auto() + OPEN = auto() + CLOSED = auto() + TRANSITIONING = auto() + +class GateStatus: + """Gate status tracking""" + def __init__(self): + self.state = GateState.UNKNOWN + self.last_change = datetime.now() + self.transition_start = None + + def update(self, is_open: bool) -> bool: + """Update state and return True if state changed""" + now = datetime.now() + new_state = GateState.OPEN if is_open else GateState.CLOSED + + if new_state != self.state: + self.state = new_state + self.last_change = now + return True + return False + + @property + def is_open(self) -> bool: + """Return True if gate is open""" + return self.state == GateState.OPEN + + @property + def last_changed(self) -> str: + """Return ISO formatted last change time""" + return self.last_change.isoformat() + +# Initialize gate status +gate_status = GateStatus() # Configure logging def setup_logging(settings: Settings): @@ -121,16 +156,77 @@ def get_db(): """Get a new database connection""" return DBConnection() +# Database operations +async def add_event(timestamp: str, action: str, source: str, success: bool = True): + """Add an event to the database""" + async with get_db() as db: + await db.execute( + "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", + (timestamp, action, source, success) + ) + await db.commit() + +async def add_gate_status(timestamp: str): + """Add a gate status change to the database""" + async with get_db() as db: + await db.execute( + "INSERT INTO gate_status (timestamp) VALUES (?)", + (timestamp,) + ) + await db.commit() + +async def update_settings(settings_update: Dict[str, Any]): + """Update settings in the database""" + async with get_db() as db: + # Update each setting in the database + for key, value in settings_update.items(): + await db.execute( + "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)", + (key, json.dumps(value)) + ) + await db.commit() + +async def get_settings(): + """Get current settings from the database""" + async with get_db() as db: + cursor = await db.execute("SELECT key, value FROM settings") + rows = await cursor.fetchall() + settings = {} + for key, value in rows: + settings[key] = json.loads(value) + return settings + +async def get_events(limit: int = 10, offset: int = 0): + """Get recent gate events with pagination""" + async with get_db() as db: + db.row_factory = aiosqlite.Row + + # Get total count + cursor = await db.execute("SELECT COUNT(*) as count FROM events") + row = await cursor.fetchone() + total_count = row['count'] + + # Get paginated events + cursor = await db.execute( + """ + SELECT * FROM events + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + """, + (limit, offset) + ) + events = await cursor.fetchall() + return { + "events": [dict(event) for event in events], + "total": total_count, + "hasMore": (offset + limit) < total_count + } + # Set up MQTT event logging async def log_mqtt_event(action: str, success: bool = True): """Log MQTT events to the database and log file""" logger.info(f"MQTT Event - {action} (Success: {success})") - async with get_db() as db: - await db.execute( - "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", - (datetime.utcnow().isoformat(), action, "MQTT", success) - ) - await db.commit() + await add_event(datetime.utcnow().isoformat(), action, "MQTT", success) ha_mqtt.set_event_callback(log_mqtt_event) @@ -144,7 +240,8 @@ app.add_middleware( ) # Gate control -async def trigger_gate() -> bool: +async def trigger_gate(): + """Trigger the gate relay""" try: settings = app.state.current_settings if not settings: @@ -152,12 +249,29 @@ async def trigger_gate() -> bool: settings = Settings() gate_pin = settings.gpio.gatePin + trigger_duration = settings.triggerDuration / 1000.0 # Convert to seconds + + # Activate relay (pull pin HIGH) GPIO.output(gate_pin, GPIO.HIGH) - await asyncio.sleep(settings.triggerDuration / 1000) # Convert ms to seconds + logger.info(f"Gate triggered - pin {gate_pin} set HIGH") + + # Log event + timestamp = datetime.now().isoformat() + await add_event(timestamp, "gate triggered", "api") + + # Wait for specified duration + await asyncio.sleep(trigger_duration) + + # Deactivate relay (pull pin LOW) GPIO.output(gate_pin, GPIO.LOW) + logger.info(f"Gate trigger complete - pin {gate_pin} set LOW") + return True except Exception as e: - logger.error(f"Error triggering gate: {e}") + logger.error(f"Failed to trigger gate: {e}", exc_info=True) + # Log failure + timestamp = datetime.now().isoformat() + await add_event(timestamp, "gate trigger failed", "api", False) return False last_open_time = None @@ -169,59 +283,49 @@ async def update_gate_status(): global gate_monitor_running if gate_monitor_running: - logger.warning("Gate status monitor already running, skipping...") + logger.warning("Gate status monitor already running") return - + gate_monitor_running = True - logger.info("Starting gate status monitoring task") + consecutive_errors = 0 try: - settings = app.state.current_settings - if not settings: - logger.warning("No settings available, using default settings") - settings = Settings() - - status_pin = settings.gpio.statusPin - last_status = None - consecutive_errors = 0 - while True: try: - if not gate_monitor_running: - logger.info("Gate status monitor stopped") - break - - current_status = GPIO.input(status_pin) == GPIO.HIGH + settings = app.state.current_settings + if not settings: + logger.warning("No settings available, using default settings") + settings = Settings() - if last_status != current_status: - timestamp = datetime.now() - logger.info(f"Gate status changed to: {'open' if current_status else 'closed'}") + # Check current status (LOW = closed, HIGH = open) + is_open = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH + + # Update state machine + if gate_status.update(is_open): + logger.info(f"Gate status changed to: {gate_status.state.name}") logger.debug("Updating database with new status") - async with get_db() as db: - await db.execute( - "INSERT INTO gate_status (timestamp) VALUES (?)", - (timestamp.isoformat(),) - ) - - await db.execute( - "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", - (timestamp.isoformat(), f"gate {'opened' if current_status else 'closed'}", "sensor", True) - ) - await db.commit() + # Update database + await add_gate_status(gate_status.last_changed) + await add_event( + gate_status.last_changed, + f"gate {gate_status.state.name.lower()}", + "sensor" + ) + + # Update MQTT state if enabled + if settings.mqtt.enabled: + await publish_mqtt_state(gate_status.is_open) - await ha_mqtt.publish_state(current_status) - last_status = current_status consecutive_errors = 0 - else: - logger.debug(f"Gate status unchanged: {'open' if current_status else 'closed'}") + # Sleep for a short time before next check await asyncio.sleep(0.5) except Exception as e: consecutive_errors += 1 wait_time = min(30, 2 ** consecutive_errors) - logger.error(f"Error in update_gate_status (attempt {consecutive_errors}): {e}", exc_info=True) + logger.error(f"Error in gate_status_monitor (attempt {consecutive_errors}): {e}", exc_info=True) logger.warning(f"Retrying in {wait_time} seconds...") await asyncio.sleep(wait_time) finally: @@ -268,12 +372,7 @@ async def check_auto_close(): logger.warning(f"Gate has been open for {time_open:.1f} seconds, auto-closing") timestamp = current_time.isoformat() - async with get_db() as db: - await db.execute( - "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", - (timestamp, "auto-close", "system", True) - ) - await db.commit() + await add_event(timestamp, "auto-close", "system") await trigger_gate() last_open_time = None @@ -323,14 +422,6 @@ async def trigger(): settings = app.state.current_settings or Settings() current_status = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH - # Log event - async with get_db() as db: - await db.execute( - "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", - (timestamp, "trigger gate", "api", success) - ) - await db.commit() - return {"success": success, "timestamp": timestamp, "isOpen": current_status} except Exception as e: logger.error("Error triggering gate", exc_info=True) @@ -341,16 +432,11 @@ async def get_status(): """Get current gate status""" try: settings = app.state.current_settings or Settings() + # LOW (0V) means gate is closed, HIGH (3.3V) means gate is open is_open = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH + gate_status.update(is_open) - async with get_db() as db: - cursor = await db.execute( - "SELECT timestamp FROM gate_status ORDER BY timestamp DESC LIMIT 1" - ) - row = await cursor.fetchone() - last_changed = row[0] if row else datetime.now().isoformat() - - return {"isOpen": is_open, "lastChanged": last_changed} + return {"isOpen": gate_status.is_open, "lastChanged": gate_status.last_changed} except Exception as e: logger.error("Error getting gate status", exc_info=True) raise HTTPException(status_code=500, detail="Failed to get gate status") @@ -358,44 +444,17 @@ async def get_status(): @app.get("/api/events") async def get_events(limit: int = 10, offset: int = 0): """Get recent gate events with pagination""" - async with get_db() as db: - db.row_factory = aiosqlite.Row - - # Get total count - cursor = await db.execute("SELECT COUNT(*) as count FROM events") - row = await cursor.fetchone() - total_count = row['count'] - - # Get paginated events - cursor = await db.execute( - """ - SELECT * FROM events - ORDER BY timestamp DESC - LIMIT ? OFFSET ? - """, - (limit, offset) - ) - events = await cursor.fetchall() - return { - "events": [dict(event) for event in events], - "total": total_count, - "hasMore": (offset + limit) < total_count - } + return await get_events(limit, offset) @app.get("/api/settings") -async def get_settings(): +async def get_settings_route(): """Get current settings""" - async with get_db() as db: - cursor = await db.execute("SELECT key, value FROM settings") - rows = await cursor.fetchall() - settings = {} - for key, value in rows: - settings[key] = json.loads(value) - + try: + settings_dict = await get_settings() return { - "maxOpenTimeSeconds": settings.get("maxOpenTimeSeconds", "300"), - "triggerDuration": settings.get("triggerDuration", "500"), - "mqtt": settings.get("mqtt", { + "maxOpenTimeSeconds": settings_dict.get("maxOpenTimeSeconds", 300), + "triggerDuration": settings_dict.get("triggerDuration", 500), + "mqtt": settings_dict.get("mqtt", { "broker": "localhost", "port": "1883", "username": "", @@ -403,83 +462,81 @@ async def get_settings(): "clientId": "gatekeeper", "enabled": False }), - "gpio": settings.get("gpio", { - "gatePin": 17, - "statusPin": 27 + "gpio": settings_dict.get("gpio", { + "gatePin": 15, + "statusPin": 7 }), - "logging": settings.get("logging", { + "logging": settings_dict.get("logging", { "level": "WARNING", "maxBytes": 10 * 1024 * 1024, "backupCount": 5 }) } + except Exception as e: + logger.error(f"Failed to get settings: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Failed to get settings") @app.post("/api/settings") -async def update_settings(settings: Settings): +async def update_settings_route(settings_update: Dict[str, Any]): """Update settings""" try: - async with get_db() as db: - # Update each setting - for key, value in settings.dict().items(): - await db.execute( - "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)", - (key, json.dumps(value)) - ) - await db.commit() - - # Update environment variables and MQTT connection - if settings.mqtt: - # Update environment variables - os.environ["MQTT_BROKER"] = settings.mqtt["broker"] - os.environ["MQTT_PORT"] = settings.mqtt["port"] - os.environ["MQTT_USERNAME"] = settings.mqtt["username"] - os.environ["MQTT_PASSWORD"] = settings.mqtt["password"] - os.environ["MQTT_CLIENT_ID"] = settings.mqtt["clientId"] - - # Enable/disable MQTT based on settings - ha_mqtt.enable(settings.mqtt.get("enabled", False)) - - # Log settings update event with details - changes = [] - if settings.maxOpenTimeSeconds: - changes.append(f"Max Open Time: {settings.maxOpenTimeSeconds}s") - if settings.triggerDuration: - changes.append(f"Trigger Duration: {settings.triggerDuration}ms") - if settings.mqtt: - mqtt_status = "Enabled" if settings.mqtt.get("enabled") else "Disabled" - changes.append(f"MQTT: {mqtt_status}") - if settings.mqtt.get("enabled"): - changes.append(f"Broker: {settings.mqtt['broker']}:{settings.mqtt['port']}") - - await db.execute( - "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", - ( - datetime.utcnow().isoformat(), - f"Settings Updated ({'; '.join(changes)})", - "Settings", - True - ) - ) - await db.commit() - - # Update logging configuration - setup_logging(settings) - - return {"success": True} + # Store original settings for comparison + old_settings = await get_settings() + + # Update settings in database + await update_settings(settings_update) + + # Load new settings and merge with existing + current_settings = await get_settings() + new_settings = Settings(**current_settings) # Use complete settings + app.state.current_settings = new_settings + + # Update MQTT if configuration changed + if "mqtt" in settings_update: + if new_settings.mqtt.enabled: + os.environ["MQTT_BROKER"] = new_settings.mqtt.broker + os.environ["MQTT_PORT"] = new_settings.mqtt.port + os.environ["MQTT_USERNAME"] = new_settings.mqtt.username + os.environ["MQTT_PASSWORD"] = new_settings.mqtt.password + os.environ["MQTT_CLIENT_ID"] = new_settings.mqtt.clientId + await init_mqtt(new_settings) + + # Update logging if configuration changed + if "logging" in settings_update: + setup_logging(new_settings) + + # Log changes + timestamp = datetime.now().isoformat() + changes = [] + if "maxOpenTimeSeconds" in settings_update: + changes.append(f"Max Open Time: {new_settings.maxOpenTimeSeconds}s") + if "triggerDuration" in settings_update: + changes.append(f"Trigger Duration: {new_settings.triggerDuration}ms") + if "mqtt" in settings_update: + mqtt_status = "Enabled" if new_settings.mqtt.enabled else "Disabled" + changes.append(f"MQTT: {mqtt_status}") + if new_settings.mqtt.enabled: + changes.append(f"Broker: {new_settings.mqtt.broker}:{new_settings.mqtt.port}") + + await add_event( + timestamp, + f"Settings Updated ({'; '.join(changes)})", + "api" + ) + + return {"success": True, "message": "Settings updated successfully"} + except Exception as e: - # Log failure event - async with get_db() as db: - await db.execute( - "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)", - ( - datetime.utcnow().isoformat(), - f"Settings Update Failed: {str(e)}", - "Settings", - False - ) - ) - await db.commit() - raise HTTPException(status_code=500, detail=str(e)) + logger.error(f"Failed to update settings: {e}", exc_info=True) + # Log failure + timestamp = datetime.now().isoformat() + await add_event( + timestamp, + f"Settings update failed: {str(e)}", + "api", + False + ) + raise HTTPException(status_code=500, detail=f"Failed to update settings: {str(e)}") # Serve static files app.mount("/", StaticFiles(directory="../public", html=True), name="static") @@ -553,91 +610,173 @@ async def init_db(): # Load settings from database async def load_settings(): - """Load settings and initialize components based on settings""" + """Load settings from database""" try: + # Use existing get_settings function settings_dict = await get_settings() settings = Settings(**settings_dict) - # Store settings in app state for easy access - app.state.current_settings = settings + # Validate settings + is_valid, errors = validate_settings(settings) + if not is_valid: + logger.warning(f"Invalid settings detected: {', '.join(errors)}") + logger.info("Using default settings") + settings = Settings() # Use defaults - # Configure MQTT based on settings - if settings.mqtt: - # Update environment variables + # Store in app state + app.state.current_settings = settings + return settings + except Exception as e: + logger.error(f"Failed to load settings: {e}", exc_info=True) + # Return default settings on error + default_settings = Settings() + app.state.current_settings = default_settings + return default_settings + +def validate_settings(settings: Settings) -> tuple[bool, list[str]]: + """Validate settings and return (is_valid, error_messages)""" + errors = [] + + # Validate time settings + if settings.maxOpenTimeSeconds < 0: + errors.append("maxOpenTimeSeconds must be positive") + if settings.triggerDuration < 100: # Min 100ms + errors.append("triggerDuration must be at least 100ms") + + # Validate GPIO settings + if settings.gpio.gatePin < 0 or settings.gpio.statusPin < 0: + errors.append("GPIO pins must be positive") + if settings.gpio.gatePin == settings.gpio.statusPin: + errors.append("Gate and status pins must be different") + + # Validate MQTT settings if enabled + if settings.mqtt.enabled: + if not settings.mqtt.broker: + errors.append("MQTT broker required when MQTT is enabled") + if not settings.mqtt.port: + errors.append("MQTT port required when MQTT is enabled") + + return len(errors) == 0, errors + +async def init_mqtt(settings: Settings): + """Initialize MQTT if enabled in settings""" + if settings.mqtt.enabled: + try: + logger.info("MQTT enabled, initializing connection...") os.environ["MQTT_BROKER"] = settings.mqtt.broker os.environ["MQTT_PORT"] = settings.mqtt.port os.environ["MQTT_USERNAME"] = settings.mqtt.username os.environ["MQTT_PASSWORD"] = settings.mqtt.password os.environ["MQTT_CLIENT_ID"] = settings.mqtt.clientId - # Enable/disable MQTT based on settings - if settings.mqtt.enabled: - logger.info("MQTT enabled in settings, initializing connection") - ha_mqtt.enable() - else: - logger.info("MQTT disabled in settings") - - # Set other application settings - os.environ["MAX_OPEN_TIME_SECONDS"] = str(settings.maxOpenTimeSeconds) - os.environ["TRIGGER_DURATION"] = str(settings.triggerDuration) - - # Configure logging - setup_logging(settings) + # Try to enable MQTT with timeout + try: + async with asyncio.timeout(10): # 10 second timeout + ha_mqtt.enable() + logger.info("MQTT initialized successfully") + except asyncio.TimeoutError: + logger.error("MQTT initialization timed out") + return False + + except Exception as e: + logger.error(f"Failed to initialize MQTT: {e}", exc_info=True) + return False + else: + logger.info("MQTT disabled in settings") + return True - logger.info(f"Settings loaded successfully (Max Open Time: {settings.maxOpenTimeSeconds}s, " - f"Trigger Duration: {settings.triggerDuration}ms, " - f"Gate Pin: {settings.gpio.gatePin}, Status Pin: {settings.gpio.statusPin})") - return settings +async def publish_mqtt_state(state: bool) -> bool: + """Publish state to MQTT with error handling""" + if not app.state.current_settings.mqtt.enabled: + return True + try: + async with asyncio.timeout(5): # 5 second timeout + await ha_mqtt.publish_state(state) + return True + except (asyncio.TimeoutError, Exception) as e: + logger.error(f"Failed to publish MQTT state: {e}", exc_info=True) + return False + +async def start_background_tasks(): + """Start all background monitoring tasks""" + logger.info("Starting background tasks...") + global gate_monitor_running, auto_close_running + gate_monitor_running = True + auto_close_running = True + app.state.status_task = asyncio.create_task(update_gate_status()) + app.state.auto_close_task = asyncio.create_task(check_auto_close()) + logger.info("Background tasks started") + +async def validate_startup_state() -> tuple[bool, list[str]]: + """Validate application startup state""" + errors = [] + + # Check database + try: + async with get_db() as db: + await db.execute("SELECT 1") except Exception as e: - logger.warning(f"Failed to load settings: {e}", exc_info=True) - logger.info("Using default settings") - default_settings = Settings() - app.state.current_settings = default_settings - return default_settings + errors.append(f"Database not accessible: {e}") + + # Check GPIO + try: + settings = app.state.current_settings + if not settings: + errors.append("Settings not loaded") + else: + # Check if pins are configured + GPIO.input(settings.gpio.statusPin) # Test read + GPIO.output(settings.gpio.gatePin, GPIO.LOW) # Test write + except Exception as e: + errors.append(f"GPIO not properly configured: {e}") + + # Check MQTT if enabled + if app.state.current_settings and app.state.current_settings.mqtt.enabled: + if not ha_mqtt.is_connected(): + errors.append("MQTT enabled but not connected") + + return len(errors) == 0, errors @app.on_event("startup") async def startup_event(): """Initialize the application on startup""" try: - logger.info("Starting Gatekeeper application") + logger.info("Starting application...") # 1. Initialize database logger.info("Initializing database...") await init_db() - logger.info("Database initialized successfully") - - # 2. Load settings from database + + # 2. Load settings logger.info("Loading settings...") settings = await load_settings() - app.state.current_settings = settings - logger.info("Settings loaded successfully") - - # 3. Setup GPIO - logger.info("Initializing GPIO...") + + # 3. Configure logging based on settings + logger.info("Configuring logging...") + setup_logging(settings) + + # 4. Configure MQTT + await init_mqtt(settings) + + # 5. Initialize GPIO + logger.info("Setting up GPIO...") setup_gpio() - logger.info("GPIO initialized successfully") - - # 4. Initialize MQTT if enabled - if settings.mqtt.enabled: - logger.info("MQTT enabled, initializing connection...") - ha_mqtt.enable() - logger.info("MQTT initialized successfully") + + # 6. Start background tasks + await start_background_tasks() + + # 7. Validate startup state + is_valid, errors = await validate_startup_state() + if not is_valid: + logger.warning("Application started with warnings:") + for error in errors: + logger.warning(f" - {error}") else: - logger.info("MQTT disabled in settings") - - # 5. Start background tasks - logger.info("Starting background tasks...") - global gate_monitor_running, auto_close_running - gate_monitor_running = True - auto_close_running = True - app.state.status_task = asyncio.create_task(update_gate_status()) - app.state.auto_close_task = asyncio.create_task(check_auto_close()) - logger.info("Background tasks started successfully") - - logger.info("Gatekeeper application started successfully") + logger.info("Application startup complete and validated") + except Exception as e: - logger.critical("Failed to start application", exc_info=True) + logger.error(f"Failed to start application: {e}", exc_info=True) raise @app.on_event("shutdown")