#!/usr/bin/env python3
"""Gatekeeper service.

FastAPI application that pulses a relay on a Raspberry Pi GPIO pin to trigger
a gate, polls a second GPIO pin for the gate's open/closed state, records
events and settings in a SQLite database, and optionally reports state through
the ``HomeAssistantMQTT`` integration.
"""
import os
import json
import asyncio
import aiosqlite
from datetime import datetime
import logging
from logging.handlers import RotatingFileHandler
import sys
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ValidationError
from typing import Optional, Dict, Any, List, Union
import RPi.GPIO as GPIO
from enum import Enum, auto
from mqtt_integration import HomeAssistantMQTT

# Configure logging
LOG_FILE = "/var/log/gatekeeper.log"
logger = logging.getLogger("gatekeeper")


# Models
class GateEvent(BaseModel):
    """A single row of the ``events`` table."""

    id: Optional[int] = None
    timestamp: str
    action: str
    source: str
    success: bool


class MQTTSettings(BaseModel):
    """MQTT connection settings (``port`` is stored as a string)."""

    broker: str = "localhost"
    port: str = "1883"
    username: str = ""
    password: str = ""
    clientId: str = "gatekeeper"
    enabled: bool = False


class GPIOSettings(BaseModel):
    """GPIO pin assignments."""

    gatePin: int = 15  # Relay control pin
    statusPin: int = 7  # Gate open status pin


class LoggingSettings(BaseModel):
    """Log level and rotation settings."""

    level: str = "INFO"  # Default to INFO level
    maxBytes: int = 10 * 1024 * 1024  # 10MB default
    backupCount: int = 5  # Keep 5 backup files by default


class Settings(BaseModel):
    """Complete application settings; each top-level field is one DB row."""

    maxOpenTimeSeconds: int = 300  # Default 5 minutes
    triggerDuration: int = 500  # Default 500ms
    mqtt: MQTTSettings = MQTTSettings()
    gpio: GPIOSettings = GPIOSettings()
    logging: LoggingSettings = LoggingSettings()


class GateState(Enum):
    """Gate state enumeration"""

    UNKNOWN = auto()
    OPEN = auto()
    CLOSED = auto()
    TRANSITIONING = auto()


class GateStatus:
    """Gate status tracking"""

    def __init__(self):
        self.state = GateState.UNKNOWN
        self.last_change = datetime.now()
        self.transition_start = None

    def update(self, is_open: bool) -> bool:
        """Update state and return True if state changed"""
        new_state = GateState.OPEN if is_open else GateState.CLOSED
        if new_state == self.state:
            return False
        self.state = new_state
        self.last_change = datetime.now()
        return True

    @property
    def is_open(self) -> bool:
        """Return True if gate is open"""
        return self.state == GateState.OPEN

    @property
    def last_changed(self) -> str:
        """Return ISO formatted last change time"""
        return self.last_change.isoformat()


# Initialize gate status
gate_status = GateStatus()


# Configure logging
def setup_logging(settings: Settings):
    """Configure the ``gatekeeper`` logger from ``settings.logging``.

    Replaces any handlers already attached to the logger with a single
    rotating file handler writing to ``LOG_FILE``. An unrecognised level name
    falls back to ``logging.INFO``.
    """
    log_level = getattr(logging, settings.logging.level.upper(), logging.INFO)
    logger.setLevel(log_level)

    # Remove existing handlers; close them too so that reconfiguring does not
    # leave the previous log file handle open.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()

    # Create rotating file handler
    file_handler = RotatingFileHandler(
        LOG_FILE,
        maxBytes=settings.logging.maxBytes,
        backupCount=settings.logging.backupCount,
        delay=True,  # Only create log file when first record is written
    )
    file_handler.setLevel(log_level)

    # Create formatter and add it to the handler
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    file_handler.setFormatter(formatter)

    # Add the handler to the logger
    logger.addHandler(file_handler)
    logger.debug("Logging configured with level: %s", settings.logging.level)


# Log uncaught exceptions
def handle_exception(exc_type, exc_value, exc_traceback):
    """``sys.excepthook`` replacement that logs uncaught exceptions."""
    if issubclass(exc_type, KeyboardInterrupt):
        # Call the default handler for keyboard interrupt
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))


# Install exception handler
sys.excepthook = handle_exception

# Initialize FastAPI
app = FastAPI()
ha_mqtt = HomeAssistantMQTT()

# Constants
DB_PATH = "gatekeeper.db"  # Database file path


