dlbGatekeeper/backend/main.py

671 lines
24 KiB
Python

#!/usr/bin/env python3
import os
import json
import asyncio
import aiosqlite
from datetime import datetime
import logging
from logging.handlers import RotatingFileHandler
import sys
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any, List, Union
import RPi.GPIO as GPIO
from mqtt_integration import HomeAssistantMQTT
# Configure logging
# Path of the log file that setup_logging() attaches a RotatingFileHandler to.
LOG_FILE = "/var/log/gatekeeper.log"
# Shared application logger; handlers are only attached by setup_logging().
logger = logging.getLogger("gatekeeper")
# Models
class GateEvent(BaseModel):
    """One logged gate action.

    The field names mirror the columns of the ``events`` table created by
    ``init_db()``.
    """

    id: Optional[int] = None  # optional; defaults to None
    timestamp: str
    action: str
    source: str
    success: bool
class MQTTSettings(BaseModel):
    """MQTT connection settings (the ``mqtt`` section of ``Settings``)."""

    broker: str = "localhost"
    # Declared as a string; load_settings() copies it into os.environ["MQTT_PORT"].
    port: str = "1883"
    username: str = ""
    password: str = ""
    clientId: str = "gatekeeper"
    # Checked by load_settings()/startup_event() before calling ha_mqtt.enable().
    enabled: bool = False
class GPIOSettings(BaseModel):
    """GPIO pin assignment (the ``gpio`` section of ``Settings``).

    setup_gpio() selects ``GPIO.BCM`` mode, so these are BCM pin numbers.
    """

    gatePin: int = 17  # Default GPIO pin for gate control
    statusPin: int = 27  # Default GPIO pin for gate status
class LoggingSettings(BaseModel):
    """Log file settings consumed by ``setup_logging()``."""

    # Name of a ``logging`` level; setup_logging() upper-cases it and falls
    # back to WARNING when the name is unknown.
    level: str = "WARNING"  # Default to WARNING level
    maxBytes: int = 10 * 1024 * 1024  # 10MB default
    backupCount: int = 5  # Keep 5 backup files by default
class Settings(BaseModel):
    """Top-level application settings.

    Each field is stored as one JSON-encoded row of the ``settings`` table,
    keyed by the field name (see ``init_db()`` and ``update_settings()``).
    """

    # Compared against the open time in seconds by check_auto_close().
    maxOpenTimeSeconds: int = 300  # Default 5 minutes
    # Length of the relay pulse; trigger_gate() divides it by 1000 before sleeping.
    triggerDuration: int = 500  # Default 500ms
    mqtt: MQTTSettings = MQTTSettings()
    gpio: GPIOSettings = GPIOSettings()
    logging: LoggingSettings = LoggingSettings()
class GateStatus(BaseModel):
    """Gate state; same field names as the dict returned by ``get_status()``."""

    isOpen: bool
    lastChanged: str
# Configure logging
def setup_logging(settings: Settings):
    """Configure the application logger from ``settings.logging``.

    Replaces every handler currently attached to the module-level ``logger``
    with a single ``RotatingFileHandler`` writing to ``LOG_FILE``. The function
    is called at startup and again whenever settings are saved, so it must be
    safe to call repeatedly.

    Args:
        settings: Application settings; only ``settings.logging`` (``level``,
            ``maxBytes``, ``backupCount``) is read.
    """
    log_level = getattr(logging, settings.logging.level.upper(), logging.WARNING)
    if not isinstance(log_level, int):
        # The name matched a non-level attribute of the logging module.
        log_level = logging.WARNING
    logger.setLevel(log_level)

    # Detach AND close the previous handlers. Without close(), every call
    # (i.e. every settings update) would leave the old log file handle open.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()

    # Create rotating file handler
    file_handler = RotatingFileHandler(
        LOG_FILE,
        maxBytes=settings.logging.maxBytes,
        backupCount=settings.logging.backupCount,
        delay=True,  # Only create log file when first record is written
    )
    file_handler.setLevel(log_level)

    # Create formatter and add it to the handler
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    file_handler.setFormatter(formatter)

    # Add the handler to the logger
    logger.addHandler(file_handler)
    logger.debug("Logging configured with level: %s", settings.logging.level)
