Files
bacchus/apps/backend/app/api/auth.py
2025-09-28 19:13:01 +02:00

293 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import hmac
import math
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException, Response, Body, Request, status
from passlib.context import CryptContext
from sqlalchemy import func
from sqlalchemy.orm import Session as DBSession

from app.core.database import get_db
from app.core.auth import (
    set_session_cookie,
    issue_csrf_cookie,
    clear_session_cookie,
    clear_csrf_cookie,
    get_current_user,
    SESSION_COOKIE_NAME,
)
from app.services.sessions import create_session, delete_session
from app.models.user import User
# Router for every endpoint in this module; all paths are prefixed with /auth.
router = APIRouter(prefix="/auth", tags=["auth"])
# passlib context (bcrypt scheme) used by verify_pin and verify_password.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_pin(plain_pin: str, hashed_pin: str) -> bool:
    """Check a plaintext PIN against its stored hash.

    Returns ``False`` without touching the hash context when no hash is
    stored; otherwise returns the result of ``pwd_context.verify``.
    """
    return pwd_context.verify(plain_pin, hashed_pin) if hashed_pin else False
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against its stored hash.

    A missing or empty hash never matches; in that case the hash context is
    not consulted at all. Otherwise the result of ``pwd_context.verify`` is
    returned unchanged.
    """
    if hashed_password:
        return pwd_context.verify(plain_password, hashed_password)
    return False
# -------- Robust role normalisation --------
def _normalize_role(role_raw) -> str:
    """Reduce a role value to a bare lowercase name such as ``"admin"``.

    Args:
        role_raw: An Enum-like object (anything exposing ``.value``), a
            string, or a falsy value.

    Returns:
        The lowercase role name with any dotted prefix removed, e.g.
        ``"UserRole.ADMIN"`` -> ``"admin"``. Falls back to ``"user"``
        whenever no usable name remains (falsy input, whitespace-only
        input, or a value ending in a dot).
    """
    # Enum-like objects carry the actual role in ``.value``.
    if hasattr(role_raw, "value"):
        role_raw = role_raw.value
    # Stringify, then keep only the last dotted component
    # (e.g. "userrole.admin" -> "admin").
    role = str(role_raw or "user").strip().rpartition(".")[2].strip()
    # Without this fallback "   " or "role." would normalise to "" even
    # though the falsy case above already defaults to "user".
    return role.lower() or "user"
# -----------------------------
# Throttling for the management login
# -----------------------------
# Failure count at which _throttle_fail sets a lock.
_FAILED_LIMIT = 5
# Length of that lock, in minutes.
_LOCK_MINUTES = 2
# In-memory state per (lowercased email, client IP) key; lives only in this process.
_login_state = defaultdict(lambda: {"fails": 0, "lock_until": None})
def _client_ip(request: Request) -> str:
    """Return the client address used as a throttling key.

    The first non-empty entry of the ``X-Forwarded-For`` header wins.
    Otherwise the peer address of the connection is returned, or
    ``"0.0.0.0"`` when the request carries no client information.
    """
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it; confirm the deployment, since this value keys rate limits.
    forwarded = request.headers.get("X-Forwarded-For")
    leftmost = forwarded.partition(",")[0].strip() if forwarded else ""
    if leftmost:
        return leftmost
    peer = request.client
    return peer.host if peer else "0.0.0.0"
def _throttle_check(email_lower: str, ip: str):
    """Reject the request while the (email, IP) pair is locked out.

    Args:
        email_lower: Lowercased email that forms the first half of the key.
        ip: Client address that forms the second half of the key.

    Raises:
        HTTPException: 429 with the remaining wait time while a lock is
            still active.

    Once a lock has expired, the failure counter is cleared so the caller
    gets a full set of attempts again instead of being re-locked by the
    very next mistake.
    """
    # Plain lookup: indexing the defaultdict would allocate a state entry for
    # every (email, ip) pair that is merely checked.
    st = _login_state.get((email_lower, ip))
    if not st or not st["lock_until"]:
        return
    # Naive UTC on purpose: lock_until is stored as a naive utcnow() value.
    now = datetime.utcnow()
    if now < st["lock_until"]:
        # Round up so the message never under-reports the wait (or shows 0).
        seconds = math.ceil((st["lock_until"] - now).total_seconds())
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Login gesperrt. Bitte {seconds} Sekunden warten.",
        )
    # Lock expired: start a fresh failure window.
    st["fails"] = 0
    st["lock_until"] = None
def _throttle_fail(email_lower: str, ip: str):
    """Record one failed login attempt for the (email, IP) pair.

    Increments the pair's failure counter and, once it has reached
    ``_FAILED_LIMIT``, sets ``lock_until`` to ``_LOCK_MINUTES`` from now
    (naive UTC).
    """
    entry = _login_state[(email_lower, ip)]
    entry["fails"] = entry["fails"] + 1
    if entry["fails"] < _FAILED_LIMIT:
        return
    entry["lock_until"] = datetime.utcnow() + timedelta(minutes=_LOCK_MINUTES)
def _throttle_reset(email_lower: str, ip: str):
    """Forget all recorded failures and any lock for the (email, IP) pair.

    The entry is removed rather than overwritten with a fresh record:
    ``_login_state`` is a defaultdict, so a later access recreates the same
    zeroed state on demand, and successful logins no longer leave a
    permanent entry behind in the in-memory map.
    """
    _login_state.pop((email_lower, ip), None)
# -----------------------------
# PIN login: O(1) via HMAC lookup + per-IP rate limit
# -----------------------------
# Server-side secret pepper for the HMAC over the **full** PIN.
PIN_PEPPER = os.environ.get("PIN_PEPPER")
# Abort module import when the pepper is missing or shorter than 32 characters.
if PIN_PEPPER is None or len(PIN_PEPPER) < 32:
    dotenv_hint = os.environ.get("DOTENV_PATH") or ".env (nicht gefunden)"
    raise RuntimeError(f"PIN_PEPPER env var fehlt/zu kurz .env: {dotenv_hint}")
def _pin_hmac(pin: str) -> str:
    """Return the hex-encoded HMAC-SHA256 of *pin*, keyed with ``PIN_PEPPER``.

    Both the key and the PIN are encoded as UTF-8 before hashing.
    """
    key = PIN_PEPPER.encode("utf-8")
    message = pin.encode("utf-8")
    mac = hmac.new(key, message, digestmod=hashlib.sha256)
    return mac.hexdigest()
# Very simple per-IP rate limit (in memory, per process).
_pin_attempts = defaultdict(list)  # ip -> [time.monotonic() stamps of counted attempts]


def _pin_rate_limit_ok(ip: str, limit: int = 10, window_sec: int = 60) -> bool:
    """Count one PIN attempt for *ip* and report whether it is allowed.

    Args:
        ip: Client address used as the bucket key.
        limit: Maximum number of counted attempts inside the window.
        window_sec: Length of the sliding window in seconds.

    Returns:
        ``True`` (and the attempt is recorded) while fewer than *limit*
        attempts fall inside the window, ``False`` otherwise. Rejected
        attempts are not recorded.
    """
    # Monotonic clock: ``datetime.utcnow().timestamp()`` interprets the naive
    # UTC value as local time, so DST shifts and wall-clock adjustments would
    # stretch or shrink the window.
    now = time.monotonic()
    arr = _pin_attempts[ip]
    # Drop attempts that have fallen out of the window (in place, so the
    # list stored in the dict is the one being trimmed).
    arr[:] = [t for t in arr if now - t < window_sec]
    if len(arr) >= limit:
        return False
    arr.append(now)
    return True
# -----------------------------
# Management login (email/password)
# -----------------------------
@router.post("/management-login")
def management_login(
    request: Request,
    response: Response,
    email: str = Body(..., embed=True),
    password: str = Body(..., embed=True),
    db: DBSession = Depends(get_db),
):
    """Log a user in with email and password.

    The attempt is throttled per (lowercased email, client IP). An unknown
    or inactive account, a missing password hash and a wrong password all
    count as one failure and yield the same 401 response. On success the
    throttle state is reset, a session is created, and the session and CSRF
    cookie helpers are applied to the response.

    Raises:
        HTTPException: 401 for invalid credentials; whatever
            ``_throttle_check`` raises while the pair is locked.
    """
    email_lower = (email or "").strip().lower()
    throttle_key = (email_lower, _client_ip(request))
    _throttle_check(*throttle_key)

    account = db.query(User).filter(func.lower(User.email) == email_lower).one_or_none()

    # Only an existing, active account contributes a hash to verify against.
    stored_hash = None
    if account and getattr(account, "is_active", True):
        stored_hash = getattr(account, "hashed_password", None) or getattr(account, "password_hash", None)

    if not stored_hash or not verify_password(password, stored_hash):
        _throttle_fail(*throttle_key)
        raise HTTPException(status_code=401, detail="Ungültige Zugangsdaten")

    role = _normalize_role(getattr(account, "role", "user"))
    _throttle_reset(*throttle_key)

    session_token = create_session(db, account.id)
    set_session_cookie(response, session_token)
    issue_csrf_cookie(response)

    user_payload = {"id": account.id, "name": account.name, "role": role}
    return {"message": "Login erfolgreich", "user": user_payload}
# -----------------------------
# PIN login (O(1) candidate lookup, then one slow hash verification)
# -----------------------------
@router.post("/pin-login")
def pin_login(
    request: Request,
    response: Response,
    pin: str = Body(..., embed=True),
    db: DBSession = Depends(get_db),
):
    """Log a user in with a six-digit PIN.

    Steps:
        1. Apply the per-IP rate limit (every call counts, valid or not).
        2. Require exactly six ASCII digits.
        3. Look up active users whose ``pin_lookup`` equals the HMAC of the
           PIN and confirm each candidate with the slow hash.
        4. If nothing matched, fall back to verifying the PIN against every
           active user's ``hashed_pin`` and back-fill ``pin_lookup`` on a hit.
        5. On success create a session and apply the session and CSRF cookie
           helpers to the response.

    Raises:
        HTTPException: 429 when the rate limit is exceeded, 401 for a
            malformed or unknown PIN.
    """
    # Per-IP rate limit (defaults of _pin_rate_limit_ok apply).
    ip = _client_ip(request)
    if not _pin_rate_limit_ok(ip):
        raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Zu viele Versuche. Bitte kurz warten.")

    # Exactly six ASCII digits. ``str.isdigit()`` alone also accepts other
    # Unicode digits (full-width forms, superscripts, ...), which can never be
    # a valid PIN here but would still reach the lookup and the slow fallback.
    # Malformed input is rejected immediately; no hash work is done for it.
    if not pin or len(pin) != 6 or not all("0" <= ch <= "9" for ch in pin):
        raise HTTPException(status_code=401, detail="Ungültiger PIN")

    token = _pin_hmac(pin)

    # Fast path: fetch candidates by HMAC token
    # (expects a column users.pin_lookup, VARCHAR(64)).
    candidates = db.query(User).filter(
        User.is_active == True,
        getattr(User, "pin_lookup") == token,
    ).all()

    # Usually exactly one candidate; several only when users share a PIN.
    # NOTE(review): with shared PINs the first verifying candidate wins.
    matched = None
    for u in candidates:
        # Confirm the candidate with the slow hash.
        hashed_pin = getattr(u, "hashed_pin", None) or getattr(u, "pin_hash", None)
        if hashed_pin and verify_pin(pin, hashed_pin):
            matched = u
            break

    if not matched:
        # Legacy rows (pin_lookup not populated): slow fallback that verifies
        # the PIN against every active user. With many users it is better to
        # disable this and force a PIN reset instead.
        # NOTE(review): this runs on every PIN the fast path misses and is
        # not limited to rows with an empty pin_lookup.
        slow = db.query(User).filter(User.is_active == True).with_entities(User.id, User.hashed_pin).all()
        for uid, h in slow:
            if h and verify_pin(pin, h):
                matched = db.query(User).get(uid)  # type: ignore
                if matched:
                    # Back-fill pin_lookup so future logins take the fast path.
                    setattr(matched, "pin_lookup", token)
                    db.add(matched)
                    db.commit()
                    db.refresh(matched)
                break

    if not matched:
        raise HTTPException(status_code=401, detail="Ungültiger PIN")

    # Success: session cookie + CSRF cookie.
    token_val = create_session(db, matched.id)
    set_session_cookie(response, token_val)
    issue_csrf_cookie(response)
    return {
        "message": "Login erfolgreich",
        "user": {
            "id": matched.id,
            "name": matched.name,
            "role": _normalize_role(getattr(matched, "role", "user")),
        },
    }
# -----------------------------
# Logout
# -----------------------------
@router.post("/logout")
def logout(
    request: Request,
    response: Response,
    db: DBSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """End the current session and clear the session and CSRF cookies.

    Deletes the server-side session that belongs to the session cookie (if
    the cookie is present) and answers with an empty 204 response on which
    both cookie-clearing helpers have been applied.
    """
    token = request.cookies.get(SESSION_COOKIE_NAME)
    if token:
        delete_session(db, token)
    # FastAPI merges headers set on the injected ``response`` only into
    # responses it builds itself. A Response returned directly is sent as-is,
    # so the cookies must be cleared on the very object that is returned.
    reply = Response(status_code=status.HTTP_204_NO_CONTENT)
    clear_session_cookie(reply)
    clear_csrf_cookie(reply)
    return reply
# -----------------------------
# Obtain a CSRF token
# -----------------------------
@router.get("/csrf")
def get_csrf(response: Response):
    """Apply ``issue_csrf_cookie`` to the response and acknowledge with a fixed body."""
    acknowledgement = {"csrf": "ok"}
    issue_csrf_cookie(response)
    return acknowledgement
# -----------------------------
# Current user
# -----------------------------
# Credential-related attributes this module reads from User objects; they
# must never be part of a response body.
_ME_HIDDEN_FIELDS = frozenset(
    {"hashed_password", "password_hash", "hashed_pin", "pin_hash", "pin_lookup"}
)


@router.get("/me")
def me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user's loaded attributes without credentials.

    Returning the user object itself lets FastAPI's default encoder
    serialise every loaded attribute, which includes the password/PIN
    hashes and the PIN lookup token. This builds the same attribute mapping
    (SQLAlchemy's internal ``_sa*`` entries skipped, as the encoder does)
    and drops the credential fields.
    """
    return {
        field: value
        for field, value in vars(current_user).items()
        if not field.startswith("_sa") and field not in _ME_HIDDEN_FIELDS
    }
# -----------------------------
# Capabilities (UI gating)
# -----------------------------
@router.get("/capabilities")
def get_capabilities(current_user: User = Depends(get_current_user)):
    """Return the caller's normalised role and its list of capability names.

    Roles without an entry in the table receive the "user" capabilities.
    """
    role = _normalize_role(getattr(current_user, "role", "user"))

    # Shared building blocks; concatenation order reproduces each role's list.
    personal = ["dashboard", "profile", "my-bookings", "my-transactions"]
    stock = ["products", "deliveries"]

    # NOTE(review): "user" and "manager" are granted "system-config" and
    # "stats-advanced" — confirm this is intended.
    caps_by_role = {
        "user": personal + ["system-config", "stats-advanced"],
        "manager": personal + stock + ["system-config", "stats-advanced"],
        "admin": personal
        + ["users"]
        + stock
        + ["stats-advanced", "audit-logs", "system-config", "transactions"],
    }

    granted = caps_by_role[role] if role in caps_by_role else caps_by_role["user"]
    return {"role": role, "capabilities": granted}