124-webapp/main.py
2025-10-07 17:48:55 +02:00

195 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import shutil
import csv
from pathlib import Path
from fastapi import FastAPI, UploadFile, File, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from cost_calculator import allowed_file, analyze_pdf, get_rate_black, get_rate_color, UPLOAD_FOLDER
from mailer import send_order_sync
from dotenv import load_dotenv
# Load variables from a local .env file (python-dotenv) before the
# os.environ lookups below read them.
load_dotenv()
# Get server hostname from environment
# The hostname and the service ports below are passed to the templates; each
# one falls back to the default given here when the variable is unset. Ports
# stay strings because they are only interpolated into template context/URLs.
SERVER_HOSTNAME = os.environ.get("SERVER_HOSTNAME", "einszwovier.local")
BOOKSTACK_PORT = os.environ.get("BOOKSTACK_PORT", "6875")
OPENWEBUI_PORT = os.environ.get("OPENWEBUI_PORT", "8080")
PORTAINER_PORT = os.environ.get("PORTAINER_PORT", "9000")
# Courses CSV path
# Relative path: resolved against the process working directory at read time.
COURSES_CSV = Path("data/courses.csv")
app = FastAPI()
# Jinja2 templates are looked up in ./templates.
templates = Jinja2Templates(directory="templates")
# Make sure the upload directory exists before any request tries to write to it.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Serve files from ./static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# NOTE(review): mid-file import; conventionally this belongs with the imports
# at the top of the file.
import asyncio
# In-memory presence flag (not persisted, so it resets when the process
# restarts). The same name is assigned again further down in this file.
presence_state = {"present": False}
async def update_presence_periodically():
    """Run forever: print a heartbeat line, then sleep five minutes.

    This is a placeholder for automatic presence detection and currently does
    not modify ``presence_state``. An exception raised inside one iteration is
    printed and the loop carries on with the next sleep.
    """
    pause_seconds = 300  # 5 minutes between iterations
    while True:
        try:
            # Hook for real presence logic (check a condition, a file, ...).
            # The demo variant that flips the flag on each pass stays disabled.
            print("Updating presence state…")
        except Exception as exc:
            print("Error updating presence:", exc)
        await asyncio.sleep(pause_seconds)
@app.on_event("startup")
async def startup_event():
    """Start the periodic presence updater when the application starts.

    The task handle is stored on ``app.state`` because the event loop keeps
    only a weak reference to tasks: a task nobody else references may be
    garbage collected before it is done. Keeping the handle also makes it
    possible to cancel the task later.
    """
    app.state.presence_task = asyncio.create_task(update_presence_periodically())
# ---- Presence State ----
# In-memory flag read and written by the /presence endpoints and read by the
# landing page.
# NOTE(review): this rebinds a name that is already assigned earlier in the
# file. The handlers look the name up at call time, so they all share this
# dict; the earlier assignment is redundant.
presence_state = {"present": False}
@app.get("/presence")
def get_presence():
    """Report the presence flag exactly as it is stored in ``presence_state``."""
    current = presence_state
    return current
@app.post("/presence")
def set_presence(present: bool = Form(...)):
    """Overwrite the presence flag with the submitted ``present`` form field.

    Returns a status dict that echoes the value now stored.
    """
    presence_state.update(present=present)
    return dict(status="ok", present=presence_state["present"])
@app.post("/presence/toggle")
def toggle_presence():
    """Invert the presence flag and report the new value."""
    flipped = not presence_state["present"]
    presence_state["present"] = flipped
    return {"status": "ok", "present": flipped}
# ---- Existing Endpoints ----
@app.get("/", response_class=HTMLResponse)
async def welcome(request: Request):
    """Render the landing page.

    The template receives the current presence flag as ``studio_open``, the
    opening-hours label, and the hostname/ports of the companion services.
    """
    return templates.TemplateResponse(
        "landing.html",
        {
            "request": request,
            "studio_open": presence_state["present"],
            # The two times had run together ("11:0016:00"). The separator is
            # written as an escaped en dash (U+2013) so it cannot be lost or
            # confused with a hyphen when the file is copied or rendered.
            "opening_hours": "Di-Do 11:00\u201316:00",
            "server_hostname": SERVER_HOSTNAME,
            "bookstack_port": BOOKSTACK_PORT,
            "openwebui_port": OPENWEBUI_PORT,
            "portainer_port": PORTAINER_PORT,
        },
    )
