#!/usr/bin/env python3
"""Gate controller service.

Exposes a small FastAPI application that pulses a relay through RPi.GPIO to
trigger a gate, polls a GPIO input to detect whether the gate is open, and
records events and settings in a SQLite database via aiosqlite.
"""

import asyncio
import logging
import os
from datetime import datetime
from typing import List, Optional

import aiosqlite
import RPi.GPIO as GPIO
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI()

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Constants
DB_PATH = "gate.db"
RELAY_1_PIN = int(os.getenv("RELAY_1_PIN", "22"))  # GPIO22 (Pin 15)
RELAY_2_PIN = int(os.getenv("RELAY_2_PIN", "5"))  # GPIO5 (Pin 29)
STATUS_PIN = int(os.getenv("STATUS_PIN", "4"))  # GPIO4 (Pin 7)
# NOTE(review): trigger_gate() uses this environment value; the
# trigger_duration stored through /api/settings is not read by any code
# visible in this file -- confirm whether that is intended.
TRIGGER_DURATION = int(os.getenv("TRIGGER_DURATION", "500"))  # 500ms default

# Handle to the background status monitor. A strong reference is kept because
# the event loop only holds weak references to tasks, and so that the task can
# be cancelled on shutdown.
_monitor_task: Optional[asyncio.Task] = None


# Models
class GateEvent(BaseModel):
    """A row of the ``events`` table."""

    id: Optional[int] = None
    timestamp: str
    action: str
    source: str
    success: bool


class Settings(BaseModel):
    """Request body accepted by ``POST /api/settings``."""

    maxOpenTimeSeconds: str  # Open time in seconds
    triggerDuration: str  # Trigger duration in milliseconds


class GateStatus(BaseModel):
    """Shape of the payload returned by ``GET /api/status``."""

    isOpen: bool
    lastChanged: str


