✨ Changements majeurs: - Suppression complète du code Flask legacy - Migration backend FastAPI vers racine /backend - Migration frontend Vue.js vers racine /frontend - Suppression de notytex-v2/ (code monté à la racine) ✅ Validations: - Backend démarre correctement (port 8000) - API /api/v2/health répond healthy - 99/99 tests unitaires passent - Frontend configuré avec proxy Vite 📝 Documentation: - README.md réécrit pour v2 - Instructions de démarrage mises à jour - .gitignore adapté pour backend/frontend/ 🎯 Architecture finale: notytex/ ├── backend/ # FastAPI + SQLAlchemy + Pydantic ├── frontend/ # Vue 3 + Vite + TailwindCSS ├── docs/ # Documentation └── school_management.db # Base de données (inchangée) Jalon 6 complété: Application v2 prête pour utilisation!
979 lines
28 KiB
Python
979 lines
28 KiB
Python
"""
|
|
Routes API pour la configuration (Competences, Domains, Scale).
|
|
"""
|
|
|
|
from typing import List
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from sqlalchemy import select, func, delete
|
|
|
|
from api.dependencies import AsyncSessionDep
|
|
from infrastructure.database.models import (
|
|
AppConfig,
|
|
Competence,
|
|
Domain,
|
|
CompetenceScaleValue,
|
|
)
|
|
from schemas.config import (
|
|
CompetenceRead,
|
|
CompetenceList,
|
|
CompetenceCreate,
|
|
CompetenceUpdate,
|
|
DomainRead,
|
|
DomainList,
|
|
DomainCreate,
|
|
DomainUpdate,
|
|
ScaleValueRead,
|
|
ScaleRead,
|
|
ScaleValueCreate,
|
|
ScaleValueUpdate,
|
|
AppConfigRead,
|
|
AppConfigUpdate,
|
|
FullConfigRead,
|
|
ConfigResponse,
|
|
SMTPConfigRead,
|
|
SMTPConfigUpdate,
|
|
SMTPTestRequest,
|
|
SMTPTestResponse,
|
|
NotesGradientRead,
|
|
NotesGradientUpdate,
|
|
ScaleBatchUpdate,
|
|
DatabaseConfigRead,
|
|
DatabaseConfigUpdate,
|
|
DatabaseConfigResponse,
|
|
)
|
|
from infrastructure.external.email_service import EmailService
|
|
|
|
# Router grouping the configuration endpoints of this module; every route
# declared on it is served under the "/config" prefix with the
# "Configuration" tag.
router = APIRouter(prefix="/config", tags=["Configuration"])
|
|
|
|
|
|
@router.get("/competences", response_model=CompetenceList)
async def get_competences(session: AsyncSessionDep):
    """
    Return every competence, sorted by ``order_index``.
    """
    statement = select(Competence).order_by(Competence.order_index)
    rows = (await session.execute(statement)).scalars().all()

    items = []
    for row in rows:
        items.append(
            CompetenceRead(
                id=row.id,
                name=row.name,
                color=row.color,
                icon=row.icon,
                order_index=row.order_index,
            )
        )

    return CompetenceList(competences=items)
|
|
|
|
|
|
@router.get("/domains", response_model=DomainList)
async def get_domains(session: AsyncSessionDep):
    """
    Return every domain, sorted by name.
    """
    statement = select(Domain).order_by(Domain.name)
    rows = (await session.execute(statement)).scalars().all()

    def to_schema(row):
        # Map one ORM row onto the response schema.
        return DomainRead(
            id=row.id,
            name=row.name,
            color=row.color,
            description=row.description,
        )

    return DomainList(domains=list(map(to_schema, rows)))
|
|
|
|
|
|
@router.get("/domains/search", response_model=DomainList)
async def search_domains(
    q: str,
    session: AsyncSessionDep,
):
    """
    Search domains whose name contains ``q``, case-insensitively (autocomplete).

    The LIKE wildcards ``%`` and ``_`` found in ``q`` are escaped so they are
    matched literally instead of widening the search.

    Args:
        q: Text to look for inside the domain name.
        session: Async database session.

    Returns:
        A ``DomainList`` holding at most 10 matches, ordered by name.
    """
    # Escape the escape character itself first, then the two LIKE wildcards.
    escaped = q.replace("/", "//").replace("%", "/%").replace("_", "/_")

    query = (
        select(Domain)
        .where(Domain.name.ilike(f"%{escaped}%", escape="/"))
        .order_by(Domain.name)
        .limit(10)
    )
    result = await session.execute(query)
    domains = result.scalars().all()

    return DomainList(
        domains=[
            DomainRead(
                id=d.id,
                name=d.name,
                color=d.color,
                description=d.description,
            )
            for d in domains
        ]
    )
