refactor: improve database operations and settings management

- Added dedicated database operation functions
- Improved settings validation and updates
- Added proper state machine for gate status
- Added MQTT error handling
- Added startup state validation
- Fixed partial settings updates
This commit is contained in:
Josh Finlay 2025-01-08 09:11:39 +10:00
parent a3ecf3a606
commit 9046cbca1d
1 changed files with 368 additions and 229 deletions

View File

@ -13,6 +13,7 @@ from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any, List, Union
import RPi.GPIO as GPIO
from enum import Enum, auto
from mqtt_integration import HomeAssistantMQTT
@ -37,8 +38,8 @@ class MQTTSettings(BaseModel):
enabled: bool = False
class GPIOSettings(BaseModel):
gatePin: int = 17 # Default GPIO pin for gate control
statusPin: int = 27 # Default GPIO pin for gate status
gatePin: int = 15 # Relay control pin
statusPin: int = 7 # Gate open status pin
class LoggingSettings(BaseModel):
level: str = "WARNING" # Default to WARNING level
@ -52,9 +53,43 @@ class Settings(BaseModel):
gpio: GPIOSettings = GPIOSettings()
logging: LoggingSettings = LoggingSettings()
class GateStatus(BaseModel):
isOpen: bool
lastChanged: str
class GateState(Enum):
"""Gate state enumeration"""
UNKNOWN = auto()
OPEN = auto()
CLOSED = auto()
TRANSITIONING = auto()
class GateStatus:
"""Gate status tracking"""
def __init__(self):
self.state = GateState.UNKNOWN
self.last_change = datetime.now()
self.transition_start = None
def update(self, is_open: bool) -> bool:
"""Update state and return True if state changed"""
now = datetime.now()
new_state = GateState.OPEN if is_open else GateState.CLOSED
if new_state != self.state:
self.state = new_state
self.last_change = now
return True
return False
@property
def is_open(self) -> bool:
"""Return True if gate is open"""
return self.state == GateState.OPEN
@property
def last_changed(self) -> str:
"""Return ISO formatted last change time"""
return self.last_change.isoformat()
# Initialize gate status
gate_status = GateStatus()
# Configure logging
def setup_logging(settings: Settings):
@ -121,16 +156,77 @@ def get_db():
"""Get a new database connection"""
return DBConnection()
# Database operations
async def add_event(timestamp: str, action: str, source: str, success: bool = True):
"""Add an event to the database"""
async with get_db() as db:
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(timestamp, action, source, success)
)
await db.commit()
async def add_gate_status(timestamp: str):
"""Add a gate status change to the database"""
async with get_db() as db:
await db.execute(
"INSERT INTO gate_status (timestamp) VALUES (?)",
(timestamp,)
)
await db.commit()
async def update_settings(settings_update: Dict[str, Any]):
"""Update settings in the database"""
async with get_db() as db:
# Update each setting in the database
for key, value in settings_update.items():
await db.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
(key, json.dumps(value))
)
await db.commit()
async def get_settings():
"""Get current settings from the database"""
async with get_db() as db:
cursor = await db.execute("SELECT key, value FROM settings")
rows = await cursor.fetchall()
settings = {}
for key, value in rows:
settings[key] = json.loads(value)
return settings
async def get_events(limit: int = 10, offset: int = 0):
"""Get recent gate events with pagination"""
async with get_db() as db:
db.row_factory = aiosqlite.Row
# Get total count
cursor = await db.execute("SELECT COUNT(*) as count FROM events")
row = await cursor.fetchone()
total_count = row['count']
# Get paginated events
cursor = await db.execute(
"""
SELECT * FROM events
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
""",
(limit, offset)
)
events = await cursor.fetchall()
return {
"events": [dict(event) for event in events],
"total": total_count,
"hasMore": (offset + limit) < total_count
}
# Set up MQTT event logging
async def log_mqtt_event(action: str, success: bool = True):
"""Log MQTT events to the database and log file"""
logger.info(f"MQTT Event - {action} (Success: {success})")
async with get_db() as db:
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(datetime.utcnow().isoformat(), action, "MQTT", success)
)
await db.commit()
await add_event(datetime.utcnow().isoformat(), action, "MQTT", success)
ha_mqtt.set_event_callback(log_mqtt_event)
@ -144,7 +240,8 @@ app.add_middleware(
)
# Gate control
async def trigger_gate() -> bool:
async def trigger_gate():
"""Trigger the gate relay"""
try:
settings = app.state.current_settings
if not settings:
@ -152,12 +249,29 @@ async def trigger_gate() -> bool:
settings = Settings()
gate_pin = settings.gpio.gatePin
trigger_duration = settings.triggerDuration / 1000.0 # Convert to seconds
# Activate relay (pull pin HIGH)
GPIO.output(gate_pin, GPIO.HIGH)
await asyncio.sleep(settings.triggerDuration / 1000) # Convert ms to seconds
logger.info(f"Gate triggered - pin {gate_pin} set HIGH")
# Log event
timestamp = datetime.now().isoformat()
await add_event(timestamp, "gate triggered", "api")
# Wait for specified duration
await asyncio.sleep(trigger_duration)
# Deactivate relay (pull pin LOW)
GPIO.output(gate_pin, GPIO.LOW)
logger.info(f"Gate trigger complete - pin {gate_pin} set LOW")
return True
except Exception as e:
logger.error(f"Error triggering gate: {e}")
logger.error(f"Failed to trigger gate: {e}", exc_info=True)
# Log failure
timestamp = datetime.now().isoformat()
await add_event(timestamp, "gate trigger failed", "api", False)
return False
last_open_time = None
@ -169,59 +283,49 @@ async def update_gate_status():
global gate_monitor_running
if gate_monitor_running:
logger.warning("Gate status monitor already running, skipping...")
logger.warning("Gate status monitor already running")
return
gate_monitor_running = True
logger.info("Starting gate status monitoring task")
consecutive_errors = 0
try:
settings = app.state.current_settings
if not settings:
logger.warning("No settings available, using default settings")
settings = Settings()
status_pin = settings.gpio.statusPin
last_status = None
consecutive_errors = 0
while True:
try:
if not gate_monitor_running:
logger.info("Gate status monitor stopped")
break
current_status = GPIO.input(status_pin) == GPIO.HIGH
settings = app.state.current_settings
if not settings:
logger.warning("No settings available, using default settings")
settings = Settings()
if last_status != current_status:
timestamp = datetime.now()
logger.info(f"Gate status changed to: {'open' if current_status else 'closed'}")
# Check current status (LOW = closed, HIGH = open)
is_open = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH
# Update state machine
if gate_status.update(is_open):
logger.info(f"Gate status changed to: {gate_status.state.name}")
logger.debug("Updating database with new status")
async with get_db() as db:
await db.execute(
"INSERT INTO gate_status (timestamp) VALUES (?)",
(timestamp.isoformat(),)
)
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(timestamp.isoformat(), f"gate {'opened' if current_status else 'closed'}", "sensor", True)
)
await db.commit()
# Update database
await add_gate_status(gate_status.last_changed)
await add_event(
gate_status.last_changed,
f"gate {gate_status.state.name.lower()}",
"sensor"
)
# Update MQTT state if enabled
if settings.mqtt.enabled:
await publish_mqtt_state(gate_status.is_open)
await ha_mqtt.publish_state(current_status)
last_status = current_status
consecutive_errors = 0
else:
logger.debug(f"Gate status unchanged: {'open' if current_status else 'closed'}")
# Sleep for a short time before next check
await asyncio.sleep(0.5)
except Exception as e:
consecutive_errors += 1
wait_time = min(30, 2 ** consecutive_errors)
logger.error(f"Error in update_gate_status (attempt {consecutive_errors}): {e}", exc_info=True)
logger.error(f"Error in gate_status_monitor (attempt {consecutive_errors}): {e}", exc_info=True)
logger.warning(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
finally:
@ -268,12 +372,7 @@ async def check_auto_close():
logger.warning(f"Gate has been open for {time_open:.1f} seconds, auto-closing")
timestamp = current_time.isoformat()
async with get_db() as db:
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(timestamp, "auto-close", "system", True)
)
await db.commit()
await add_event(timestamp, "auto-close", "system")
await trigger_gate()
last_open_time = None
@ -323,14 +422,6 @@ async def trigger():
settings = app.state.current_settings or Settings()
current_status = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH
# Log event
async with get_db() as db:
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(timestamp, "trigger gate", "api", success)
)
await db.commit()
return {"success": success, "timestamp": timestamp, "isOpen": current_status}
except Exception as e:
logger.error("Error triggering gate", exc_info=True)
@ -341,16 +432,11 @@ async def get_status():
"""Get current gate status"""
try:
settings = app.state.current_settings or Settings()
# LOW (0V) means gate is closed, HIGH (3.3V) means gate is open
is_open = GPIO.input(settings.gpio.statusPin) == GPIO.HIGH
gate_status.update(is_open)
async with get_db() as db:
cursor = await db.execute(
"SELECT timestamp FROM gate_status ORDER BY timestamp DESC LIMIT 1"
)
row = await cursor.fetchone()
last_changed = row[0] if row else datetime.now().isoformat()
return {"isOpen": is_open, "lastChanged": last_changed}
return {"isOpen": gate_status.is_open, "lastChanged": gate_status.last_changed}
except Exception as e:
logger.error("Error getting gate status", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to get gate status")
@ -358,44 +444,17 @@ async def get_status():
@app.get("/api/events")
async def get_events(limit: int = 10, offset: int = 0):
"""Get recent gate events with pagination"""
async with get_db() as db:
db.row_factory = aiosqlite.Row
# Get total count
cursor = await db.execute("SELECT COUNT(*) as count FROM events")
row = await cursor.fetchone()
total_count = row['count']
# Get paginated events
cursor = await db.execute(
"""
SELECT * FROM events
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
""",
(limit, offset)
)
events = await cursor.fetchall()
return {
"events": [dict(event) for event in events],
"total": total_count,
"hasMore": (offset + limit) < total_count
}
return await get_events(limit, offset)
@app.get("/api/settings")
async def get_settings():
async def get_settings_route():
"""Get current settings"""
async with get_db() as db:
cursor = await db.execute("SELECT key, value FROM settings")
rows = await cursor.fetchall()
settings = {}
for key, value in rows:
settings[key] = json.loads(value)
try:
settings_dict = await get_settings()
return {
"maxOpenTimeSeconds": settings.get("maxOpenTimeSeconds", "300"),
"triggerDuration": settings.get("triggerDuration", "500"),
"mqtt": settings.get("mqtt", {
"maxOpenTimeSeconds": settings_dict.get("maxOpenTimeSeconds", 300),
"triggerDuration": settings_dict.get("triggerDuration", 500),
"mqtt": settings_dict.get("mqtt", {
"broker": "localhost",
"port": "1883",
"username": "",
@ -403,83 +462,81 @@ async def get_settings():
"clientId": "gatekeeper",
"enabled": False
}),
"gpio": settings.get("gpio", {
"gatePin": 17,
"statusPin": 27
"gpio": settings_dict.get("gpio", {
"gatePin": 15,
"statusPin": 7
}),
"logging": settings.get("logging", {
"logging": settings_dict.get("logging", {
"level": "WARNING",
"maxBytes": 10 * 1024 * 1024,
"backupCount": 5
})
}
except Exception as e:
logger.error(f"Failed to get settings: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to get settings")
@app.post("/api/settings")
async def update_settings(settings: Settings):
async def update_settings_route(settings_update: Dict[str, Any]):
"""Update settings"""
try:
async with get_db() as db:
# Update each setting
for key, value in settings.dict().items():
await db.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
(key, json.dumps(value))
)
await db.commit()
# Update environment variables and MQTT connection
if settings.mqtt:
# Update environment variables
os.environ["MQTT_BROKER"] = settings.mqtt["broker"]
os.environ["MQTT_PORT"] = settings.mqtt["port"]
os.environ["MQTT_USERNAME"] = settings.mqtt["username"]
os.environ["MQTT_PASSWORD"] = settings.mqtt["password"]
os.environ["MQTT_CLIENT_ID"] = settings.mqtt["clientId"]
# Enable/disable MQTT based on settings
ha_mqtt.enable(settings.mqtt.get("enabled", False))
# Log settings update event with details
changes = []
if settings.maxOpenTimeSeconds:
changes.append(f"Max Open Time: {settings.maxOpenTimeSeconds}s")
if settings.triggerDuration:
changes.append(f"Trigger Duration: {settings.triggerDuration}ms")
if settings.mqtt:
mqtt_status = "Enabled" if settings.mqtt.get("enabled") else "Disabled"
changes.append(f"MQTT: {mqtt_status}")
if settings.mqtt.get("enabled"):
changes.append(f"Broker: {settings.mqtt['broker']}:{settings.mqtt['port']}")
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(
datetime.utcnow().isoformat(),
f"Settings Updated ({'; '.join(changes)})",
"Settings",
True
)
)
await db.commit()
# Update logging configuration
setup_logging(settings)
return {"success": True}
# Store original settings for comparison
old_settings = await get_settings()
# Update settings in database
await update_settings(settings_update)
# Load new settings and merge with existing
current_settings = await get_settings()
new_settings = Settings(**current_settings) # Use complete settings
app.state.current_settings = new_settings
# Update MQTT if configuration changed
if "mqtt" in settings_update:
if new_settings.mqtt.enabled:
os.environ["MQTT_BROKER"] = new_settings.mqtt.broker
os.environ["MQTT_PORT"] = new_settings.mqtt.port
os.environ["MQTT_USERNAME"] = new_settings.mqtt.username
os.environ["MQTT_PASSWORD"] = new_settings.mqtt.password
os.environ["MQTT_CLIENT_ID"] = new_settings.mqtt.clientId
await init_mqtt(new_settings)
# Update logging if configuration changed
if "logging" in settings_update:
setup_logging(new_settings)
# Log changes
timestamp = datetime.now().isoformat()
changes = []
if "maxOpenTimeSeconds" in settings_update:
changes.append(f"Max Open Time: {new_settings.maxOpenTimeSeconds}s")
if "triggerDuration" in settings_update:
changes.append(f"Trigger Duration: {new_settings.triggerDuration}ms")
if "mqtt" in settings_update:
mqtt_status = "Enabled" if new_settings.mqtt.enabled else "Disabled"
changes.append(f"MQTT: {mqtt_status}")
if new_settings.mqtt.enabled:
changes.append(f"Broker: {new_settings.mqtt.broker}:{new_settings.mqtt.port}")
await add_event(
timestamp,
f"Settings Updated ({'; '.join(changes)})",
"api"
)
return {"success": True, "message": "Settings updated successfully"}
except Exception as e:
# Log failure event
async with get_db() as db:
await db.execute(
"INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
(
datetime.utcnow().isoformat(),
f"Settings Update Failed: {str(e)}",
"Settings",
False
)
)
await db.commit()
raise HTTPException(status_code=500, detail=str(e))
logger.error(f"Failed to update settings: {e}", exc_info=True)
# Log failure
timestamp = datetime.now().isoformat()
await add_event(
timestamp,
f"Settings update failed: {str(e)}",
"api",
False
)
raise HTTPException(status_code=500, detail=f"Failed to update settings: {str(e)}")
# Serve static files
app.mount("/", StaticFiles(directory="../public", html=True), name="static")
@ -553,91 +610,173 @@ async def init_db():
# Load settings from database
async def load_settings():
"""Load settings and initialize components based on settings"""
"""Load settings from database"""
try:
# Use existing get_settings function
settings_dict = await get_settings()
settings = Settings(**settings_dict)
# Store settings in app state for easy access
app.state.current_settings = settings
# Validate settings
is_valid, errors = validate_settings(settings)
if not is_valid:
logger.warning(f"Invalid settings detected: {', '.join(errors)}")
logger.info("Using default settings")
settings = Settings() # Use defaults
# Configure MQTT based on settings
if settings.mqtt:
# Update environment variables
# Store in app state
app.state.current_settings = settings
return settings
except Exception as e:
logger.error(f"Failed to load settings: {e}", exc_info=True)
# Return default settings on error
default_settings = Settings()
app.state.current_settings = default_settings
return default_settings
def validate_settings(settings: Settings) -> tuple[bool, list[str]]:
"""Validate settings and return (is_valid, error_messages)"""
errors = []
# Validate time settings
if settings.maxOpenTimeSeconds < 0:
errors.append("maxOpenTimeSeconds must be positive")
if settings.triggerDuration < 100: # Min 100ms
errors.append("triggerDuration must be at least 100ms")
# Validate GPIO settings
if settings.gpio.gatePin < 0 or settings.gpio.statusPin < 0:
errors.append("GPIO pins must be positive")
if settings.gpio.gatePin == settings.gpio.statusPin:
errors.append("Gate and status pins must be different")
# Validate MQTT settings if enabled
if settings.mqtt.enabled:
if not settings.mqtt.broker:
errors.append("MQTT broker required when MQTT is enabled")
if not settings.mqtt.port:
errors.append("MQTT port required when MQTT is enabled")
return len(errors) == 0, errors
async def init_mqtt(settings: Settings):
"""Initialize MQTT if enabled in settings"""
if settings.mqtt.enabled:
try:
logger.info("MQTT enabled, initializing connection...")
os.environ["MQTT_BROKER"] = settings.mqtt.broker
os.environ["MQTT_PORT"] = settings.mqtt.port
os.environ["MQTT_USERNAME"] = settings.mqtt.username
os.environ["MQTT_PASSWORD"] = settings.mqtt.password
os.environ["MQTT_CLIENT_ID"] = settings.mqtt.clientId
# Enable/disable MQTT based on settings
if settings.mqtt.enabled:
logger.info("MQTT enabled in settings, initializing connection")
ha_mqtt.enable()
else:
logger.info("MQTT disabled in settings")
# Set other application settings
os.environ["MAX_OPEN_TIME_SECONDS"] = str(settings.maxOpenTimeSeconds)
os.environ["TRIGGER_DURATION"] = str(settings.triggerDuration)
# Configure logging
setup_logging(settings)
# Try to enable MQTT with timeout
try:
async with asyncio.timeout(10): # 10 second timeout
ha_mqtt.enable()
logger.info("MQTT initialized successfully")
except asyncio.TimeoutError:
logger.error("MQTT initialization timed out")
return False
except Exception as e:
logger.error(f"Failed to initialize MQTT: {e}", exc_info=True)
return False
else:
logger.info("MQTT disabled in settings")
return True
logger.info(f"Settings loaded successfully (Max Open Time: {settings.maxOpenTimeSeconds}s, "
f"Trigger Duration: {settings.triggerDuration}ms, "
f"Gate Pin: {settings.gpio.gatePin}, Status Pin: {settings.gpio.statusPin})")
return settings
async def publish_mqtt_state(state: bool) -> bool:
"""Publish state to MQTT with error handling"""
if not app.state.current_settings.mqtt.enabled:
return True
try:
async with asyncio.timeout(5): # 5 second timeout
await ha_mqtt.publish_state(state)
return True
except (asyncio.TimeoutError, Exception) as e:
logger.error(f"Failed to publish MQTT state: {e}", exc_info=True)
return False
async def start_background_tasks():
"""Start all background monitoring tasks"""
logger.info("Starting background tasks...")
global gate_monitor_running, auto_close_running
gate_monitor_running = True
auto_close_running = True
app.state.status_task = asyncio.create_task(update_gate_status())
app.state.auto_close_task = asyncio.create_task(check_auto_close())
logger.info("Background tasks started")
async def validate_startup_state() -> tuple[bool, list[str]]:
"""Validate application startup state"""
errors = []
# Check database
try:
async with get_db() as db:
await db.execute("SELECT 1")
except Exception as e:
logger.warning(f"Failed to load settings: {e}", exc_info=True)
logger.info("Using default settings")
default_settings = Settings()
app.state.current_settings = default_settings
return default_settings
errors.append(f"Database not accessible: {e}")
# Check GPIO
try:
settings = app.state.current_settings
if not settings:
errors.append("Settings not loaded")
else:
# Check if pins are configured
GPIO.input(settings.gpio.statusPin) # Test read
GPIO.output(settings.gpio.gatePin, GPIO.LOW) # Test write
except Exception as e:
errors.append(f"GPIO not properly configured: {e}")
# Check MQTT if enabled
if app.state.current_settings and app.state.current_settings.mqtt.enabled:
if not ha_mqtt.is_connected():
errors.append("MQTT enabled but not connected")
return len(errors) == 0, errors
@app.on_event("startup")
async def startup_event():
"""Initialize the application on startup"""
try:
logger.info("Starting Gatekeeper application")
logger.info("Starting application...")
# 1. Initialize database
logger.info("Initializing database...")
await init_db()
logger.info("Database initialized successfully")
# 2. Load settings from database
# 2. Load settings
logger.info("Loading settings...")
settings = await load_settings()
app.state.current_settings = settings
logger.info("Settings loaded successfully")
# 3. Setup GPIO
logger.info("Initializing GPIO...")
# 3. Configure logging based on settings
logger.info("Configuring logging...")
setup_logging(settings)
# 4. Configure MQTT
await init_mqtt(settings)
# 5. Initialize GPIO
logger.info("Setting up GPIO...")
setup_gpio()
logger.info("GPIO initialized successfully")
# 4. Initialize MQTT if enabled
if settings.mqtt.enabled:
logger.info("MQTT enabled, initializing connection...")
ha_mqtt.enable()
logger.info("MQTT initialized successfully")
# 6. Start background tasks
await start_background_tasks()
# 7. Validate startup state
is_valid, errors = await validate_startup_state()
if not is_valid:
logger.warning("Application started with warnings:")
for error in errors:
logger.warning(f" - {error}")
else:
logger.info("MQTT disabled in settings")
# 5. Start background tasks
logger.info("Starting background tasks...")
global gate_monitor_running, auto_close_running
gate_monitor_running = True
auto_close_running = True
app.state.status_task = asyncio.create_task(update_gate_status())
app.state.auto_close_task = asyncio.create_task(check_auto_close())
logger.info("Background tasks started successfully")
logger.info("Gatekeeper application started successfully")
logger.info("Application startup complete and validated")
except Exception as e:
logger.critical("Failed to start application", exc_info=True)
logger.error(f"Failed to start application: {e}", exc_info=True)
raise
@app.on_event("shutdown")