# GPIO Setup
def setup_gpio():
    """Configure the relay output pins and the status input pin."""
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)

    # Setup relays as outputs (LOW is off)
    GPIO.setup(RELAY_1_PIN, GPIO.OUT)
    GPIO.setup(RELAY_2_PIN, GPIO.OUT)
    GPIO.output(RELAY_1_PIN, GPIO.LOW)
    GPIO.output(RELAY_2_PIN, GPIO.LOW)

    # Setup status pin as input with pull-down
    # This means it will read LOW when floating
    GPIO.setup(STATUS_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

    logger.info(f"GPIO initialized: Relay 1 on GPIO{RELAY_1_PIN}, Relay 2 on GPIO{RELAY_2_PIN}, Status on GPIO{STATUS_PIN}")


# Database functions
async def init_db():
    """Create the tables if they are missing and seed default settings."""
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                action TEXT NOT NULL,
                source TEXT NOT NULL,
                success BOOLEAN NOT NULL
            )
        """)

        await db.execute("""
            CREATE TABLE IF NOT EXISTS settings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                max_open_time TEXT NOT NULL,
                trigger_duration TEXT NOT NULL
            )
        """)

        await db.execute("""
            CREATE TABLE IF NOT EXISTS gate_status (
                timestamp TEXT NOT NULL
            )
        """)

        # Insert default settings only when the table is empty. The table has
        # no unique constraint besides the auto-generated id, so a plain
        # INSERT OR IGNORE would add a fresh default row on every start and
        # mask the settings saved by the user (the newest row wins on read).
        await db.execute("""
            INSERT INTO settings (max_open_time, trigger_duration)
            SELECT '300000', '500'
            WHERE NOT EXISTS (SELECT 1 FROM settings)
        """)

        await db.commit()


# Gate control
async def trigger_gate() -> bool:
    """Pulse relay 1 for ``TRIGGER_DURATION`` milliseconds.

    Returns:
        True when the pulse completed, False when a GPIO call raised.
    """
    try:
        GPIO.output(RELAY_1_PIN, GPIO.HIGH)
        try:
            await asyncio.sleep(TRIGGER_DURATION / 1000)  # Convert to seconds
        finally:
            # Always release the relay, even if the sleep is cancelled or
            # fails, so the output is never left energized.
            GPIO.output(RELAY_1_PIN, GPIO.LOW)
        return True
    except Exception as e:
        logger.error(f"Error triggering gate: {e}")
        return False


async def update_gate_status():
    """Monitor gate status and update database when it changes"""
    last_status = None
    while True:
        try:
            # HIGH (3.3V) = OPEN, LOW (0V) = CLOSED
            current_status = GPIO.input(STATUS_PIN) == GPIO.HIGH  # True = OPEN, False = CLOSED

            if current_status != last_status:
                timestamp = datetime.now().isoformat()
                status_str = "opened" if current_status else "closed"

                async with aiosqlite.connect(DB_PATH) as db:
                    await db.execute(
                        "INSERT INTO gate_status (timestamp) VALUES (?)",
                        (timestamp,)
                    )
                    # Log the status change as an event
                    await db.execute(
                        "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
                        (timestamp, f"gate {status_str}", "sensor", True)
                    )
                    # Single commit: both rows are stored together, so a
                    # failure cannot leave a status row that is then
                    # duplicated when the change is retried.
                    await db.commit()

                # Only remember the new state once it has been persisted;
                # otherwise the next iteration retries the write.
                last_status = current_status
                logger.info(f"Gate status changed to: {'open' if current_status else 'closed'}")
        except Exception as e:
            logger.error(f"Error monitoring gate status: {e}")

        await asyncio.sleep(0.1)  # Check every 100ms


# API Routes
@app.post("/api/trigger")
async def trigger():
    """Pulse the gate relay and record the attempt in the events table."""
    success = await trigger_gate()
    timestamp = datetime.now().isoformat()
    current_status = GPIO.input(STATUS_PIN) == GPIO.HIGH
    action = "trigger gate"

    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO events (timestamp, action, source, success) VALUES (?, ?, ?, ?)",
            (timestamp, action, "api", success)
        )
        await db.commit()

    return {"success": success, "currentStatus": current_status}


@app.get("/api/status")
async def get_status():
    """Return the live gate state and the time of the last recorded change."""
    # HIGH (3.3V) = OPEN, LOW (0V) = CLOSED
    current_status = GPIO.input(STATUS_PIN) == GPIO.HIGH  # True = OPEN, False = CLOSED

    # Get the last status change time
    async with aiosqlite.connect(DB_PATH) as db:
        cursor = await db.execute(
            "SELECT timestamp FROM gate_status ORDER BY timestamp DESC LIMIT 1"
        )
        row = await cursor.fetchone()

    # Fall back to "now" when no change has been recorded yet.
    last_changed = row[0] if row else datetime.now().isoformat()

    return {
        "isOpen": current_status,
        "lastChanged": last_changed
    }


@app.get("/api/events")
async def get_events(limit: int = 10):
    """Return the most recent events, newest first, as a list of dicts."""
    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM events ORDER BY timestamp DESC LIMIT ?",
            (limit,)
        )
        rows = await cursor.fetchall()
        return [dict(row) for row in rows]


@app.get("/api/settings")
async def get_settings():
    """Return the most recently saved settings, or the defaults if none exist."""
    async with aiosqlite.connect(DB_PATH) as db:
        cursor = await db.execute("SELECT max_open_time, trigger_duration FROM settings ORDER BY id DESC LIMIT 1")
        row = await cursor.fetchone()

    if row:
        max_open_time_ms, trigger_duration = row
        # Convert milliseconds to seconds for maxOpenTime
        return {"maxOpenTimeSeconds": str(int(max_open_time_ms) // 1000), "triggerDuration": str(trigger_duration)}
    return {"maxOpenTimeSeconds": "300", "triggerDuration": "500"}


@app.post("/api/settings")
async def update_settings(settings: Settings):
    """Validate and store a new settings row.

    Raises:
        HTTPException: 400 when a value is not a non-negative integer,
            500 when the database write fails.
    """
    # Malformed input is a client error, so it is reported as 400 rather
    # than being folded into the generic 500 used for storage failures.
    try:
        # Convert seconds to milliseconds for storage
        max_open_time_ms = int(settings.maxOpenTimeSeconds) * 1000
        trigger_duration = int(settings.triggerDuration)
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Settings values must be integers") from e

    if max_open_time_ms < 0 or trigger_duration < 0:
        raise HTTPException(status_code=400, detail="Settings values must not be negative")

    try:
        async with aiosqlite.connect(DB_PATH) as db:
            await db.execute(
                "INSERT INTO settings (max_open_time, trigger_duration) VALUES (?, ?)",
                (str(max_open_time_ms), str(trigger_duration))
            )
            await db.commit()
        return {"success": True}
    except Exception as e:
        logger.error(f"Error updating settings: {e}")
        raise HTTPException(status_code=500, detail="Failed to update settings")


# Serve static files
# Mounted after the API routes are declared so "/api/..." paths are matched
# by their handlers before the catch-all static mount at "/".
app.mount("/", StaticFiles(directory="../public", html=True), name="static")


# Background task for monitoring gate status
@app.on_event("startup")
async def startup_event():
    """Initialize GPIO and the database, then start the status monitor."""
    global _monitor_task
    setup_gpio()
    await init_db()
    # Keep a reference so the task is not garbage-collected mid-execution.
    _monitor_task = asyncio.create_task(update_gate_status())
    logger.info("Application started successfully")


# Shutdown event
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the status monitor and release the GPIO pins."""
    global _monitor_task
    # Stop polling before the pins are released; otherwise the monitor would
    # keep calling GPIO.input() after GPIO.cleanup().
    if _monitor_task is not None:
        _monitor_task.cancel()
        try:
            await _monitor_task
        except asyncio.CancelledError:
            pass
        _monitor_task = None
    GPIO.cleanup()
    logger.info("Application shutdown complete")