|
|
|
|
|
|
@router.get("/scale", response_model=ScaleRead)
async def get_scale(session: AsyncSessionDep):
    """
    Return the competence scale entries, sorted by their ``value``.
    """
    statement = select(CompetenceScaleValue).order_by(CompetenceScaleValue.value)
    entries = (await session.execute(statement)).scalars().all()

    values = []
    for entry in entries:
        values.append(
            ScaleValueRead(
                value=entry.value,
                label=entry.label,
                color=entry.color,
                included_in_total=entry.included_in_total,
            )
        )

    return ScaleRead(values=values)
|
|
|
|
|
|
@router.get("", response_model=FullConfigRead)
async def get_full_config(session: AsyncSessionDep):
    """
    Return the complete application configuration in a single payload.

    The competence, domain and scale sections are produced by the dedicated
    handlers of this module (``get_competences``, ``get_domains`` and
    ``get_scale``) so their ordering and field mapping cannot drift from the
    individual endpoints.

    Args:
        session: Async database session.

    Returns:
        A ``FullConfigRead`` with ``app_config``, ``competences``, ``domains``
        and ``scale``.
    """
    # General settings are stored as key/value rows in AppConfig.
    rows = (await session.execute(select(AppConfig))).scalars().all()
    config_dict = {row.key: row.value for row in rows}

    app_config = AppConfigRead(
        school_year=config_dict.get("school_year", "2024-2025"),
        school_name=config_dict.get("school_name", ""),
        default_grading_system=config_dict.get("default_grading_system", "notes"),
    )

    # Same query order as before: competences, then domains, then scale.
    competence_list = await get_competences(session)
    domain_list = await get_domains(session)
    scale = await get_scale(session)

    return FullConfigRead(
        app_config=app_config,
        competences=competence_list.competences,
        domains=domain_list.domains,
        scale=scale,
    )
|
|
|
|
|
|
# === CRUD Compétences ===
|
|
|
|
@router.post("/competences", response_model=CompetenceRead, status_code=201)
async def create_competence(
    data: CompetenceCreate,
    session: AsyncSessionDep,
):
    """
    Create a competence.

    Responds with HTTP 400 when a competence with the same name already
    exists. When ``order_index`` is not supplied, the new competence gets the
    current maximum index plus one (1 when there is no maximum).
    """
    # Name must be unique.
    same_name = select(Competence).where(Competence.name == data.name)
    duplicate = (await session.execute(same_name)).scalar_one_or_none()
    if duplicate:
        raise HTTPException(
            status_code=400,
            detail=f"Une compétence avec le nom '{data.name}' existe déjà"
        )

    # Resolve the position of the new competence.
    order_index = data.order_index
    if order_index is None:
        highest = (
            await session.execute(select(func.max(Competence.order_index)))
        ).scalar()
        order_index = (highest or 0) + 1

    created = Competence(
        name=data.name,
        color=data.color,
        icon=data.icon,
        order_index=order_index,
    )
    session.add(created)
    await session.commit()
    await session.refresh(created)

    return CompetenceRead(
        id=created.id,
        name=created.name,
        color=created.color,
        icon=created.icon,
        order_index=created.order_index,
    )