def _current_settings(
    missing_message: Optional[str] = "No settings available, using default settings",
) -> Settings:
    """Return the settings stored on ``app.state``, or defaults if none are set.

    ``missing_message`` is logged as a warning when the fallback is used; pass
    ``None`` to fall back silently.
    """
    settings = getattr(app.state, "current_settings", None)
    if settings:
        return settings
    if missing_message is not None:
        logger.warning(missing_message)
    return Settings()


# Database connection handling
class DBConnection:
    """Async context manager that opens one SQLite connection per use."""

    def __init__(self):
        self.db = None

    async def __aenter__(self):
        self.db = await aiosqlite.connect(DB_PATH)
        self.db.row_factory = aiosqlite.Row
        return self.db

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.db.close()


def get_db():
    """Get a new database connection"""
    return DBConnection()


# Database operations
async def add_event(timestamp: str, action: str, source: str, success: bool = True):
    """Add an event to the database"""
    async with get_db() as db:
        await db.execute(
            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
            (timestamp, action, source, success),
        )
        await db.commit()


async def add_gate_status(timestamp: str):
    """Add a gate status change to the database"""
    async with get_db() as db:
        await db.execute(
            "INSERT INTO gate_status (timestamp) VALUES (?)",
            (timestamp,),
        )
        await db.commit()


async def update_settings(settings_update: Dict[str, Any]):
    """Update settings in the database.

    Each top-level key of ``settings_update`` is stored as one row whose value
    is the JSON encoding of the corresponding entry.
    """
    async with get_db() as db:
        # Update each setting in the database
        for key, value in settings_update.items():
            await db.execute(
                "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
                (key, json.dumps(value)),
            )
        await db.commit()


async def get_settings():
    """Get current settings from the database as a ``{key: decoded value}`` dict"""
    async with get_db() as db:
        cursor = await db.execute("SELECT key, value FROM settings")
        rows = await cursor.fetchall()
        return {key: json.loads(value) for key, value in rows}


async def get_events(limit: int = 10, offset: int = 0):
    """Get events from database with pagination.

    Returns a dict with ``events`` (newest first), ``total`` (row count of the
    events table) and ``hasMore``.

    Raises:
        HTTPException: status 500 on any failure.
    """
    try:
        async with get_db() as db:
            # Get total count
            async with db.execute("SELECT COUNT(*) FROM events") as cursor:
                total = (await cursor.fetchone())[0]

            # Get paginated events
            async with db.execute(
                "SELECT timestamp, action, source, success FROM events ORDER BY timestamp DESC LIMIT ? OFFSET ?",
                (limit, offset),
            ) as cursor:
                events = [
                    {
                        "timestamp": row[0],
                        "action": row[1],
                        "source": row[2],
                        "success": bool(row[3]),
                    }
                    for row in await cursor.fetchall()
                ]

        return {
            "events": events,
            "total": total,
            "hasMore": total > (offset + limit),
        }
    except Exception as e:
        logger.error(f"Failed to get events: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get events") from e


# Set up MQTT event logging
async def log_mqtt_event(action: str, success: bool = True):
    """Log MQTT events to the database and log file"""
    logger.info(f"MQTT Event - {action} (Success: {success})")
    # Local time, like every other event written by this module, so that the
    # timestamp ordering used by get_events stays consistent.
    await add_event(datetime.now().isoformat(), action, "MQTT", success)


ha_mqtt.set_event_callback(log_mqtt_event)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Gate control
async def trigger_gate():
    """Pulse the gate relay for ``triggerDuration`` milliseconds.

    Sets the gate pin HIGH, records a "gate triggered" event, waits, then sets
    the pin LOW again. The pin is driven LOW even if recording the event or
    the wait fails or is cancelled, so the relay is not left energised.

    Returns:
        True on success, False if any step raised an exception.
    """
    try:
        settings = _current_settings()
        gate_pin = settings.gpio.gatePin
        trigger_duration = settings.triggerDuration / 1000.0  # Convert to seconds

        # Activate relay (pull pin HIGH)
        GPIO.output(gate_pin, GPIO.HIGH)
        try:
            logger.info(f"Gate triggered - pin {gate_pin} set HIGH")

            # Log event
            await add_event(datetime.now().isoformat(), "gate triggered", "api")

            # Wait for specified duration
            await asyncio.sleep(trigger_duration)
        finally:
            # Deactivate relay (pull pin LOW)
            GPIO.output(gate_pin, GPIO.LOW)

        logger.info(f"Gate trigger complete - pin {gate_pin} set LOW")
        return True
    except Exception as e:
        logger.error(f"Failed to trigger gate: {e}", exc_info=True)
        # Log failure; a second failure here must not mask the False result.
        try:
            await add_event(datetime.now().isoformat(), "gate trigger failed", "api", False)
        except Exception:
            logger.error("Failed to record gate trigger failure", exc_info=True)
        return False


