Files
bacchus/apps/backend/app/api/stats.py
2025-09-28 19:13:01 +02:00

362 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any, Tuple
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, and_
from sqlalchemy.orm import Session
# TZ: ZoneInfo ist vorhanden, aber es fehlen auf Windows oft die IANA-Daten.
try:
from zoneinfo import ZoneInfo
except Exception: # sehr defensive Fallbacks
ZoneInfo = None # type: ignore
from app.core.database import get_db
from app.core.auth import get_current_user, requires_role
from app.models.booking import Booking
from app.models.user import User
from app.models.product import Product
from app.models.delivery import Delivery # hat KEIN created_at bei dir
router = APIRouter(prefix="/stats", tags=["stats"])
# ----------------- Helpers -----------------
def _get_tz(tz_name: str) -> timezone:
"""
Liefert eine tzinfo. Auf Windows ohne tzdata fällt das auf UTC zurück,
statt eine ZoneInfoNotFoundError zu werfen.
"""
if ZoneInfo is not None:
try:
return ZoneInfo(tz_name) # type: ignore
except Exception:
pass
# Fallback: UTC korrekt, aber ohne CET/CEST-Offset.
return timezone.utc
def _to_naive_utc(dt: Optional[datetime]) -> Optional[datetime]:
"""
Konvertiert timezone-aware nach UTC und entfernt tzinfo.
Bei None -> None. Bei nativ-naiv wird unverändert zurückgegeben.
"""
if dt is None:
return None
if dt.tzinfo is None:
return dt
return dt.astimezone(timezone.utc).replace(tzinfo=None)
def _delivery_time_column():
    """
    Find the time attribute of the Delivery model at runtime.

    The attribute names are probed in this order:
    created_at, timestamp, created, delivered_at.

    Returns a ``(attribute, name)`` tuple for the first name whose attribute
    exists and is not None, or ``(None, None)`` if none of them does.
    """
    candidates = ("created_at", "timestamp", "created", "delivered_at")
    # Lazy generator: getattr is only evaluated until the first hit.
    probed = ((getattr(Delivery, attr, None), attr) for attr in candidates)
    return next(
        ((column, attr) for column, attr in probed if column is not None),
        (None, None),
    )
def _last_delivery_at(db: Session) -> Optional[datetime]:
    """
    Return the maximum value of the Delivery time column, or None.

    None is returned when _delivery_time_column() finds no usable column; the
    query itself may also yield None. The value is handed back exactly as the
    database returns it (aware or naive), without any conversion.
    """
    time_column = _delivery_time_column()[0]
    if time_column is None:
        return None
    newest = db.query(func.max(time_column))
    return newest.scalar()
def _period_bounds(db: Session, period: str, tz_name: str = "Europe/Berlin") -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]:
    """
    Compute the time window for a stats period.

    Returns ``(from_utc_naive, to_utc_naive, last_delivery_time_original)``:
    - "last_delivery": from the last delivery (converted via _to_naive_utc),
      or from 30 days ago when there is no delivery, until now.
    - "ytd": from January 1st of the current year in ``tz_name`` until now.
    - anything else (e.g. "all"): no lower bound, until now.

    The bounds are naive UTC because, per the original author's note,
    Booking.timestamp is stored naive/UTC. The third element is the last
    delivery time exactly as returned by _last_delivery_at.
    """
    zone = _get_tz(tz_name)
    local_now = datetime.now(zone)
    upper = local_now.astimezone(timezone.utc).replace(tzinfo=None)
    last_delivery = _last_delivery_at(db)

    if period == "last_delivery":
        if last_delivery is not None:
            lower = _to_naive_utc(last_delivery)
        else:
            # Fallback: look 30 days back.
            month_ago = local_now - timedelta(days=30)
            lower = month_ago.astimezone(timezone.utc).replace(tzinfo=None)
    elif period == "ytd":
        year_start = datetime(local_now.year, 1, 1, tzinfo=zone)
        lower = year_start.astimezone(timezone.utc).replace(tzinfo=None)
    else:
        # "all" -> no lower bound
        lower = None

    return (lower, upper, last_delivery)
