dlbGatekeeper/backend/main.py

481 lines
16 KiB
Python

#!/usr/bin/env python3
import os
import json
import asyncio
import aiosqlite
from datetime import datetime
import logging
from logging.handlers import RotatingFileHandler
import sys
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
import RPi.GPIO as GPIO
from mqtt_integration import HomeAssistantMQTT
# Logging configuration
# Everything in this module logs through the "gatekeeper" logger, which writes
# INFO-and-above records to a size-rotated file.
LOG_FILE = "/var/log/gatekeeper.log"

# One line per record: timestamp - logger name - level - message
formatter = logging.Formatter(
    fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Rotate at 10MB and keep 5 backups. delay=True defers creating the log file
# until the first record is actually written.
file_handler = RotatingFileHandler(
    filename=LOG_FILE,
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    delay=True
)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)

logger = logging.getLogger("gatekeeper")
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
# Log uncaught exceptions
def handle_exception(exc_type, exc_value, exc_traceback):
    """Exception hook that records uncaught exceptions in the gatekeeper log.

    Takes the three arguments Python passes to ``sys.excepthook``.
    ``KeyboardInterrupt`` is handed to the interpreter's default hook instead
    of being logged as an error.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
        return
    # Ctrl-C: keep the interpreter's standard behaviour.
    sys.__excepthook__(exc_type, exc_value, exc_traceback)
# Install the exception handler so exceptions that reach the interpreter's
# top level are written to the gatekeeper log.
sys.excepthook = handle_exception
# Initialize FastAPI
app = FastAPI()
# Home Assistant MQTT bridge (class imported from mqtt_integration); shared by
# the settings endpoint, the status monitor and the startup/shutdown hooks.
ha_mqtt = HomeAssistantMQTT()
# Set up MQTT event logging
async def log_mqtt_event(action: str, success: bool = True):
    """Log an MQTT event to the log file and to the ``events`` table.

    Args:
        action: Description of the event; stored in the ``action`` column.
        success: Whether the event succeeded; stored in the ``success`` column.

    The row is written with source ``"MQTT"``. The timestamp is naive local
    time, matching the sensor, API and auto-close events, so that ordering
    events by timestamp stays consistent across sources.
    """
    logger.info(f"MQTT Event - {action} (Success: {success})")
    # DB_PATH is resolved at call time, so it is fine that the constant is
    # defined further down in the module.
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
            (datetime.now().isoformat(), action, "MQTT", success)
        )
        await db.commit()
# Register log_mqtt_event with the MQTT bridge (presumably invoked for each
# MQTT event - confirm in mqtt_integration).
ha_mqtt.set_event_callback(log_mqtt_event)
# CORS middleware: every origin, method and header is currently allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive for an API that can operate the gate - confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Constants
# Pin numbers are BCM (GPIOxx) numbers, matching GPIO.setmode(GPIO.BCM) in
# setup_gpio(). Pins and trigger duration can be overridden via environment
# variables of the same name.
DB_PATH = "gate.db"  # SQLite database file (relative path)
RELAY_1_PIN = int(os.getenv("RELAY_1_PIN", "22")) # GPIO22 (Pin 15)
RELAY_2_PIN = int(os.getenv("RELAY_2_PIN", "5")) # GPIO5 (Pin 29)
STATUS_PIN = int(os.getenv("STATUS_PIN", "4")) # GPIO4 (Pin 7)
# Relay pulse length in milliseconds; trigger_gate() divides by 1000 before sleeping
TRIGGER_DURATION = int(os.getenv("TRIGGER_DURATION", "500")) # 500ms default
# Fallback for check_auto_close() when the maxOpenTimeSeconds setting row is missing
DEFAULT_MAX_OPEN_TIME = 300 # seconds (5 minutes)
# Models
class GateEvent(BaseModel):
    # Mirrors the columns of the `events` table created in init_db().
    id: Optional[int] = None  # AUTOINCREMENT primary key; optional on the model
    timestamp: str            # written elsewhere in this file via datetime.isoformat()
    action: str               # e.g. "trigger gate", "auto-close", "gate opened"
    source: str               # e.g. "api", "system", "sensor", "MQTT", "Settings"
    success: bool
class Settings(BaseModel):
    # Request body of POST /api/settings. Each field is stored JSON-encoded
    # under its own key in the `settings` table.
    maxOpenTimeSeconds: str  # numeric string; check_auto_close() converts it with int()
    triggerDuration: str     # reported as milliseconds in the settings-updated event
    mqtt: dict               # keys used in this file: broker, port, username, password, clientId, enabled
class GateStatus(BaseModel):
    # Same field names as the dict returned by GET /api/status.
    isOpen: bool      # True when the status pin reads HIGH
    lastChanged: str  # ISO timestamp of the latest gate_status row
# GPIO Setup
def setup_gpio():
    """Configure the relay and status pins using BCM numbering.

    Both relay pins become outputs and are driven LOW (relay off); the status
    pin becomes an input with the internal pull-down enabled, so it reads LOW
    when nothing drives it.
    """
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)

    relay_pins = (RELAY_1_PIN, RELAY_2_PIN)
    # Configure both relays as outputs first, then switch both off.
    for pin in relay_pins:
        GPIO.setup(pin, GPIO.OUT)
    for pin in relay_pins:
        GPIO.output(pin, GPIO.LOW)

    # Status sensor: input with pull-down (floating reads LOW).
    GPIO.setup(STATUS_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

    logger.info(f"GPIO initialized: Relay 1 on GPIO{RELAY_1_PIN}, Relay 2 on GPIO{RELAY_2_PIN}, Status on GPIO{STATUS_PIN}")
# Database functions
async def init_db():
    """Create the SQLite schema and seed default settings.

    Safe to call on every start: tables are created with ``IF NOT EXISTS``
    and defaults are inserted with ``INSERT OR IGNORE``, so existing rows are
    left alone. Setting values are stored JSON-encoded.
    """
    schema_statements = (
        # events: one row per logged action
        """
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            action TEXT NOT NULL,
            source TEXT NOT NULL,
            success BOOLEAN NOT NULL
        )
        """,
        # gate_status: timestamps of status changes
        """
        CREATE TABLE IF NOT EXISTS gate_status (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL
        )
        """,
        # settings: key/value store
        """
        CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )
        """,
    )

    default_settings = {
        "maxOpenTimeSeconds": "300",
        "triggerDuration": "500",
        "mqtt": {
            "broker": "localhost",
            "port": "1883",
            "username": "",
            "password": "",
            "clientId": "gatekeeper",
            "enabled": False
        }
    }

    async with aiosqlite.connect(DB_PATH) as conn:
        for ddl in schema_statements:
            await conn.execute(ddl)

        # Seed defaults only for keys that are not present yet.
        for name, default in default_settings.items():
            await conn.execute(
                "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
                (name, json.dumps(default))
            )

        await conn.commit()
# Gate control
async def trigger_gate() -> bool:
    """Pulse relay 1 for TRIGGER_DURATION milliseconds.

    Returns:
        True if the pulse completed, False if an exception was raised (the
        error is logged).

    The relay is switched back to LOW in a ``finally`` block so it is never
    left energized if the wait is interrupted, for example when the calling
    task is cancelled during the sleep. Cancellation itself still propagates
    to the caller.
    """
    try:
        GPIO.output(RELAY_1_PIN, GPIO.HIGH)
        try:
            await asyncio.sleep(TRIGGER_DURATION / 1000)  # Convert to seconds
        finally:
            # Always release the relay, even if the sleep was interrupted.
            GPIO.output(RELAY_1_PIN, GPIO.LOW)
        return True
    except Exception as e:
        logger.error(f"Error triggering gate: {e}")
        return False
# When the gate was first seen open (naive local datetime), or None while it
# is closed. Shared between check_auto_close() and update_gate_status().
last_open_time = None


async def check_auto_close():
    """Close the gate automatically when it has been open too long.

    Runs forever, polling the status pin once per second. While the pin reads
    HIGH (open), the elapsed time since ``last_open_time`` is compared with the
    ``maxOpenTimeSeconds`` setting (DEFAULT_MAX_OPEN_TIME if the row is
    missing). When the limit is exceeded the gate is triggered and an
    ``auto-close`` event is recorded with the actual trigger result.

    After an auto-close attempt the timer is restarted. Without that, the
    relay would be pulsed again on every one-second poll while the gate is
    still reporting open. If the gate is still open after another full
    interval, a new attempt is made.

    Errors are logged and the loop continues.
    """
    global last_open_time
    while True:
        try:
            if GPIO.input(STATUS_PIN) == GPIO.HIGH:  # Gate is open
                current_time = datetime.now()
                # Initialize last_open_time if gate is open and time not set
                if last_open_time is None:
                    last_open_time = current_time
                # Snapshot: update_gate_status() may reset the global while
                # this coroutine is suspended in the awaits below.
                opened_at = last_open_time

                # Get max open time from settings
                async with aiosqlite.connect(DB_PATH) as db:
                    cursor = await db.execute("SELECT value FROM settings WHERE key = 'maxOpenTimeSeconds'")
                    row = await cursor.fetchone()
                max_open_time = int(json.loads(row[0])) if row else DEFAULT_MAX_OPEN_TIME

                # Check if gate has been open too long
                if (current_time - opened_at).total_seconds() > max_open_time:
                    logger.warning(f"Gate has been open for more than {max_open_time} seconds. Auto-closing...")
                    success = await trigger_gate()
                    # Restart the timer so the relay is not re-pulsed every
                    # second while the gate is still moving.
                    last_open_time = datetime.now()
                    timestamp = current_time.isoformat()
                    # Log auto-close event with the real outcome
                    async with aiosqlite.connect(DB_PATH) as db:
                        await db.execute(
                            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                            (timestamp, "auto-close", "system", success)
                        )
                        await db.commit()
            else:
                # Reset last_open_time when gate is closed
                last_open_time = None
        except Exception as e:
            logger.error(f"Error in auto-close check: {e}")
        await asyncio.sleep(1)  # Check every second
async def update_gate_status():
    """Monitor the status pin and record every change.

    Runs forever, polling every 500 ms. On each change (including the first
    reading after start) it:

    * sets ``last_open_time`` to the change time (open) or ``None`` (closed),
    * inserts a ``gate_status`` row and a ``gate opened`` / ``gate closed``
      event with source ``sensor``,
    * publishes the new state through the MQTT bridge.

    The change counts as handled once the database commit has succeeded. An
    MQTT publish failure is logged separately and does not cause the same
    change to be recorded again; otherwise each retry would insert duplicate
    rows and reset ``last_open_time``, which would keep the auto-close timer
    from ever expiring. Other errors are logged and retried after 5 seconds.
    """
    global last_open_time
    last_status = None  # None makes the first reading count as a change
    while True:
        try:
            current_status = GPIO.input(STATUS_PIN) == GPIO.HIGH  # True = OPEN, False = CLOSED
            if last_status != current_status:
                timestamp = datetime.now()
                # Gate just opened -> start the open timer; closed -> clear it
                last_open_time = timestamp if current_status else None

                status_str = "opened" if current_status else "closed"
                async with aiosqlite.connect(DB_PATH) as db:
                    # Update gate_status table
                    await db.execute(
                        "INSERT INTO gate_status (timestamp) VALUES (?)",
                        (timestamp.isoformat(),)
                    )
                    # Log the status change as an event
                    await db.execute(
                        "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                        (timestamp.isoformat(), f"gate {status_str}", "sensor", True)
                    )
                    await db.commit()

                # The change is persisted; remember it before touching MQTT.
                last_status = current_status
                logger.info(f"Gate status changed to: {'open' if current_status else 'closed'}")

                # Update Home Assistant via MQTT (best effort)
                try:
                    await ha_mqtt.publish_state(current_status)
                except Exception as e:
                    logger.error(f"Error publishing gate status via MQTT: {e}")
            await asyncio.sleep(0.5)  # Check every 500ms
        except Exception as e:
            logger.error(f"Error in update_gate_status: {e}")
            await asyncio.sleep(5)  # Wait longer on error
# MQTT command handler
async def handle_mqtt_command(should_open: bool):
    """Bring the gate into the requested state.

    Args:
        should_open: True to request an open gate, False for a closed gate.

    The gate is triggered only when the status pin disagrees with the
    request; if it already matches, nothing happens. Errors are logged and
    not raised.
    """
    try:
        gate_is_open = GPIO.input(STATUS_PIN) == GPIO.HIGH
        if gate_is_open == should_open:
            return  # already in the requested state
        await trigger_gate()
    except Exception as e:
        logger.error(f"Error handling MQTT command: {e}")
# API Routes
@app.post("/api/trigger")
async def trigger():
    """Pulse the gate relay and record the attempt.

    Returns a dict with ``success`` (result of ``trigger_gate``) and
    ``currentStatus`` (True when the status pin reads HIGH right after the
    pulse). A ``trigger gate`` event with source ``api`` is stored either way.
    """
    pulse_ok = await trigger_gate()
    event_row = (datetime.now().isoformat(), "trigger gate", "api", pulse_ok)
    gate_open = GPIO.input(STATUS_PIN) == GPIO.HIGH

    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.execute(
            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
            event_row
        )
        await conn.commit()

    return {"success": pulse_ok, "currentStatus": gate_open}
@app.get("/api/status")
async def get_status():
    """Report whether the gate is open and when its status last changed.

    ``isOpen`` is read live from the status pin. ``lastChanged`` is the newest
    timestamp in ``gate_status``, or the current time when the table is empty.
    """
    gate_open = GPIO.input(STATUS_PIN) == GPIO.HIGH

    # Newest recorded status change, if any
    async with aiosqlite.connect(DB_PATH) as conn:
        cursor = await conn.execute(
            "SELECT timestamp FROM gate_status ORDER BY timestamp DESC LIMIT 1"
        )
        latest = await cursor.fetchone()

    if latest:
        changed_at = latest[0]
    else:
        changed_at = datetime.now().isoformat()

    return {
        "isOpen": gate_open,
        "lastChanged": changed_at
    }
@app.get("/api/events")
async def get_events(limit: int = 10, offset: int = 0):
    """Get recent gate events with pagination, newest first.

    Args:
        limit: Maximum number of events to return. Negative values are
            treated as 0.
        offset: Number of newest events to skip. Negative values are treated
            as 0.

    Returns:
        A dict with ``events`` (list of row dicts), ``total`` (number of rows
        in the table) and ``hasMore`` (True when rows remain after this page).
    """
    # SQLite interprets a negative LIMIT as "no limit", which would return the
    # whole table and make hasMore meaningless, so clamp both values.
    limit = max(limit, 0)
    offset = max(offset, 0)

    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row

        # Get total count
        cursor = await db.execute("SELECT COUNT(*) as count FROM events")
        row = await cursor.fetchone()
        total_count = row['count']

        # Get paginated events. id is the tie-breaker so rows sharing a
        # timestamp keep a stable order from one page to the next.
        cursor = await db.execute(
            """
            SELECT * FROM events
            ORDER BY timestamp DESC, id DESC
            LIMIT ? OFFSET ?
            """,
            (limit, offset)
        )
        events = await cursor.fetchall()

    return {
        "events": [dict(event) for event in events],
        "total": total_count,
        "hasMore": (offset + limit) < total_count
    }
@app.get("/api/settings")
async def get_settings():
    """Return the stored settings as a plain dict.

    Every value in the ``settings`` table is JSON-decoded. Keys missing from
    the table fall back to the defaults below. Only ``maxOpenTimeSeconds``,
    ``triggerDuration`` and ``mqtt`` are returned.
    """
    async with aiosqlite.connect(DB_PATH) as conn:
        cursor = await conn.execute("SELECT key, value FROM settings")
        rows = await cursor.fetchall()

    stored = {name: json.loads(raw) for name, raw in rows}

    mqtt_defaults = {
        "broker": "localhost",
        "port": "1883",
        "username": "",
        "password": "",
        "clientId": "gatekeeper",
        "enabled": False
    }
    return {
        "maxOpenTimeSeconds": stored.get("maxOpenTimeSeconds", "300"),
        "triggerDuration": stored.get("triggerDuration", "500"),
        "mqtt": stored.get("mqtt", mqtt_defaults)
    }
@app.post("/api/settings")
async def update_settings(settings: Settings):
    """Persist new settings and apply the MQTT configuration.

    Steps:
        1. Store every field JSON-encoded in the ``settings`` table.
        2. If ``mqtt`` is non-empty, export its connection values as
           ``MQTT_*`` environment variables and enable/disable the bridge.
        3. Record a ``Settings Updated (...)`` event.

    Returns:
        ``{"success": True}`` on success.

    Raises:
        HTTPException: status 500 with the error text when any step fails. A
            ``Settings Update Failed`` event is recorded on a best-effort basis.
    """
    try:
        async with aiosqlite.connect(DB_PATH) as db:
            # Update each setting
            for key, value in settings.dict().items():
                await db.execute(
                    "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
                    (key, json.dumps(value))
                )
            await db.commit()

            # Update environment variables and MQTT connection
            if settings.mqtt:
                env_names = {
                    "broker": "MQTT_BROKER",
                    "port": "MQTT_PORT",
                    "username": "MQTT_USERNAME",
                    "password": "MQTT_PASSWORD",
                    "clientId": "MQTT_CLIENT_ID",
                }
                for field, env_name in env_names.items():
                    field_value = settings.mqtt.get(field)
                    if field_value is not None:
                        # os.environ only accepts strings; clients may send
                        # e.g. the port as a JSON number.
                        os.environ[env_name] = str(field_value)
                # Enable/disable MQTT based on settings
                ha_mqtt.enable(settings.mqtt.get("enabled", False))

            # Log settings update event with details
            changes = []
            if settings.maxOpenTimeSeconds:
                changes.append(f"Max Open Time: {settings.maxOpenTimeSeconds}s")
            if settings.triggerDuration:
                changes.append(f"Trigger Duration: {settings.triggerDuration}ms")
            if settings.mqtt:
                mqtt_status = "Enabled" if settings.mqtt.get("enabled") else "Disabled"
                changes.append(f"MQTT: {mqtt_status}")
                if settings.mqtt.get("enabled"):
                    broker = settings.mqtt.get("broker")
                    port = settings.mqtt.get("port")
                    changes.append(f"Broker: {broker}:{port}")

            # Naive local time, matching the other event sources.
            await db.execute(
                "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                (
                    datetime.now().isoformat(),
                    f"Settings Updated ({'; '.join(changes)})",
                    "Settings",
                    True
                )
            )
            await db.commit()
        return {"success": True}
    except Exception as e:
        logger.error(f"Settings update failed: {e}", exc_info=True)
        # Log failure event. This is best effort: a second database error
        # must not hide the original one from the client.
        try:
            async with aiosqlite.connect(DB_PATH) as db:
                await db.execute(
                    "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                    (
                        datetime.now().isoformat(),
                        f"Settings Update Failed: {str(e)}",
                        "Settings",
                        False
                    )
                )
                await db.commit()
        except Exception as log_error:
            logger.error(f"Could not record settings failure event: {log_error}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Serve static files
# Files from ../public (a relative path) are mounted at the root path. The
# mount is registered after the /api routes defined above.
app.mount("/", StaticFiles(directory="../public", html=True), name="static")
# Application lifecycle: startup and shutdown hooks (startup launches the background monitoring tasks)
@app.on_event("startup")
async def startup_event():
    """Initialize the application on startup.

    Order: database schema, GPIO pins, MQTT (only when enabled in the stored
    settings), then the two background tasks, which are kept on ``app.state``
    so the shutdown hook can cancel them. Any failure is logged with its
    traceback and re-raised.
    """
    logger.info("Starting Gatekeeper application")
    try:
        # Initialize database
        await init_db()
        logger.info("Database initialized successfully")

        # Setup GPIO
        setup_gpio()
        logger.info("GPIO initialized successfully")

        # Initialize MQTT from settings.
        # get_settings() returns a plain dict, so use key access; attribute
        # access (settings.mqtt) raises AttributeError.
        settings = await get_settings()
        mqtt_settings = settings.get("mqtt") or {}
        if mqtt_settings.get("enabled"):
            logger.info("MQTT enabled in settings, initializing connection")
            ha_mqtt.enable()
        else:
            logger.info("MQTT disabled in settings")

        # Start background tasks
        app.state.status_task = asyncio.create_task(update_gate_status())
        app.state.auto_close_task = asyncio.create_task(check_auto_close())
    except Exception as e:
        logger.error(f"Startup error: {str(e)}", exc_info=True)
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown.

    Cancels the background tasks, waits briefly for them to stop, disconnects
    MQTT and releases the GPIO pins. ``GPIO.cleanup()`` runs even when the
    MQTT disconnect fails. Any error is logged with its traceback and
    re-raised.
    """
    logger.info("Shutting down Gatekeeper application")
    try:
        # Cancel background tasks (absent if startup failed before creating them)
        tasks = [
            task
            for task in (
                getattr(app.state, "status_task", None),
                getattr(app.state, "auto_close_task", None),
            )
            if task is not None
        ]
        for task in tasks:
            task.cancel()
        if tasks:
            # Give the loops a moment to stop so they do not touch GPIO after
            # cleanup. The timeout keeps shutdown from hanging on a task that
            # does not finish.
            await asyncio.wait(tasks, timeout=2)

        try:
            # Disconnect MQTT
            await ha_mqtt.disconnect()
        finally:
            # Cleanup GPIO regardless of the MQTT outcome
            GPIO.cleanup()
        logger.info("Cleanup completed successfully")
    except Exception as e:
        logger.error(f"Shutdown error: {str(e)}", exc_info=True)
        raise