|
|
|
|
|
|
@router.put("/competences/{competence_id}", response_model=CompetenceRead)
async def update_competence(
    competence_id: int,
    data: CompetenceUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing competence.

    Only the fields of ``data`` that are not ``None`` are applied. Responds
    with HTTP 404 when the competence does not exist and HTTP 400 when the
    requested new name is already used by another competence.
    """
    lookup = select(Competence).where(Competence.id == competence_id)
    target = (await session.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Compétence non trouvée")

    # A rename must not collide with another competence.
    if data.name and data.name != target.name:
        clash_query = select(Competence).where(
            Competence.name == data.name,
            Competence.id != competence_id,
        )
        clash = (await session.execute(clash_query)).scalar_one_or_none()
        if clash:
            raise HTTPException(
                status_code=400,
                detail=f"Une autre compétence avec le nom '{data.name}' existe déjà"
            )

    # Copy over every field that was actually provided.
    for field in ("name", "color", "icon", "order_index"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(target, field, new_value)

    await session.commit()
    await session.refresh(target)

    return CompetenceRead(
        id=target.id,
        name=target.name,
        color=target.color,
        icon=target.icon,
        order_index=target.order_index,
    )
|
|
|
|
|
|
@router.delete("/competences/{competence_id}", response_model=ConfigResponse)
async def delete_competence(
    competence_id: int,
    session: AsyncSessionDep,
):
    """
    Delete the competence identified by ``competence_id``.

    Responds with HTTP 404 when no such competence exists.
    """
    lookup = select(Competence).where(Competence.id == competence_id)
    target = (await session.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Compétence non trouvée")

    # Read the name before deleting so it can be reported in the response.
    deleted_name = target.name
    await session.delete(target)
    await session.commit()

    return ConfigResponse(
        success=True,
        message=f"Compétence '{deleted_name}' supprimée"
    )
|
|
|
|
|
|
# === CRUD Domaines ===
|
|
|
|
@router.post("/domains", response_model=DomainRead, status_code=201)
async def create_domain(
    data: DomainCreate,
    session: AsyncSessionDep,
):
    """
    Create a domain.

    Responds with HTTP 400 when a domain with the same name already exists.
    """
    # Name must be unique.
    same_name = select(Domain).where(Domain.name == data.name)
    duplicate = (await session.execute(same_name)).scalar_one_or_none()
    if duplicate:
        raise HTTPException(
            status_code=400,
            detail=f"Un domaine avec le nom '{data.name}' existe déjà"
        )

    created = Domain(
        name=data.name,
        color=data.color,
        description=data.description,
    )
    session.add(created)
    await session.commit()
    await session.refresh(created)

    return DomainRead(
        id=created.id,
        name=created.name,
        color=created.color,
        description=created.description,
    )
|
|
|
|
|
|
@router.put("/domains/{domain_id}", response_model=DomainRead)
async def update_domain(
    domain_id: int,
    data: DomainUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing domain.

    Only the fields of ``data`` that are not ``None`` are applied. Responds
    with HTTP 404 when the domain does not exist and HTTP 400 when the
    requested new name is already used by another domain.
    """
    lookup = select(Domain).where(Domain.id == domain_id)
    target = (await session.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Domaine non trouvé")

    # A rename must not collide with another domain.
    if data.name and data.name != target.name:
        clash_query = select(Domain).where(
            Domain.name == data.name,
            Domain.id != domain_id,
        )
        clash = (await session.execute(clash_query)).scalar_one_or_none()
        if clash:
            raise HTTPException(
                status_code=400,
                detail=f"Un autre domaine avec le nom '{data.name}' existe déjà"
            )

    # Copy over every field that was actually provided.
    for field in ("name", "color", "description"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(target, field, new_value)

    await session.commit()
    await session.refresh(target)

    return DomainRead(
        id=target.id,
        name=target.name,
        color=target.color,
        description=target.description,
    )
|
|
|
|
|
|
@router.delete("/domains/{domain_id}", response_model=ConfigResponse)
async def delete_domain(
    domain_id: int,
    session: AsyncSessionDep,
):
    """
    Delete the domain identified by ``domain_id``.

    Responds with HTTP 404 when no such domain exists.
    """
    lookup = select(Domain).where(Domain.id == domain_id)
    target = (await session.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Domaine non trouvé")

    # Read the name before deleting so it can be reported in the response.
    deleted_name = target.name
    await session.delete(target)
    await session.commit()

    return ConfigResponse(
        success=True,
        message=f"Domaine '{deleted_name}' supprimé"
    )
|
|
|
|
|
|
# === CRUD Échelle de notation ===
|
|
|
|
@router.post("/scale", response_model=ScaleValueRead, status_code=201)
async def create_scale_value(
    data: ScaleValueCreate,
    session: AsyncSessionDep,
):
    """
    Create a new scale value.

    Responds with HTTP 400 when an entry with the same ``value`` already
    exists.
    """
    # The value itself must be unique.
    same_value = select(CompetenceScaleValue).where(
        CompetenceScaleValue.value == data.value
    )
    duplicate = (await session.execute(same_value)).scalar_one_or_none()
    if duplicate:
        raise HTTPException(
            status_code=400,
            detail=f"Une valeur d'échelle '{data.value}' existe déjà"
        )

    created = CompetenceScaleValue(
        value=data.value,
        label=data.label,
        color=data.color,
        included_in_total=data.included_in_total,
    )
    session.add(created)
    await session.commit()
    await session.refresh(created)

    return ScaleValueRead(
        value=created.value,
        label=created.label,
        color=created.color,
        included_in_total=created.included_in_total,
    )
|
|
|
|
|
|
# The batch and reset routes are declared BEFORE the routes that take a
# path parameter ("/scale/{scale_value}").
@router.put("/scale/batch", response_model=ScaleRead)
async def update_scale_batch(
    data: ScaleBatchUpdate,
    session: AsyncSessionDep,
):
    """
    Create or update several scale values in a single request.

    Each item is matched on ``value``: an existing entry gets its label,
    color and ``included_in_total`` overwritten, an unknown one is inserted.
    Everything is committed once, then the full scale is returned.
    """
    for item in data.values:
        lookup = select(CompetenceScaleValue).where(
            CompetenceScaleValue.value == item.value
        )
        current = (await session.execute(lookup)).scalar_one_or_none()

        if not current:
            # Unknown value: insert it.
            session.add(
                CompetenceScaleValue(
                    value=item.value,
                    label=item.label,
                    color=item.color,
                    included_in_total=item.included_in_total,
                )
            )
            continue

        current.label = item.label
        current.color = item.color
        current.included_in_total = item.included_in_total

    await session.commit()

    return await get_scale(session)
|
|
|
|
|
|
@router.post("/scale/reset", response_model=ScaleRead)
async def reset_scale(session: AsyncSessionDep):
    """
    Reset the scale to its default values.

    Deletes every existing scale entry, inserts the five defaults
    ("0", "1", "2", "3" and "."), commits, and returns the resulting scale.
    """
    # Wipe the current scale.
    await session.execute(delete(CompetenceScaleValue))

    defaults = (
        {"value": "0", "label": "Non acquis", "color": "#dc2626", "included_in_total": True},
        {"value": "1", "label": "En cours d'acquisition", "color": "#ea580c", "included_in_total": True},
        {"value": "2", "label": "Acquis", "color": "#059669", "included_in_total": True},
        {"value": "3", "label": "Expert", "color": "#2563eb", "included_in_total": True},
        {"value": ".", "label": "Non évalué", "color": "#6b7280", "included_in_total": True},
    )
    for fields in defaults:
        session.add(CompetenceScaleValue(**fields))

    await session.commit()

    return await get_scale(session)
|
|
|
|
|
|
@router.put("/scale/{scale_value}", response_model=ScaleValueRead)
async def update_scale_value(
    scale_value: str,
    data: ScaleValueUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing scale value.

    Only the fields of ``data`` that are not ``None`` are applied. Responds
    with HTTP 404 when ``scale_value`` is unknown and HTTP 400 when the
    requested new ``value`` is already taken by another entry.
    """
    lookup = select(CompetenceScaleValue).where(
        CompetenceScaleValue.value == scale_value
    )
    entry = (await session.execute(lookup)).scalar_one_or_none()
    if not entry:
        raise HTTPException(status_code=404, detail="Valeur d'échelle non trouvée")

    # Changing the value must not collide with another entry.
    if data.value and data.value != entry.value:
        clash_query = select(CompetenceScaleValue).where(
            CompetenceScaleValue.value == data.value
        )
        clash = (await session.execute(clash_query)).scalar_one_or_none()
        if clash:
            raise HTTPException(
                status_code=400,
                detail=f"Une autre valeur d'échelle '{data.value}' existe déjà"
            )

    # Copy over every field that was actually provided.
    for field in ("value", "label", "color", "included_in_total"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(entry, field, new_value)

    await session.commit()
    await session.refresh(entry)

    return ScaleValueRead(
        value=entry.value,
        label=entry.label,
        color=entry.color,
        included_in_total=entry.included_in_total,
    )
|
|
|
|
|
|
@router.delete("/scale/{scale_value}", response_model=ConfigResponse)
async def delete_scale_value(
    scale_value: str,
    session: AsyncSessionDep,
):
    """
    Delete a scale value.

    The base values ("0", "1", "2", "3" and ".") are protected: asking to
    delete one of them yields HTTP 400 before the database is queried.
    Responds with HTTP 404 when the value does not exist.
    """
    # Guard the base values.
    protected = ('0', '1', '2', '3', '.')
    if scale_value in protected:
        raise HTTPException(
            status_code=400,
            detail=f"La valeur '{scale_value}' est une valeur de base et ne peut pas être supprimée"
        )

    lookup = select(CompetenceScaleValue).where(
        CompetenceScaleValue.value == scale_value
    )
    entry = (await session.execute(lookup)).scalar_one_or_none()
    if not entry:
        raise HTTPException(status_code=404, detail="Valeur d'échelle non trouvée")

    # Read the value before deleting so it can be reported in the response.
    deleted_value = entry.value
    await session.delete(entry)
    await session.commit()

    return ConfigResponse(
        success=True,
        message=f"Valeur d'échelle '{deleted_value}' supprimée"
    )
|
|
|
|
|
|
# === Configuration générale ===
|
|
|
|
@router.put("", response_model=AppConfigRead)
async def update_app_config(
    data: AppConfigUpdate,
    session: AsyncSessionDep,
):
    """
    Update the general application settings.

    Each field of ``data`` that is not ``None`` is written to its AppConfig
    key/value row (created when missing). The settings are then re-read and
    returned, with defaults for keys that are still absent.
    """
    candidates = {
        "school_year": data.school_year,
        "school_name": data.school_name,
        "default_grading_system": data.default_grading_system,
    }

    for key, value in candidates.items():
        if value is None:
            continue

        lookup = select(AppConfig).where(AppConfig.key == key)
        existing = (await session.execute(lookup)).scalar_one_or_none()
        if existing:
            existing.value = value
        else:
            # No row for this key yet: create it.
            session.add(AppConfig(key=key, value=value))

    await session.commit()

    # Re-read everything to build the response.
    rows = (await session.execute(select(AppConfig))).scalars().all()
    current = {row.key: row.value for row in rows}

    return AppConfigRead(
        school_year=current.get("school_year", "2024-2025"),
        school_name=current.get("school_name", ""),
        default_grading_system=current.get("default_grading_system", "notes"),
    )
|
|
|
|
|
|
# === Configuration SMTP Email ===
|
|
|
|
@router.get("/smtp", response_model=SMTPConfigRead)
async def get_smtp_config(session: AsyncSessionDep):
    """
    Return the SMTP settings used for sending emails.

    The settings are read from the AppConfig rows whose key starts with
    "email.". The password is not part of the response.
    """
    lookup = select(AppConfig).where(AppConfig.key.like("email.%"))
    rows = (await session.execute(lookup)).scalars().all()
    stored = {row.key: row.value for row in rows}

    # The service is built only to report whether the settings are complete.
    service = EmailService.from_database_config(stored)

    return SMTPConfigRead(
        host=stored.get("email.smtp_host", ""),
        port=int(stored.get("email.smtp_port", 587)),
        username=stored.get("email.username", ""),
        use_tls=str(stored.get("email.use_tls", "true")).lower() == "true",
        from_name=stored.get("email.from_name", "Notytex"),
        from_address=stored.get("email.from_address", ""),
        is_configured=service.is_configured(),
    )
|
|
|
|
|
|
@router.put("/smtp", response_model=SMTPConfigRead)
async def update_smtp_config(
    data: SMTPConfigUpdate,
    session: AsyncSessionDep,
):
    """
    Update the SMTP settings.

    Every field is optional; only those that are not ``None`` are written to
    their "email.*" AppConfig row (created when missing). The port is stored
    as a string and ``use_tls`` as "true"/"false". Returns the settings as
    produced by ``get_smtp_config``.
    """
    # NOTE(review): the password is stored as received in AppConfig.value.
    # (key, submitted value, converter applied before storing)
    fields = (
        ("email.smtp_host", data.host, None),
        ("email.smtp_port", data.port, str),
        ("email.username", data.username, None),
        ("email.password", data.password, None),
        ("email.use_tls", data.use_tls, lambda flag: "true" if flag else "false"),
        ("email.from_name", data.from_name, None),
        ("email.from_address", data.from_address, None),
    )

    for key, submitted, convert in fields:
        if submitted is None:
            continue
        value = convert(submitted) if convert else submitted

        lookup = select(AppConfig).where(AppConfig.key == key)
        existing = (await session.execute(lookup)).scalar_one_or_none()
        if existing:
            existing.value = value
        else:
            # No row for this key yet: create it.
            session.add(AppConfig(key=key, value=value))

    await session.commit()

    return await get_smtp_config(session)
|
|
|
|
|
|
@router.post("/smtp/test", response_model=SMTPTestResponse)
async def test_smtp_config(
    data: SMTPTestRequest,
    session: AsyncSessionDep,
):
    """
    Check the stored SMTP settings by sending a test email.

    Args:
        data: Request carrying ``test_email``, the recipient of the test.
        session: Async database session.

    Returns:
        An ``SMTPTestResponse``. ``success`` is ``False`` with an explanatory
        message when the settings are incomplete; otherwise it reflects the
        dictionary returned by ``EmailService.send_test_email``.
    """
    from fastapi.concurrency import run_in_threadpool

    # Load every "email.*" setting.
    query = select(AppConfig).where(AppConfig.key.like("email.%"))
    rows = (await session.execute(query)).scalars().all()
    config_dict = {row.key: row.value for row in rows}

    email_service = EmailService.from_database_config(config_dict)

    if not email_service.is_configured():
        return SMTPTestResponse(
            success=False,
            message="Configuration SMTP incomplète. Vérifiez les paramètres."
        )

    # send_test_email is a synchronous call; run it in the threadpool so the
    # event loop keeps serving other requests while the email is being sent.
    outcome = await run_in_threadpool(
        email_service.send_test_email, data.test_email
    )

    return SMTPTestResponse(
        success=outcome.get("success", False),
        message=outcome.get("message", outcome.get("error", "Erreur inconnue"))
    )
|
|
|
|
|
|
# === Configuration dégradé des notes ===
|
|
|
|
@router.get("/notes-gradient", response_model=NotesGradientRead)
async def get_notes_gradient(session: AsyncSessionDep):
    """
    Return the color-gradient settings used for grades.

    Values come from the AppConfig rows whose key starts with
    "grading.notes_gradient."; missing keys fall back to defaults.
    """
    prefix = "grading.notes_gradient."
    lookup = select(AppConfig).where(AppConfig.key.like("grading.notes_gradient.%"))
    rows = (await session.execute(lookup)).scalars().all()
    stored = {row.key: row.value for row in rows}

    enabled_flag = str(stored.get(prefix + "enabled", "false")).lower()

    return NotesGradientRead(
        min_color=stored.get(prefix + "min_color", "#dc2626"),
        max_color=stored.get(prefix + "max_color", "#059669"),
        enabled=enabled_flag == "true",
    )
|
|
|
|
|
|
@router.put("/notes-gradient", response_model=NotesGradientRead)
async def update_notes_gradient(
    data: NotesGradientUpdate,
    session: AsyncSessionDep,
):
    """
    Update the color-gradient settings used for grades.

    Only the fields of ``data`` that are not ``None`` are written to their
    "grading.notes_gradient.*" AppConfig row (created when missing);
    ``enabled`` is stored as "true"/"false". Returns the settings as produced
    by ``get_notes_gradient``.
    """
    pending = {}
    if data.min_color is not None:
        pending["grading.notes_gradient.min_color"] = data.min_color
    if data.max_color is not None:
        pending["grading.notes_gradient.max_color"] = data.max_color
    if data.enabled is not None:
        pending["grading.notes_gradient.enabled"] = "true" if data.enabled else "false"

    for key, value in pending.items():
        lookup = select(AppConfig).where(AppConfig.key == key)
        existing = (await session.execute(lookup)).scalar_one_or_none()
        if existing:
            existing.value = value
        else:
            session.add(AppConfig(key=key, value=value))

    await session.commit()

    return await get_notes_gradient(session)
|
|
|
|
|
|
# === Configuration de la base de données ===
|
|
|
|
def _format_size(size_bytes: int) -> str:
|
|
"""Formate une taille en bytes en format lisible."""
|
|
size = float(size_bytes)
|
|
for unit in ['B', 'KB', 'MB', 'GB']:
|
|
if size < 1024:
|
|
return f"{size:.1f} {unit}"
|
|
size /= 1024
|
|
return f"{size:.1f} TB"
|
|
|
|
|
|
@router.get("/database", response_model=DatabaseConfigRead)
async def get_database_config():
    """
    Report the current database location.

    Returns the configured path, the resolved path, whether the file exists
    and, when it does, its size in bytes and in human-readable form.
    """
    from core.config import settings

    db_file = settings.database_path_resolved
    is_present = db_file.exists()

    if is_present:
        byte_count = db_file.stat().st_size
        readable = _format_size(byte_count)
    else:
        # No file: size information is left unset.
        byte_count = None
        readable = None

    return DatabaseConfigRead(
        path=settings.database_path_configured,
        resolved_path=str(db_file),
        exists=is_present,
        size_bytes=byte_count,
        size_human=readable,
    )
|
|
|
|
|
|
def _upsert_env_variable(env_content: str, key: str, value: str) -> str:
    """Return *env_content* with ``key=value`` set, replacing existing assignments or appending one."""
    import re

    pattern = re.compile(rf"^{re.escape(key)}=.*$", flags=re.MULTILINE)
    # A callable replacement keeps *value* literal: with a string template,
    # backslashes (e.g. Windows paths) would be read as regex escapes.
    updated, replaced = pattern.subn(lambda _match: f"{key}={value}", env_content)
    if replaced:
        return updated

    # No line starts with "key=" (a commented or prefixed occurrence does not
    # count): append a fresh assignment on its own line.
    if env_content and not env_content.endswith("\n"):
        env_content += "\n"
    return f"{env_content}{key}={value}\n"


@router.put("/database", response_model=DatabaseConfigResponse)
async def update_database_config(
    data: DatabaseConfigUpdate,
):
    """
    Update the database path.

    IMPORTANT: this only rewrites the ``DATABASE_PATH`` entry of the ``.env``
    file; the application must be restarted for the change to take effect.

    The path may be:
    - relative to the backend directory (e.g. "data/school.db")
    - absolute (e.g. "/var/data/notytex/school.db")

    Args:
        data: Request carrying the new ``path``.

    Returns:
        A ``DatabaseConfigResponse`` with ``requires_restart=True`` and the
        new path.

    Raises:
        HTTPException: 400 when the path contains a line break or NUL
            character, or when the parent directory cannot be created;
            500 when the ``.env`` file cannot be written.
    """
    from pathlib import Path

    raw_path = str(data.path)

    # The value is written on a single .env line: a line break would let the
    # caller inject additional variables into the file.
    if any(ch in raw_path for ch in "\r\n\0"):
        raise HTTPException(
            status_code=400,
            detail="Le chemin ne doit pas contenir de saut de ligne ni de caractère nul"
        )

    # Directory three levels above this file; relative paths are resolved
    # against it and the .env file is looked up there.
    backend_dir = Path(__file__).parent.parent.parent

    new_path = Path(raw_path)
    resolved = new_path if new_path.is_absolute() else backend_dir / raw_path

    # Make sure the directory that will hold the database exists.
    parent_dir = resolved.parent
    if not parent_dir.exists():
        try:
            parent_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Impossible de créer le répertoire parent: {str(e)}"
            ) from e

    env_file = backend_dir / ".env"

    # Start from the current content, or from an empty file.
    env_content = env_file.read_text() if env_file.exists() else ""
    env_content = _upsert_env_variable(env_content, "DATABASE_PATH", raw_path)

    try:
        env_file.write_text(env_content)
    except OSError as e:
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lors de l'écriture du fichier .env: {str(e)}"
        ) from e

    return DatabaseConfigResponse(
        success=True,
        message="Chemin de base de données mis à jour. Redémarrez l'application pour appliquer les changements.",
        requires_restart=True,
        new_path=data.path
    )
|