# ----------------- Öffentliche Stat-Endpunkte (auth-pflichtig, aber nicht nur Admin) -----------------
@router.get("/meta")
def stats_meta(
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
    tz: str = Query("Europe/Berlin"),
) -> Dict[str, Any]:
    """
    Metadata for the stats page.

    Returned keys:
    - categories: distinct, non-NULL Product.category values, ascending.
    - last_delivery_at: ISO string of the last delivery time, or None.
    - visible_count: active users with a non-blank alias who opted in via
      public_stats.
    - hidden_count: active users minus visible_count (never below 0).
    - now: current time in ``tz`` as ISO string (UTC if ``tz`` is unknown).

    If the User model has no ``public_stats`` attribute (migration not yet
    applied) the opt-in condition is skipped. This is the same rule the
    /top-drinkers endpoint applies, so visible_count matches the set of users
    that endpoint can list. The previous ``getattr(...) == True`` form
    collapsed to the Python constant ``False`` in that case and forced the
    count to 0.
    """
    # Categories
    categories = [
        category
        for (category,) in db.query(Product.category)
        .filter(Product.category.isnot(None))
        .distinct()
        .order_by(Product.category.asc())
        .all()
    ]

    # Last delivery; None when the Delivery model has no usable time column.
    last_delivery_time = _last_delivery_at(db)

    # Visibility rules; the opt-in column is optional until migrated.
    visibility = [
        User.is_active.is_(True),
        User.alias.isnot(None),
        func.length(func.trim(User.alias)) > 0,
    ]
    public_stats_col = getattr(User, "public_stats", None)
    if public_stats_col is not None:
        visibility.append(public_stats_col.is_(True))

    visible_count = (
        db.query(func.count(User.id)).filter(and_(*visibility)).scalar() or 0
    )
    active_count = (
        db.query(func.count(User.id)).filter(User.is_active.is_(True)).scalar() or 0
    )

    now_local = datetime.now(_get_tz(tz))
    return {
        "categories": categories,
        "last_delivery_at": last_delivery_time.isoformat() if last_delivery_time else None,
        "visible_count": int(visible_count),
        "hidden_count": int(max(0, active_count - visible_count)),
        "now": now_local.isoformat(),
    }
