feat: add event cleanup

- Added background task to clean up events older than 3 months
- Added index on timestamp column for faster cleanup
- Fixed success column type in events table
- Updated shutdown handler for new task
This commit is contained in:
Josh Finlay 2025-01-08 09:38:34 +10:00
parent 56d1507cf7
commit 9dc65a138f
3 changed files with 138 additions and 78 deletions

View File

@ -3,7 +3,7 @@ import os
import json
import asyncio
import aiosqlite
from datetime import datetime
from datetime import datetime, timedelta
import logging
from logging.handlers import RotatingFileHandler
import sys
@ -283,6 +283,7 @@ async def trigger_gate():
last_open_time = None
gate_monitor_running = False
auto_close_running = False
cleanup_running = False
async def start_background_tasks():
"""Start all background monitoring tasks"""
@ -291,6 +292,7 @@ async def start_background_tasks():
# Don't set global flags here, let the tasks set them
app.state.status_task = asyncio.create_task(update_gate_status())
app.state.auto_close_task = asyncio.create_task(check_auto_close())
app.state.cleanup_task = asyncio.create_task(cleanup_old_events())
logger.info("Background tasks started")
async def update_gate_status():
@ -400,6 +402,50 @@ async def check_auto_close():
auto_close_running = False
logger.info("Auto-close monitor stopped")
async def cleanup_old_events():
"""Clean up events older than 3 months"""
global cleanup_running
if cleanup_running:
logger.warning("Cleanup task already running, skipping...")
return
cleanup_running = True
try:
while True:
try:
# Calculate cutoff date (3 months ago)
cutoff_date = (datetime.now() - timedelta(days=90)).isoformat()
async with aiosqlite.connect(DB_PATH) as db:
# Get count of events to be deleted
async with db.execute(
"SELECT COUNT(*) FROM events WHERE timestamp < ?",
(cutoff_date,)
) as cursor:
count = (await cursor.fetchone())[0]
if count > 0:
# Delete old events
await db.execute(
"DELETE FROM events WHERE timestamp < ?",
(cutoff_date,)
)
await db.commit()
logger.info(f"Cleaned up {count} events older than {cutoff_date}")
# Run cleanup every 24 hours
await asyncio.sleep(24 * 60 * 60)
except Exception as e:
logger.error(f"Error in cleanup task: {e}", exc_info=True)
await asyncio.sleep(60) # Wait a minute before retrying on error
finally:
cleanup_running = False
logger.info("Event cleanup task stopped")
# MQQT Command Handler
async def handle_mqtt_command(should_open: bool):
"""Handle commands received from Home Assistant"""
@ -576,43 +622,55 @@ def setup_gpio():
# Database functions
async def init_db():
"""Initialize the SQLite database"""
async with get_db() as db:
# Create events table
await db.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
action TEXT NOT NULL,
source TEXT NOT NULL,
success BOOLEAN NOT NULL
)
""")
try:
async with aiosqlite.connect(DB_PATH) as db:
# Create events table
await db.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
action TEXT NOT NULL,
source TEXT NOT NULL,
success INTEGER DEFAULT 1
)
""")
# Create gate_status table
await db.execute("""
CREATE TABLE IF NOT EXISTS gate_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL
)
""")
# Create index on timestamp for faster cleanup queries
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_events_timestamp
ON events(timestamp)
""")
# Create settings table
await db.execute("""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
""")
# Create gate_status table
await db.execute("""
CREATE TABLE IF NOT EXISTS gate_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL
)
""")
# Insert default settings if they don't exist
default_settings = Settings().dict()
for key, value in default_settings.items():
await db.execute(
"INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
(key, json.dumps(value))
)
# Create settings table
await db.execute("""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
""")
await db.commit()
# Insert default settings if they don't exist
default_settings = Settings().dict()
for key, value in default_settings.items():
await db.execute(
"INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
(key, json.dumps(value))
)
await db.commit()
logger.info("Database initialized")
except Exception as e:
logger.error(f"Failed to initialize database: {e}", exc_info=True)
raise
# Load settings from database
async def load_settings():
@ -666,30 +724,16 @@ def validate_settings(settings: Settings) -> tuple[bool, list[str]]:
async def init_mqtt(settings: Settings):
"""Initialize MQTT if enabled in settings"""
if settings.mqtt.enabled:
try:
logger.info("MQTT enabled, initializing connection...")
os.environ["MQTT_BROKER"] = settings.mqtt.broker
os.environ["MQTT_PORT"] = settings.mqtt.port
os.environ["MQTT_USERNAME"] = settings.mqtt.username
os.environ["MQTT_PASSWORD"] = settings.mqtt.password
os.environ["MQTT_CLIENT_ID"] = settings.mqtt.clientId
# Try to enable MQTT with timeout
try:
async with asyncio.timeout(10): # 10 second timeout
ha_mqtt.enable()
logger.info("MQTT initialized successfully")
except asyncio.TimeoutError:
logger.error("MQTT initialization timed out")
return False
except Exception as e:
logger.error(f"Failed to initialize MQTT: {e}", exc_info=True)
return False
else:
logger.info("MQTT disabled in settings")
return True
try:
if settings.mqtt.enabled:
ha_mqtt.update_settings(settings.mqtt)
ha_mqtt.enable(True)
logger.info("MQTT integration enabled")
else:
ha_mqtt.enable(False)
logger.info("MQTT integration disabled")
except Exception as e:
logger.error(f"Failed to initialize MQTT: {e}")
async def publish_mqtt_state(state: bool) -> bool:
"""Publish state to MQTT with error handling"""
@ -783,14 +827,17 @@ async def shutdown_event():
# 1. Stop background tasks
logger.info("Stopping background tasks...")
global gate_monitor_running, auto_close_running
global gate_monitor_running, auto_close_running, cleanup_running
gate_monitor_running = False
auto_close_running = False
cleanup_running = False
if hasattr(app.state, "status_task"):
app.state.status_task.cancel()
if hasattr(app.state, "auto_close_task"):
app.state.auto_close_task.cancel()
if hasattr(app.state, "cleanup_task"):
app.state.cleanup_task.cancel()
logger.info("Background tasks stopped")
# 2. Disconnect MQTT if it was enabled

View File

@ -10,14 +10,12 @@ logger = logging.getLogger("gatekeeper")
class HomeAssistantMQTT:
def __init__(self):
# MQTT Configuration
self.broker = os.getenv("MQTT_BROKER", "localhost")
self.port = int(os.getenv("MQTT_PORT", "1883"))
self.username = os.getenv("MQTT_USERNAME", None)
self.password = os.getenv("MQTT_PASSWORD", None)
self.client_id = os.getenv("MQTT_CLIENT_ID", "gatekeeper")
logger.info(f"Initializing MQTT client (broker: {self.broker}:{self.port}, client_id: {self.client_id})")
# MQTT Configuration - will be updated via update_settings
self.broker = "localhost"
self.port = 1883
self.username = None
self.password = None
self.client_id = "gatekeeper"
# Home Assistant MQTT topics
self.node_id = "gatekeeper"
@ -154,7 +152,7 @@ class HomeAssistantMQTT:
if self.username and self.password:
self.client.set_auth_credentials(self.username, self.password)
# Connect to broker
logger.info(f"Connecting to MQTT broker {self.broker}:{self.port}...")
await self.client.connect(self.broker, self.port)
logger.info("Initiating connection to MQTT broker")
except Exception as e:
@ -199,3 +197,18 @@ class HomeAssistantMQTT:
def set_command_callback(self, callback: Callable):
"""Set callback for handling commands"""
self.command_callback = callback
def update_settings(self, settings):
"""Update MQTT settings"""
logger.info(f"Updating MQTT settings - Broker: {settings.broker}:{settings.port}")
self.broker = settings.broker
self.port = int(settings.port)
self.username = settings.username if settings.username else None
self.password = settings.password if settings.password else None
self.client_id = settings.clientId
# If we're connected, disconnect and reconnect with new settings
if self._connected:
logger.info("Reconnecting with new settings...")
asyncio.create_task(self.disconnect())
# Connection will be re-established by reconnect loop if enabled

View File

@ -121,12 +121,12 @@ function App() {
};
return (
<div className="min-h-screen bg-gray-100 py-6 flex flex-col justify-center sm:py-12">
<div className="relative py-3 sm:max-w-xl sm:mx-auto">
<div className="relative px-4 py-10 bg-white shadow-lg sm:rounded-3xl sm:p-20">
<div className="min-h-screen bg-gray-100 py-2 flex flex-col justify-center">
<div className="relative sm:max-w-xl sm:mx-auto">
<div className="relative px-4 py-6 bg-white shadow-lg sm:rounded-3xl sm:p-12">
<div className="max-w-md mx-auto">
<div className="divide-y divide-gray-200">
<div className="py-8 text-base leading-6 space-y-4 text-gray-700 sm:text-lg sm:leading-7">
<div className="py-4 text-base leading-6 space-y-4 text-gray-700 sm:text-lg sm:leading-7">
<div className="flex justify-between items-center mb-4">
<h1 className="text-2xl font-bold text-gray-900">Gate Control</h1>
<button
@ -172,8 +172,8 @@ function App() {
{/* Recent Events */}
<div className="mt-6">
<h2 className="text-lg font-semibold mb-2">Recent Events</h2>
<div className="border rounded-lg divide-y divide-gray-200 max-h-48 overflow-y-auto">
{events.map((event, index) => (
<div className="border rounded-lg divide-y divide-gray-200 max-h-96 overflow-y-auto">
{[...events].reverse().map((event, index) => (
<div key={index} className="px-3 py-2 hover:bg-gray-50 flex items-center justify-between text-sm">
<div className="flex-1">
<div className="flex items-center gap-2">