@app.get("/about", response_class=HTMLResponse)
async def about(request: Request):
    """Render the about page with the course list read from ``COURSES_CSV``.

    Each CSV row becomes a dict keyed by the header row. A missing or
    unreadable file is not fatal: the page is rendered with an empty course
    list and the error is printed.
    """
    courses = []
    if COURSES_CSV.exists():
        try:
            # newline="" hands newline handling to the csv module, which is
            # required for quoted fields that contain line breaks.
            with open(COURSES_CSV, "r", encoding="utf-8", newline="") as f:
                courses = list(csv.DictReader(f))
        except Exception as e:
            # Deliberate best effort: the page must still render without courses.
            print(f"Error loading courses: {e}")
    return templates.TemplateResponse(
        "about.html",
        {
            "request": request,
            "courses": courses,
            "bookstack_url": f"http://{SERVER_HOSTNAME}:{BOOKSTACK_PORT}",
        },
    )
@app.get("/cost", response_class=HTMLResponse)
async def cost_dashboard(request: Request):
    """Show the cost-calculator page with the current black and color rates."""
    page_context = {"request": request}
    page_context["rate_black"] = get_rate_black()
    page_context["rate_color"] = get_rate_color()
    return templates.TemplateResponse("cost-calculator.html", page_context)
@app.post("/upload")
async def upload_file(request: Request, file: UploadFile = File(...)):
    """Save an uploaded file into ``UPLOAD_FOLDER`` and render its analysis.

    The upload is rejected with an error on the calculator page when it has no
    usable filename or ``allowed_file`` refuses it. Otherwise the file is
    written to disk, passed to ``analyze_pdf`` and the result page is rendered
    together with the current rates.
    """
    # The filename is client-controlled. Keep only its last path component
    # (treating backslashes as separators too) so a name such as
    # "../../x.pdf" cannot be written outside UPLOAD_FOLDER.
    filename = os.path.basename((file.filename or "").replace("\\", "/"))
    if not filename or not allowed_file(filename):
        return templates.TemplateResponse(
            "cost-calculator.html",
            {
                "request": request,
                "error": "Unsupported file type. Only PDF allowed.",
                # Same rate context that cost_dashboard gives this template.
                "rate_black": get_rate_black(),
                "rate_color": get_rate_color(),
            },
        )
    path = os.path.join(UPLOAD_FOLDER, filename)
    with open(path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    result = analyze_pdf(path)
    return templates.TemplateResponse(
        "result.html",
        {
            "request": request,
            "result": result,
            "rate_black": get_rate_black(),
            "rate_color": get_rate_color(),
        },
    )
@app.post("/send-order")
def send_order_endpoint(
    request: Request,
    filename: str = Form(...),
    name: str = Form(...),
    comment: str = Form(""),
):
    """Send a previously uploaded file as an order and render the outcome.

    ``filename`` must name a regular file directly inside ``UPLOAD_FOLDER``;
    otherwise the calculator page is shown with a "file not found" error. The
    file is re-analysed with ``analyze_pdf`` and handed to ``send_order_sync``
    together with the room id from the ``MATRIX_ROOM`` environment variable.
    The result page shows either a success message or the sending error.
    """
    # The form field is client-controlled. Use only its last path component so
    # it cannot point at files outside UPLOAD_FOLDER (e.g. "../../etc/passwd").
    safe_name = os.path.basename(filename.replace("\\", "/"))
    path = os.path.join(UPLOAD_FOLDER, safe_name)
    # isfile() also rejects an empty name, "." and "..", which resolve to
    # directories rather than uploaded files.
    if not os.path.isfile(path):
        return templates.TemplateResponse(
            "cost-calculator.html",
            {
                "request": request,
                "error": "Datei nicht gefunden. Bitte erneut hochladen.",
                # Same rate context that cost_dashboard gives this template.
                "rate_black": get_rate_black(),
                "rate_color": get_rate_color(),
            },
        )
    analysis = analyze_pdf(path)
    # Get Matrix room ID from environment
    matrix_room = os.environ.get("MATRIX_ROOM", "!eFWbWEnYsgeIKqyfjw:einszwovier.local")
    try:
        send_order_sync(
            pdf_path=path,
            analysis=analysis,
            room_id=matrix_room,
            name=name,
            comment=comment,
        )
    except Exception as e:
        # Any sending failure is reported on the result page, not as a 500.
        outcome = {"error": f"Fehler beim Senden des Auftrags: {e}"}
    else:
        outcome = {"success": "✅ Dein Auftrag wurde erfolgreich gesendet!"}
    # Both outcomes render the same page; only the status message differs.
    return templates.TemplateResponse(
        "result.html",
        {
            "request": request,
            "result": analysis,
            "rate_black": get_rate_black(),
            "rate_color": get_rate_color(),
            **outcome,
            "name": name,
            "comment": comment,
        },
    )