@router.get("/top-drinkers")
def top_drinkers(
    period: str = Query("last_delivery", pattern="^(last_delivery|ytd|all)$"),
    category: str = Query("all"),
    limit: int = Query(5, ge=1, le=50),
    tz: str = Query("Europe/Berlin"),
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> List[Dict[str, Any]]:
    """
    Ranking of users by consumed quantity for a period and optional category.

    Users are ordered by the sum of Booking.amount (descending) and cut off
    at ``limit``. Each entry also carries the number of bookings and the sum
    of Booking.total_cents. Only active users with a non-blank alias are
    listed; when the User model has a ``public_stats`` attribute, the user
    must additionally have opted in.
    """
    lower, upper, _last_delivery = _period_bounds(db, period, tz)

    ranking = db.query(
        User.id.label("user_id"),
        User.alias.label("alias"),
        User.avatar_url.label("avatar_url"),
        func.coalesce(func.sum(Booking.amount), 0).label("amount_sum"),
        func.count(Booking.id).label("count"),
        func.coalesce(func.sum(Booking.total_cents), 0).label("revenue_cents"),
    ).join(User, User.id == Booking.user_id)

    if category != "all":
        ranking = ranking.join(Product, Product.id == Booking.product_id)
        ranking = ranking.filter(Product.category == category)

    if lower is not None:
        ranking = ranking.filter(Booking.timestamp >= lower)
    if upper is not None:
        ranking = ranking.filter(Booking.timestamp <= upper)

    # Visibility rules; tolerate a User model without the opt-in column.
    visibility = [
        User.is_active.is_(True),
        User.alias.isnot(None),
        func.length(func.trim(User.alias)) > 0,
    ]
    opt_in = getattr(User, "public_stats", None)
    if opt_in is not None:
        visibility.append(opt_in.is_(True))
    ranking = ranking.filter(and_(*visibility))

    ranking = ranking.group_by(User.id, User.alias, User.avatar_url)
    ranking = ranking.order_by(func.coalesce(func.sum(Booking.amount), 0).desc())
    ranking = ranking.limit(limit)

    return [
        {
            "user_id": row.user_id,
            "alias": row.alias,
            "avatar_url": row.avatar_url,
            "amount_sum": int(row.amount_sum or 0),
            "count": int(row.count or 0),
            "revenue_cents": int(row.revenue_cents or 0),
        }
        for row in ranking.all()
    ]
@router.get("/product-share")
def product_share(
    period: str = Query("last_delivery", pattern="^(last_delivery|ytd|all)$"),
    tz: str = Query("Europe/Berlin"),
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Product distribution (pie chart data) for a period.

    Counts Booking rows per product inside the period window and returns the
    per-product counts, most booked first, together with their total. No
    opt-in filter is applied here; the original author's rationale is that
    the result contains no personal data.
    """
    lower, upper, _last_delivery = _period_bounds(db, period, tz)

    per_product = db.query(
        Product.id.label("product_id"),
        Product.name.label("product_name"),
        func.count(Booking.id).label("count_rows"),
    ).join(Product, Product.id == Booking.product_id)

    if lower is not None:
        per_product = per_product.filter(Booking.timestamp >= lower)
    if upper is not None:
        per_product = per_product.filter(Booking.timestamp <= upper)

    per_product = per_product.group_by(Product.id, Product.name)
    per_product = per_product.order_by(func.count(Booking.id).desc())

    entries = [
        {
            "product_id": row.product_id,
            "product_name": row.product_name,
            "count": int(row.count_rows or 0),
        }
        for row in per_product.all()
    ]
    return {
        "total": int(sum(entry["count"] for entry in entries)),
        "items": entries,
    }
# ----------------- Bestehende Admin-Stats (weiterhin verfügbar) -----------------
@router.get("/summary", dependencies=[Depends(requires_role("manager", "admin"))])
def summary(db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    """
    Global counters for managers/admins.

    Returns the number of users, products and bookings, the summed
    Booking.total_cents, and the calling user's balance_cents.
    ``stock_overview`` is always None.
    """

    def count_rows(pk_column):
        # COUNT over a primary-key column; a None result is treated as 0.
        return db.query(func.count(pk_column)).scalar() or 0

    n_users = count_rows(User.id)
    n_products = count_rows(Product.id)
    n_bookings = count_rows(Booking.id)
    revenue_cents = (
        db.query(func.coalesce(func.sum(Booking.total_cents), 0)).scalar() or 0
    )

    return {
        "num_users": n_users,
        "num_products": n_products,
        "num_bookings": n_bookings,
        "total_revenue_cents": int(revenue_cents),
        "my_balance_cents": int(user.balance_cents or 0),
        "stock_overview": None,
    }
@router.get("/revenue-summary", dependencies=[Depends(requires_role("manager", "admin"))])
def revenue_summary(db: Session = Depends(get_db), _: User = Depends(get_current_user)):
    """
    Revenue totals for managers/admins.

    Returns the sum of Booking.total_cents overall and for bookings strictly
    after the last delivery. Without a known last delivery, the
    "since last delivery" figure equals the overall total.
    """
    overall_cents = (
        db.query(func.coalesce(func.sum(Booking.total_cents), 0)).scalar() or 0
    )

    # Uses whichever Delivery time column exists on the model.
    last_delivery_time = _last_delivery_at(db)

    if not last_delivery_time:
        recent_cents = overall_cents
    else:
        cutoff = _to_naive_utc(last_delivery_time)
        recent_query = db.query(
            func.coalesce(func.sum(Booking.total_cents), 0)
        ).filter(Booking.timestamp > cutoff)
        recent_cents = recent_query.scalar() or 0

    return {
        "revenue_total_cents": int(overall_cents),
        "revenue_since_last_delivery_cents": int(recent_cents),
        "last_delivery_at": last_delivery_time.isoformat() if last_delivery_time else None,
    }
# (optionale Alt-Endpunkte, falls noch genutzt)
@router.get("/consumption-per-user", dependencies=[Depends(requires_role("manager", "admin"))])
def consumption_per_user(db: Session = Depends(get_db), _: User = Depends(get_current_user)):
    """
    Summed Booking.total_cents per user, highest first.

    Uses an outer join from User to Booking, so users without any booking are
    included with a total of 0.
    """

    def spent_total():
        # Fresh SQL expression per use: SUM(total_cents), 0 when there is none.
        return func.coalesce(func.sum(Booking.total_cents), 0)

    per_user = db.query(
        User.id.label("user_id"),
        User.name,
        spent_total().label("total_cents"),
    )
    per_user = per_user.outerjoin(Booking, Booking.user_id == User.id)
    per_user = per_user.group_by(User.id, User.name)
    per_user = per_user.order_by(spent_total().desc())

    return [
        {"user_id": row.user_id, "name": row.name, "total_cents": int(row.total_cents or 0)}
        for row in per_user.all()
    ]
@router.get("/consumption-per-product", dependencies=[Depends(requires_role("manager", "admin"))])
def consumption_per_product(db: Session = Depends(get_db), _: User = Depends(get_current_user)):
    """
    Number of bookings per product, most booked first.

    Uses an outer join from Product to Booking, so products without any
    booking are included with a count of 0.
    """
    per_product = db.query(
        Product.id.label("product_id"),
        Product.name.label("name"),
        func.count(Booking.id).label("count"),
    )
    per_product = per_product.outerjoin(Booking, Booking.product_id == Product.id)
    per_product = per_product.group_by(Product.id, Product.name)
    per_product = per_product.order_by(func.count(Booking.id).desc())

    return [
        {"product_id": row.product_id, "name": row.name, "count": int(row.count or 0)}
        for row in per_product.all()
    ]
@router.get("/monthly-ranking", dependencies=[Depends(requires_role("manager", "admin"))])
def monthly_ranking(
    # Year range keeps datetime(year + 1, ...) and the UTC conversion in range.
    year: Optional[int] = Query(None, ge=1970, le=9998),
    month: Optional[int] = Query(None, ge=1, le=12),
    db: Session = Depends(get_db),
    _: User = Depends(get_current_user),
    tz: str = Query("Europe/Berlin"),
):
    """
    Top 10 users by summed Booking.total_cents, optionally for one calendar
    month or year.

    ``year`` and ``month`` used to be accepted but were never applied, so the
    endpoint always returned the all-time ranking. They now select the window:

    - neither given: no time filter (all-time ranking, as before).
    - ``year`` only: the whole calendar year.
    - ``month`` given: that calendar month; a missing ``year`` defaults to
      the current year in ``tz``.

    Calendar boundaries are taken in ``tz`` (UTC if the name is unknown) and
    converted to naive UTC before being compared with Booking.timestamp, the
    same convention _period_bounds uses for the "ytd" period. The window is
    half-open: start inclusive, end exclusive.

    Returns a list of ``{"user_id", "name", "total_cents"}`` dicts, highest
    total first.
    """
    q = (
        db.query(
            User.id.label("user_id"),
            User.name,
            func.coalesce(func.sum(Booking.total_cents), 0).label("total_cents"),
        )
        .join(User, User.id == Booking.user_id)
    )

    if year is not None or month is not None:
        zone = _get_tz(tz)
        target_year = year if year is not None else datetime.now(zone).year
        if month is None:
            start_local = datetime(target_year, 1, 1, tzinfo=zone)
            end_local = datetime(target_year + 1, 1, 1, tzinfo=zone)
        else:
            start_local = datetime(target_year, month, 1, tzinfo=zone)
            if month == 12:
                # December rolls over into January of the following year.
                end_local = datetime(target_year + 1, 1, 1, tzinfo=zone)
            else:
                end_local = datetime(target_year, month + 1, 1, tzinfo=zone)
        # Filters must be applied before LIMIT is set on the query.
        q = q.filter(
            Booking.timestamp >= _to_naive_utc(start_local),
            Booking.timestamp < _to_naive_utc(end_local),
        )

    q = (
        q.group_by(User.id, User.name)
        .order_by(func.coalesce(func.sum(Booking.total_cents), 0).desc())
        .limit(10)
    )
    return [
        {"user_id": r.user_id, "name": r.name, "total_cents": int(r.total_cents or 0)}
        for r in q.all()
    ]