# Log uncaught exceptions
def handle_exception(exc_type, exc_value, exc_traceback):
    """Replacement for ``sys.excepthook`` that sends uncaught errors to the log.

    ``KeyboardInterrupt`` is passed to the interpreter's original hook
    unchanged; everything else is logged at ERROR level with its traceback.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
        return
    # Ctrl-C: defer to the default handler
    sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Install exception handler
sys.excepthook = handle_exception
# Initialize FastAPI
app = FastAPI()
# Home Assistant MQTT bridge; the class comes from the local mqtt_integration module.
ha_mqtt = HomeAssistantMQTT()
# Constants
# The path is relative, so the database location depends on the working directory.
DB_PATH = "gatekeeper.db" # Database file path
# Database connection handling
class _OpenConnection:
    """Async context manager around an already-opened aiosqlite connection.

    aiosqlite starts a connection's worker thread when the connection object
    is awaited, and its own ``__aenter__`` awaits the object again. Callers in
    this file use ``async with await get_db() as db``, which would therefore
    start the thread twice (``RuntimeError: threads can only be started
    once``). This wrapper makes that call pattern valid: entering it hands out
    the open connection, leaving it closes the connection.

    Any other attribute access is forwarded to the wrapped connection so code
    that uses the result of ``await get_db()`` directly keeps working.
    """

    def __init__(self, connection):
        self._connection = connection

    def __getattr__(self, name):
        # Only reached for attributes missing on the wrapper itself; guard
        # against recursion if _connection has not been set yet.
        if name == "_connection":
            raise AttributeError(name)
        return getattr(self._connection, name)

    async def __aenter__(self):
        return self._connection

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self._connection.close()


async def get_db():
    """Open a new database connection.

    Returns:
        An object usable as ``async with await get_db() as db``; ``db`` is an
        aiosqlite connection whose rows are ``aiosqlite.Row`` instances, and
        the connection is closed when the ``async with`` block exits.
    """
    connection = await aiosqlite.connect(DB_PATH)
    connection.row_factory = aiosqlite.Row
    return _OpenConnection(connection)
# Set up MQTT event logging
async def log_mqtt_event(action: str, success: bool = True):
    """Record an MQTT event in the log file and in the ``events`` table.

    Args:
        action: Description of what happened; stored in the ``action`` column.
        success: Whether the action succeeded.
    """
    logger.info(f"MQTT Event - {action} (Success: {success})")
    # Use local time like the sensor, API and auto-close events do; mixing UTC
    # and local ISO strings breaks "ORDER BY timestamp" in get_events().
    timestamp = datetime.now().isoformat()
    async with await get_db() as db:
        await db.execute(
            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
            (timestamp, action, "MQTT", success),
        )
        await db.commit()


# Let the MQTT bridge report its events through the function above.
ha_mqtt.set_event_callback(log_mqtt_event)
# CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True accept
# credentialed cross-origin requests from any site; confirm this is intended
# for the deployment before tightening it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Gate control
async def trigger_gate() -> bool:
    """Pulse the gate control pin for ``triggerDuration`` milliseconds.

    Drives the configured gate pin HIGH, waits, then drives it LOW again.

    Returns:
        True when the pulse completed, False when any error occurred (the
        error is logged, not raised).
    """
    try:
        settings = app.state.current_settings
        if not settings:
            logger.warning("No settings available, using default settings")
            settings = Settings()
        gate_pin = settings.gpio.gatePin
        GPIO.output(gate_pin, GPIO.HIGH)
        try:
            await asyncio.sleep(settings.triggerDuration / 1000)  # Convert ms to seconds
        finally:
            # Always release the relay, even if the sleep is cancelled (task
            # cancellation is not caught by "except Exception" below) or fails.
            GPIO.output(gate_pin, GPIO.LOW)
        return True
    except Exception as e:
        logger.error(f"Error triggering gate: {e}")
        return False
# Time at which check_auto_close() first saw the gate open; None while it is
# closed or right after an auto-close.
last_open_time = None
# Guard flags for the two background loops: each loop refuses to start while
# its flag is already True, and stops once the flag is cleared.
gate_monitor_running = False
auto_close_running = False
async def update_gate_status():
    """Poll the status pin and record every open/closed transition.

    Runs until ``gate_monitor_running`` is cleared or the task is cancelled.
    On each change of the status pin it inserts a row into ``gate_status`` and
    ``events`` and publishes the new state through ``ha_mqtt``. Errors are
    logged and retried with exponential backoff capped at 30 seconds.
    Only one instance runs at a time; a second call returns immediately.
    """
    global gate_monitor_running
    if gate_monitor_running:
        logger.warning("Gate status monitor already running, skipping...")
        return
    gate_monitor_running = True
    logger.info("Starting gate status monitoring task")
    try:
        settings = app.state.current_settings
        if not settings:
            logger.warning("No settings available, using default settings")
            settings = Settings()
        status_pin = settings.gpio.statusPin
        last_status = None  # None forces the first reading to be recorded
        consecutive_errors = 0
        # The flag is cleared by shutdown_event() to request a stop; the
        # "stopped" message is logged once, in the finally block.
        while gate_monitor_running:
            try:
                current_status = GPIO.input(status_pin) == GPIO.HIGH
                if last_status != current_status:
                    timestamp = datetime.now()
                    logger.info(f"Gate status changed to: {'open' if current_status else 'closed'}")
                    logger.debug("Updating database with new status")
                    async with await get_db() as db:
                        await db.execute(
                            "INSERT INTO gate_status (timestamp) VALUES (?)",
                            (timestamp.isoformat(),),
                        )
                        await db.execute(
                            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                            (timestamp.isoformat(), f"gate {'opened' if current_status else 'closed'}", "sensor", True),
                        )
                        await db.commit()
                    await ha_mqtt.publish_state(current_status)
                    # Only remember the state once it was stored and published,
                    # so a failure above is retried on the next iteration.
                    last_status = current_status
                else:
                    logger.debug(f"Gate status unchanged: {'open' if current_status else 'closed'}")
                # Any successful poll ends the error streak, not just a state
                # change; otherwise the backoff would stay inflated.
                consecutive_errors = 0
                await asyncio.sleep(0.5)
            except Exception as e:
                consecutive_errors += 1
                wait_time = min(30, 2 ** consecutive_errors)
                logger.error(f"Error in update_gate_status (attempt {consecutive_errors}): {e}", exc_info=True)
                logger.warning(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
    finally:
        gate_monitor_running = False
        logger.info("Gate status monitor stopped")
async def check_auto_close():
    """Trigger the gate when it has been open longer than ``maxOpenTimeSeconds``.

    Polls the status pin once per second until ``auto_close_running`` is
    cleared or the task is cancelled. The open timer starts the first time the
    pin reads HIGH and is reset when the pin reads LOW or after an auto-close
    attempt. Errors are logged and retried with exponential backoff capped at
    30 seconds. Only one instance runs at a time.
    """
    global last_open_time, auto_close_running
    if auto_close_running:
        logger.warning("Auto-close monitor already running, skipping...")
        return
    auto_close_running = True
    logger.info("Starting auto-close monitoring task")
    try:
        settings = app.state.current_settings
        if not settings:
            logger.warning("No settings available, using default settings")
            settings = Settings()
        status_pin = settings.gpio.statusPin
        consecutive_errors = 0
        # The flag is cleared by shutdown_event() to request a stop; the
        # "stopped" message is logged once, in the finally block.
        while auto_close_running:
            try:
                if GPIO.input(status_pin) == GPIO.HIGH:  # Gate is open
                    current_time = datetime.now()
                    if last_open_time is None:
                        last_open_time = current_time
                        logger.debug("Gate opened, starting timer")
                    time_open = (current_time - last_open_time).total_seconds()
                    logger.debug(f"Gate has been open for {time_open:.1f} seconds")
                    if time_open > settings.maxOpenTimeSeconds:
                        logger.warning(f"Gate has been open for {time_open:.1f} seconds, auto-closing")
                        # Trigger first so a database problem cannot prevent
                        # the close, and so the real outcome can be recorded.
                        closed = await trigger_gate()
                        # Reset before touching the database: if the insert
                        # fails, the retry must not pulse the gate again.
                        last_open_time = None
                        async with await get_db() as db:
                            await db.execute(
                                "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                                (current_time.isoformat(), "auto-close", "system", closed),
                            )
                            await db.commit()
                        if closed:
                            logger.info("Gate auto-closed successfully")
                        else:
                            logger.error("Auto-close trigger failed")
                elif last_open_time is not None:
                    logger.debug("Gate is now closed, resetting timer")
                    last_open_time = None
                consecutive_errors = 0
                await asyncio.sleep(1)
            except Exception as e:
                consecutive_errors += 1
                wait_time = min(30, 2 ** consecutive_errors)
                logger.error(f"Error in check_auto_close (attempt {consecutive_errors}): {e}", exc_info=True)
                logger.warning(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
    finally:
        auto_close_running = False
        logger.info("Auto-close monitor stopped")
# MQTT Command Handler
async def handle_mqtt_command(should_open: bool):
    """Bring the gate to the state requested by Home Assistant.

    The gate is only triggered when the status pin disagrees with
    ``should_open``. Errors are logged and not propagated.
    """
    try:
        active = app.state.current_settings
        if not active:
            logger.warning("No settings available, using default settings")
            active = Settings()
        gate_is_open = GPIO.input(active.gpio.statusPin) == GPIO.HIGH
        if gate_is_open != should_open:
            await trigger_gate()
    except Exception as e:
        logger.error(f"Error handling MQTT command: {e}")
# API Routes
@app.post("/api/trigger")
async def trigger():
    """Pulse the gate, record the attempt and report the sensor state.

    Returns a dict with ``success``, ``timestamp`` and ``isOpen``; any failure
    is logged and turned into an HTTP 500.
    """
    try:
        pulse_ok = await trigger_gate()
        now_iso = datetime.now().isoformat()

        # Read the status pin after the pulse so the response reflects it
        active = app.state.current_settings or Settings()
        gate_open = GPIO.input(active.gpio.statusPin) == GPIO.HIGH

        # Record the attempt in the events table
        event_row = (now_iso, "trigger gate", "api", pulse_ok)
        async with await get_db() as db:
            await db.execute(
                "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                event_row,
            )
            await db.commit()

        return {"success": pulse_ok, "timestamp": now_iso, "isOpen": gate_open}
    except Exception:
        logger.error("Error triggering gate", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to trigger gate")
@app.get("/api/status")
async def get_status():
    """Report whether the gate is open and when its state last changed.

    ``isOpen`` comes from the status pin; ``lastChanged`` is the newest
    ``gate_status`` timestamp, or the current time when the table is empty.
    Any failure is logged and turned into an HTTP 500.
    """
    try:
        active = app.state.current_settings or Settings()
        gate_open = GPIO.input(active.gpio.statusPin) == GPIO.HIGH

        async with await get_db() as db:
            cursor = await db.execute(
                "SELECT timestamp FROM gate_status ORDER BY timestamp DESC LIMIT 1"
            )
            latest = await cursor.fetchone()

        if latest:
            changed_at = latest[0]
        else:
            changed_at = datetime.now().isoformat()
        return {"isOpen": gate_open, "lastChanged": changed_at}
    except Exception:
        logger.error("Error getting gate status", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get gate status")
@app.get("/api/events")
async def get_events(limit: int = 10, offset: int = 0):
    """Return a page of gate events, newest first.

    Args:
        limit: Maximum number of events to return; negative values are
            treated as 0.
        offset: Number of newest events to skip; negative values are treated
            as 0.

    Returns:
        A dict with ``events`` (list of row dicts), ``total`` (row count of
        the events table) and ``hasMore``.
    """
    # SQLite treats a negative LIMIT as "no limit", which would let a client
    # dump the whole table with limit=-1; clamp both values first.
    limit = max(limit, 0)
    offset = max(offset, 0)

    async with await get_db() as db:
        # Get total count
        cursor = await db.execute("SELECT COUNT(*) as count FROM events")
        row = await cursor.fetchone()
        total_count = row['count']

        # Get paginated events
        cursor = await db.execute(
            """
            SELECT * FROM events
            ORDER BY timestamp DESC
            LIMIT ? OFFSET ?
            """,
            (limit, offset),
        )
        events = await cursor.fetchall()

    return {
        "events": [dict(event) for event in events],
        "total": total_count,
        "hasMore": (offset + limit) < total_count,
    }
@app.get("/api/settings")
async def get_settings():
    """Return the stored settings, filling gaps with the model defaults.

    Every row of the ``settings`` table holds one JSON-encoded top-level
    setting. Keys missing from the table fall back to the defaults declared
    on ``Settings``, so the fallback values have the same types as stored
    ones (e.g. ``maxOpenTimeSeconds`` is an int, not the string "300").

    Returns:
        A dict with the keys ``maxOpenTimeSeconds``, ``triggerDuration``,
        ``mqtt``, ``gpio`` and ``logging``.
    """
    async with await get_db() as db:
        cursor = await db.execute("SELECT key, value FROM settings")
        rows = await cursor.fetchall()

    stored = {key: json.loads(value) for key, value in rows}
    # Single source of truth for defaults: the Settings model itself.
    defaults = Settings().dict()
    return {key: stored.get(key, default) for key, default in defaults.items()}
@app.post("/api/settings")
async def update_settings(settings: Settings):
    """Persist new settings and apply the MQTT and logging parts immediately.

    Stores every top-level setting as JSON in the ``settings`` table, exports
    the MQTT values to environment variables, enables/disables the MQTT
    bridge, records a "Settings Updated" event and reconfigures logging.

    Args:
        settings: The complete new settings, validated by FastAPI.

    Returns:
        ``{"success": True}`` on success.

    Raises:
        HTTPException: 500 with the error text when any step fails; a
            "Settings Update Failed" event is recorded on a best-effort basis.
    """
    # NOTE(review): app.state.current_settings is not replaced here, so code
    # reading it (trigger_gate, the background loops, setup_gpio) keeps the
    # values loaded at startup - confirm whether a restart is intended.
    try:
        # settings.mqtt is a MQTTSettings model: use attribute access, it has
        # neither item access nor a .get() method.
        mqtt = settings.mqtt
        async with await get_db() as db:
            # Update each setting
            for key, value in settings.dict().items():
                await db.execute(
                    "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
                    (key, json.dumps(value)),
                )
            await db.commit()

            # Update environment variables and MQTT connection
            if mqtt:
                os.environ["MQTT_BROKER"] = mqtt.broker
                os.environ["MQTT_PORT"] = str(mqtt.port)
                os.environ["MQTT_USERNAME"] = mqtt.username
                os.environ["MQTT_PASSWORD"] = mqtt.password
                os.environ["MQTT_CLIENT_ID"] = mqtt.clientId
                # Enable/disable MQTT based on settings
                ha_mqtt.enable(mqtt.enabled)

            # Log settings update event with details
            changes = []
            if settings.maxOpenTimeSeconds:
                changes.append(f"Max Open Time: {settings.maxOpenTimeSeconds}s")
            if settings.triggerDuration:
                changes.append(f"Trigger Duration: {settings.triggerDuration}ms")
            if mqtt:
                mqtt_status = "Enabled" if mqtt.enabled else "Disabled"
                changes.append(f"MQTT: {mqtt_status}")
                if mqtt.enabled:
                    changes.append(f"Broker: {mqtt.broker}:{mqtt.port}")
            await db.execute(
                "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                (
                    # Local time, like the sensor/API events, so that
                    # "ORDER BY timestamp" in get_events() stays meaningful.
                    datetime.now().isoformat(),
                    f"Settings Updated ({'; '.join(changes)})",
                    "Settings",
                    True,
                ),
            )
            await db.commit()

        # Update logging configuration
        setup_logging(settings)
        return {"success": True}
    except Exception as e:
        logger.error("Failed to update settings", exc_info=True)
        # Log failure event; a second failure here must not hide the
        # original error from the client.
        try:
            async with await get_db() as db:
                await db.execute(
                    "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                    (
                        datetime.now().isoformat(),
                        f"Settings Update Failed: {str(e)}",
                        "Settings",
                        False,
                    ),
                )
                await db.commit()
        except Exception:
            logger.error("Could not record settings failure event", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
# Serve static files
# Registered after the API routes declared above. The directory is relative,
# so it is resolved against the process working directory.
app.mount("/", StaticFiles(directory="../public", html=True), name="static")
# GPIO Setup
def setup_gpio():
    """Configure the gate and status pins (BCM numbering) from the settings.

    The gate pin becomes an output driven LOW; the status pin becomes an
    input with the internal pull-up enabled. Any failure is logged and
    re-raised.
    """
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)
    try:
        active = app.state.current_settings
        if not active:
            logger.warning("No settings available, using default settings")
            active = Settings()
        pins = active.gpio

        # Gate control output, idle LOW
        gate_pin = pins.gatePin
        GPIO.setup(gate_pin, GPIO.OUT)
        GPIO.output(gate_pin, GPIO.LOW)

        # Status input with pull-up
        status_pin = pins.statusPin
        GPIO.setup(status_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        logger.info(f"GPIO initialized (Gate Pin: {gate_pin}, Status Pin: {status_pin})")
    except Exception as e:
        logger.error(f"Failed to setup GPIO: {e}", exc_info=True)
        raise
# Database functions
async def init_db():
    """Create the database schema and seed default settings.

    Creates the ``events``, ``gate_status`` and ``settings`` tables when they
    do not exist yet, then inserts the default value for every setting key
    that is not already stored (existing rows are left untouched).
    """
    schema = (
        # Create events table
        """
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                action TEXT NOT NULL,
                source TEXT NOT NULL,
                success BOOLEAN NOT NULL
            )
        """,
        # Create gate_status table
        """
            CREATE TABLE IF NOT EXISTS gate_status (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL
            )
        """,
        # Create settings table
        """
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
        """,
    )
    async with await get_db() as db:
        for statement in schema:
            await db.execute(statement)

        # Seed defaults; INSERT OR IGNORE keeps values that are already stored
        for name, default in Settings().dict().items():
            await db.execute(
                "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
                (name, json.dumps(default)),
            )
        await db.commit()
# Load settings from database
async def load_settings():
    """Load settings from the database and apply them.

    Stores the settings on ``app.state.current_settings``, exports the MQTT
    values and timing settings to environment variables, enables MQTT when
    configured, and configures logging.

    Returns:
        The loaded ``Settings``; if anything fails, default ``Settings`` are
        stored on ``app.state`` and returned instead (the error is logged).
    """
    try:
        settings_dict = await get_settings()
        settings = Settings(**settings_dict)
        # Store settings in app state for easy access
        app.state.current_settings = settings

        # Configure MQTT based on settings
        if settings.mqtt:
            # Update environment variables (os.environ only accepts strings)
            os.environ["MQTT_BROKER"] = settings.mqtt.broker
            os.environ["MQTT_PORT"] = str(settings.mqtt.port)
            os.environ["MQTT_USERNAME"] = settings.mqtt.username
            os.environ["MQTT_PASSWORD"] = settings.mqtt.password
            os.environ["MQTT_CLIENT_ID"] = settings.mqtt.clientId
            # Enable/disable MQTT based on settings
            if settings.mqtt.enabled:
                logger.info("MQTT enabled in settings, initializing connection")
                ha_mqtt.enable()
            else:
                logger.info("MQTT disabled in settings")

        # Set other application settings
        os.environ["MAX_OPEN_TIME_SECONDS"] = str(settings.maxOpenTimeSeconds)
        os.environ["TRIGGER_DURATION"] = str(settings.triggerDuration)

        # Configure logging
        setup_logging(settings)
        logger.info(f"Settings loaded successfully (Max Open Time: {settings.maxOpenTimeSeconds}s, "
                    f"Trigger Duration: {settings.triggerDuration}ms, "
                    f"Gate Pin: {settings.gpio.gatePin}, Status Pin: {settings.gpio.statusPin})")
        return settings
    except Exception as e:
        default_settings = Settings()
        app.state.current_settings = default_settings
        # Configure logging with the defaults as well; otherwise the logger
        # would be left without any handler for the rest of the process.
        try:
            setup_logging(default_settings)
        except Exception:
            logger.error("Failed to configure logging with default settings", exc_info=True)
        logger.warning(f"Failed to load settings: {e}", exc_info=e)
        logger.info("Using default settings")
        return default_settings
@app.on_event("startup")
async def startup_event():
    """Initialize the application on startup.

    Order: database schema, settings, GPIO, MQTT (when enabled), then the two
    background monitor tasks, which are stored on ``app.state`` as
    ``status_task`` and ``auto_close_task``. Any failure is logged as
    critical and re-raised.
    """
    try:
        logger.info("Starting Gatekeeper application")

        # 1. Initialize database
        logger.info("Initializing database...")
        await init_db()
        logger.info("Database initialized successfully")

        # 2. Load settings from database
        logger.info("Loading settings...")
        settings = await load_settings()
        app.state.current_settings = settings
        logger.info("Settings loaded successfully")

        # 3. Setup GPIO
        logger.info("Initializing GPIO...")
        setup_gpio()
        logger.info("GPIO initialized successfully")

        # 4. Initialize MQTT if enabled
        # NOTE(review): load_settings() already calls ha_mqtt.enable() when
        # MQTT is enabled - confirm that enabling twice is harmless.
        if settings.mqtt.enabled:
            logger.info("MQTT enabled, initializing connection...")
            ha_mqtt.enable()
            logger.info("MQTT initialized successfully")
        else:
            logger.info("MQTT disabled in settings")

        # 5. Start background tasks
        # The "running" flags must NOT be set here: each task sets its own
        # flag and returns immediately if it finds the flag already True, so
        # pre-setting them would keep both monitors from ever running.
        logger.info("Starting background tasks...")
        app.state.status_task = asyncio.create_task(update_gate_status())
        app.state.auto_close_task = asyncio.create_task(check_auto_close())
        logger.info("Background tasks started successfully")

        logger.info("Gatekeeper application started successfully")
    except Exception:
        logger.critical("Failed to start application", exc_info=True)
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown.

    Stops and awaits the background tasks, disconnects MQTT when it was
    enabled, and releases the GPIO pins. GPIO cleanup runs even when an
    earlier step fails; errors are logged and re-raised.
    """
    global gate_monitor_running, auto_close_running
    try:
        logger.info("Shutting down Gatekeeper application")

        # 1. Stop background tasks
        logger.info("Stopping background tasks...")
        gate_monitor_running = False
        auto_close_running = False
        pending = []
        for attr in ("status_task", "auto_close_task"):
            task = getattr(app.state, attr, None)
            if task is not None:
                task.cancel()
                pending.append(task)
        if pending:
            # Wait until the tasks have really finished (so their cleanup,
            # e.g. releasing the relay pin, happens before GPIO.cleanup()).
            # return_exceptions=True keeps the expected CancelledError quiet.
            await asyncio.gather(*pending, return_exceptions=True)
        logger.info("Background tasks stopped")

        # 2. Disconnect MQTT if it was enabled
        if hasattr(app.state, "current_settings") and app.state.current_settings.mqtt.enabled:
            logger.info("Disconnecting MQTT...")
            await ha_mqtt.disconnect()
            logger.info("MQTT disconnected")

        # 4. Close database connection
        logger.info("Closing database connection...")
        # No need to explicitly close as we're using new connections for each request
        logger.info("Database connections will be closed automatically")
    except Exception:
        logger.error("Error during shutdown", exc_info=True)
        raise
    else:
        logger.info("Shutdown completed successfully")
    finally:
        # 3. Clean up GPIO - always, even if a step above failed
        logger.info("Cleaning up GPIO...")
        GPIO.cleanup()
        logger.info("GPIO cleanup completed")