last_open_time = None
gate_monitor_running = False
auto_close_running = False


async def start_background_tasks():
    """Start all background monitoring tasks"""
    logger.info("Starting background tasks...")
    # Don't set global flags here, let the tasks set them
    app.state.status_task = asyncio.create_task(update_gate_status())
    app.state.auto_close_task = asyncio.create_task(check_auto_close())
    logger.info("Background tasks started")


async def update_gate_status():
    """Monitor gate status and update database when it changes.

    Polls the status pin every 0.5 s. On a state change it records the change
    in the database and, if MQTT is enabled, publishes the new state. Errors
    are retried with exponential backoff capped at 30 s.
    """
    global gate_monitor_running
    if gate_monitor_running:
        logger.warning("Gate status monitor already running")
        return

    gate_monitor_running = True
    consecutive_errors = 0

    try:
        while True:
            try:
                settings = _current_settings()

                # Check current status (LOW = closed, HIGH = open)
                is_open = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH

                # Update state machine
                if gate_status.update(is_open):
                    logger.info(f"Gate status changed to: {gate_status.state.name}")
                    logger.debug("Updating database with new status")

                    # Update database
                    await add_gate_status(gate_status.last_changed)
                    await add_event(
                        gate_status.last_changed,
                        f"gate {gate_status.state.name.lower()}",
                        "sensor",
                    )

                    # Update MQTT state if enabled
                    if settings.mqtt.enabled:
                        await publish_mqtt_state(gate_status.is_open)

                consecutive_errors = 0

                # Sleep for a short time before next check
                await asyncio.sleep(0.5)
            except Exception as e:
                consecutive_errors += 1
                wait_time = min(30, 2 ** consecutive_errors)
                logger.error(
                    f"Error in gate_status_monitor (attempt {consecutive_errors}): {e}",
                    exc_info=True,
                )
                logger.warning(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
    finally:
        gate_monitor_running = False
        logger.info("Gate status monitor stopped")


async def check_auto_close():
    """Monitor gate status and auto-close if open too long.

    Checks once per second. When the gate has been open for more than
    ``maxOpenTimeSeconds`` it records an "auto-close" event and triggers the
    gate.
    """
    global auto_close_running, last_open_time
    if auto_close_running:
        logger.warning("Auto-close monitor already running, skipping...")
        return

    auto_close_running = True

    try:
        while True:
            try:
                settings = _current_settings(
                    "No settings available for auto-close, using defaults"
                )
                current_time = datetime.now()

                # Check if gate is open
                if gate_status.is_open:
                    # If this is the first time we see it open, record the time
                    if last_open_time is None:
                        last_open_time = current_time
                        logger.debug("Gate opened, starting auto-close timer")

                    # Calculate how long it's been open
                    time_open = (current_time - last_open_time).total_seconds()

                    # Auto-close if it's been open too long
                    if time_open > settings.maxOpenTimeSeconds:
                        logger.warning(
                            f"Gate has been open for {time_open:.1f} seconds, auto-closing"
                        )
                        await add_event(current_time.isoformat(), "auto-close", "system")
                        await trigger_gate()
                        last_open_time = None
                else:
                    # Reset last open time if gate is closed
                    if last_open_time is not None:
                        logger.debug("Gate closed, resetting auto-close timer")
                        last_open_time = None

                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error in auto_close_monitor: {e}", exc_info=True)
                await asyncio.sleep(5)  # Wait before retrying
    finally:
        auto_close_running = False
        logger.info("Auto-close monitor stopped")


# MQTT Command Handler
async def handle_mqtt_command(should_open: bool):
    """Handle commands received from Home Assistant.

    Triggers the gate only when the requested state differs from the state
    currently read from the status pin.
    """
    try:
        settings = _current_settings()
        status_pin = settings.gpio.statusPin
        if should_open != (GPIO.input(status_pin) == GPIO.HIGH):
            await trigger_gate()
    except Exception as e:
        logger.error(f"Error handling MQTT command: {e}", exc_info=True)


# API Routes
@app.post("/api/trigger")
async def trigger():
    """Trigger the gate and report the status pin reading taken afterwards"""
    try:
        success = await trigger_gate()
        timestamp = datetime.now().isoformat()

        # Get current status after trigger
        settings = _current_settings(None)
        current_status = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH

        return {"success": success, "timestamp": timestamp, "isOpen": current_status}
    except Exception as e:
        logger.error("Error triggering gate", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to trigger gate") from e


@app.get("/api/status")
async def get_status():
    """Get current gate status.

    Reads the status pin directly. The shared ``gate_status`` state machine is
    deliberately not updated here: ``update_gate_status`` relies on seeing the
    transition itself in order to record the event and publish it over MQTT.
    """
    try:
        settings = _current_settings(None)
        # LOW (0V) means gate is closed, HIGH (3.3V) means gate is open
        is_open = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH
        return {"isOpen": is_open, "lastChanged": gate_status.last_changed}
    except Exception as e:
        logger.error("Error getting gate status", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get gate status") from e


@app.get("/api/events")
async def get_events_route(limit: int = 10, offset: int = 0):
    """Get recent gate events with pagination"""
    return await get_events(limit, offset)


@app.get("/api/settings")
async def get_settings_route():
    """Get current settings, filling any key missing from the database with
    the ``Settings`` model default."""
    try:
        stored = await get_settings()
        return {
            key: stored.get(key, default)
            for key, default in Settings().dict().items()
        }
    except Exception as e:
        logger.error(f"Failed to get settings: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get settings") from e


async def _record_settings_failure(reason: str) -> None:
    """Record a failed settings update without raising if the write fails."""
    try:
        await add_event(
            datetime.now().isoformat(),
            f"Settings update failed: {reason}",
            "api",
            False,
        )
    except Exception:
        logger.error("Failed to record settings update failure", exc_info=True)


def _describe_settings_changes(settings_update: Dict[str, Any], new_settings: Settings) -> List[str]:
    """Build the human-readable change list stored with the update event."""
    changes = []
    if "maxOpenTimeSeconds" in settings_update:
        changes.append(f"Max Open Time: {new_settings.maxOpenTimeSeconds}s")
    if "triggerDuration" in settings_update:
        changes.append(f"Trigger Duration: {new_settings.triggerDuration}ms")
    if "mqtt" in settings_update:
        mqtt_status = "Enabled" if new_settings.mqtt.enabled else "Disabled"
        changes.append(f"MQTT: {mqtt_status}")
        if new_settings.mqtt.enabled:
            changes.append(f"Broker: {new_settings.mqtt.broker}:{new_settings.mqtt.port}")
    return changes


@app.post("/api/settings")
async def update_settings_route(settings_update: Dict[str, Any]):
    """Update settings.

    The update is merged over the stored settings and validated before
    anything is written, so a rejected update leaves the database untouched.

    Raises:
        HTTPException: 400 if the merged settings are invalid, 500 on any
            other failure.
    """
    try:
        # Merge the update over what is stored and validate the result first
        stored = await get_settings()
        try:
            new_settings = Settings(**{**stored, **settings_update})
        except ValidationError as e:
            raise HTTPException(status_code=400, detail=f"Invalid settings: {e}") from e

        is_valid, errors = validate_settings(new_settings)
        if not is_valid:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid settings: {', '.join(errors)}",
            )

        # Update settings in database
        await update_settings(settings_update)

        # Use complete settings
        app.state.current_settings = new_settings

        # NOTE(review): a changed "gpio" section is not applied to the pins
        # here; setup_gpio() is only called from startup_event.

        # Update MQTT if configuration changed (init_mqtt exports the
        # connection settings to the environment itself)
        if "mqtt" in settings_update:
            await init_mqtt(new_settings)

        # Update logging if configuration changed
        if "logging" in settings_update:
            setup_logging(new_settings)

        # Log changes
        changes = _describe_settings_changes(settings_update, new_settings)
        await add_event(
            datetime.now().isoformat(),
            f"Settings Updated ({'; '.join(changes)})",
            "api",
        )

        return {"success": True, "message": "Settings updated successfully"}
    except HTTPException as e:
        # Deliberate rejection: keep its status code instead of masking as 500
        logger.warning(f"Settings update rejected: {e.detail}")
        await _record_settings_failure(str(e.detail))
        raise
    except Exception as e:
        logger.error(f"Failed to update settings: {e}", exc_info=True)
        # Log failure
        await _record_settings_failure(str(e))
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update settings: {str(e)}",
        ) from e


# Serve static files
app.mount("/", StaticFiles(directory="../public", html=True), name="static")


# GPIO Setup
def setup_gpio():
    """Initialize GPIO pins based on settings.

    Configures the gate pin as an output driven LOW and the status pin as an
    input with the pull-up enabled. Re-raises any exception after logging it.
    """
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)

    try:
        settings = _current_settings()

        # Setup gate control pin
        gate_pin = settings.gpio.gatePin
        GPIO.setup(gate_pin, GPIO.OUT)
        GPIO.output(gate_pin, GPIO.LOW)

        # Setup status pin
        status_pin = settings.gpio.statusPin
        GPIO.setup(status_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        logger.info(f"GPIO initialized (Gate Pin: {gate_pin}, Status Pin: {status_pin})")
    except Exception as e:
        logger.error(f"Failed to setup GPIO: {e}", exc_info=True)
        raise


# Database functions
async def init_db():
    """Initialize the SQLite database.

    Creates the ``events``, ``gate_status`` and ``settings`` tables if they do
    not exist and inserts default settings for any key not already present.
    """
    async with get_db() as db:
        # Create events table
        await db.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                action TEXT NOT NULL,
                source TEXT NOT NULL,
                success BOOLEAN NOT NULL
            )
        """)

        # Create gate_status table
        await db.execute("""
            CREATE TABLE IF NOT EXISTS gate_status (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL
            )
        """)

        # Create settings table
        await db.execute("""
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
        """)

        # Insert default settings if they don't exist
        default_settings = Settings().dict()
        for key, value in default_settings.items():
            await db.execute(
                "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
                (key, json.dumps(value)),
            )

        await db.commit()


# Load settings from database
async def load_settings():
    """Load settings from database into ``app.state.current_settings``.

    Falls back to default settings when the stored settings fail validation
    or cannot be loaded at all.
    """
    try:
        # Use existing get_settings function
        settings_dict = await get_settings()
        settings = Settings(**settings_dict)

        # Validate settings
        is_valid, errors = validate_settings(settings)
        if not is_valid:
            logger.warning(f"Invalid settings detected: {', '.join(errors)}")
            logger.info("Using default settings")
            settings = Settings()  # Use defaults

        # Store in app state
        app.state.current_settings = settings
        return settings
    except Exception as e:
        logger.error(f"Failed to load settings: {e}", exc_info=True)
        # Return default settings on error
        default_settings = Settings()
        app.state.current_settings = default_settings
        return default_settings


def validate_settings(settings: Settings) -> tuple[bool, list[str]]:
    """Validate settings and return (is_valid, error_messages)"""
    errors = []

    # Validate time settings
    if settings.maxOpenTimeSeconds < 0:
        errors.append("maxOpenTimeSeconds must not be negative")
    if settings.triggerDuration < 100:  # Min 100ms
        errors.append("triggerDuration must be at least 100ms")

    # Validate GPIO settings
    if settings.gpio.gatePin < 0 or settings.gpio.statusPin < 0:
        errors.append("GPIO pins must not be negative")
    if settings.gpio.gatePin == settings.gpio.statusPin:
        errors.append("Gate and status pins must be different")

    # Validate MQTT settings if enabled
    if settings.mqtt.enabled:
        if not settings.mqtt.broker:
            errors.append("MQTT broker required when MQTT is enabled")
        if not settings.mqtt.port:
            errors.append("MQTT port required when MQTT is enabled")

    return len(errors) == 0, errors


async def init_mqtt(settings: Settings):
    """Initialize MQTT if enabled in settings.

    Exports the connection settings as ``MQTT_*`` environment variables and
    calls ``ha_mqtt.enable()``.

    Returns:
        True if MQTT is disabled or was enabled without error, False otherwise.
    """
    if not settings.mqtt.enabled:
        logger.info("MQTT disabled in settings")
        return True

    try:
        logger.info("MQTT enabled, initializing connection...")
        os.environ["MQTT_BROKER"] = settings.mqtt.broker
        os.environ["MQTT_PORT"] = str(settings.mqtt.port)
        os.environ["MQTT_USERNAME"] = settings.mqtt.username
        os.environ["MQTT_PASSWORD"] = settings.mqtt.password
        os.environ["MQTT_CLIENT_ID"] = settings.mqtt.clientId

        # Try to enable MQTT with timeout
        # NOTE(review): enable() is called without await, so there is no
        # suspension point inside this block for the timeout to interrupt;
        # confirm whether enable() is meant to be awaited.
        try:
            async with asyncio.timeout(10):  # 10 second timeout
                ha_mqtt.enable()
            logger.info("MQTT initialized successfully")
        except asyncio.TimeoutError:
            logger.error("MQTT initialization timed out")
            return False
    except Exception as e:
        logger.error(f"Failed to initialize MQTT: {e}", exc_info=True)
        return False

    return True


async def publish_mqtt_state(state: bool) -> bool:
    """Publish state to MQTT with error handling.

    Returns:
        True if MQTT is disabled or the publish succeeded, False if it failed
        or did not finish within 5 seconds.
    """
    if not _current_settings(None).mqtt.enabled:
        return True

    try:
        async with asyncio.timeout(5):  # 5 second timeout
            await ha_mqtt.publish_state(state)
        return True
    except Exception as e:  # includes asyncio.TimeoutError
        logger.error(f"Failed to publish MQTT state: {e}", exc_info=True)
        return False


async def validate_startup_state() -> tuple[bool, list[str]]:
    """Validate application startup state.

    Checks that the database answers a trivial query, that the configured
    GPIO pins can be read and written, and that MQTT is connected when
    enabled. Returns (is_valid, error_messages).
    """
    errors = []
    settings = getattr(app.state, "current_settings", None)

    # Check database
    try:
        async with get_db() as db:
            await db.execute("SELECT 1")
    except Exception as e:
        errors.append(f"Database not accessible: {e}")

    # Check GPIO
    try:
        if not settings:
            errors.append("Settings not loaded")
        else:
            # Check if pins are configured
            GPIO.input(settings.gpio.statusPin)  # Test read
            GPIO.output(settings.gpio.gatePin, GPIO.LOW)  # Test write
    except Exception as e:
        errors.append(f"GPIO not properly configured: {e}")

    # Check MQTT if enabled
    if settings and settings.mqtt.enabled:
        if not ha_mqtt.is_connected():
            errors.append("MQTT enabled but not connected")

    return len(errors) == 0, errors


@app.on_event("startup")
async def startup_event():
    """Initialize the application on startup"""
    try:
        logger.info("Starting application...")

        # 1. Initialize database
        logger.info("Initializing database...")
        await init_db()

        # 2. Load settings
        logger.info("Loading settings...")
        settings = await load_settings()

        # 3. Configure logging based on settings
        logger.info("Configuring logging...")
        setup_logging(settings)

        # 4. Configure MQTT
        await init_mqtt(settings)

        # 5. Initialize GPIO
        logger.info("Setting up GPIO...")
        setup_gpio()

        # 6. Start background tasks
        await start_background_tasks()

        # 7. Validate startup state
        is_valid, errors = await validate_startup_state()
        if not is_valid:
            logger.warning("Application started with warnings:")
            for error in errors:
                logger.warning(f"  - {error}")
        else:
            logger.info("Application startup complete and validated")
    except Exception as e:
        logger.error(f"Failed to start application: {e}", exc_info=True)
        raise


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown"""
    global gate_monitor_running, auto_close_running
    try:
        logger.info("Shutting down Gatekeeper application")

        # 1. Stop background tasks
        logger.info("Stopping background tasks...")
        gate_monitor_running = False
        auto_close_running = False

        tasks = [
            task
            for task in (
                getattr(app.state, "status_task", None),
                getattr(app.state, "auto_close_task", None),
            )
            if task is not None
        ]
        for task in tasks:
            task.cancel()
        if tasks:
            # Wait for the cancellations to complete so that neither task is
            # still touching GPIO when the pins are cleaned up below.
            await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("Background tasks stopped")

        # 2. Disconnect MQTT if it was enabled
        settings = getattr(app.state, "current_settings", None)
        if settings and settings.mqtt.enabled:
            logger.info("Disconnecting MQTT...")
            await ha_mqtt.disconnect()
            logger.info("MQTT disconnected")

        # 3. Clean up GPIO
        logger.info("Cleaning up GPIO...")
        GPIO.cleanup()
        logger.info("GPIO cleanup completed")

        # 4. Close database connection
        logger.info("Closing database connection...")
        # No need to explicitly close as we're using new connections for each request
        logger.info("Database connections will be closed automatically")

        logger.info("Shutdown completed successfully")
    except Exception as e:
        logger.error("Error during shutdown", exc_info=True)
        raise