"""
API routes for application configuration (Competences, Domains, Scale).

Also exposes the general application settings, the SMTP settings, the notes
colour gradient and the database path configuration.
"""
import re
from pathlib import Path
from typing import Dict, List, Optional

from fastapi import APIRouter, HTTPException
from fastapi.concurrency import run_in_threadpool
from sqlalchemy import select, func, delete

from api.dependencies import AsyncSessionDep
from api.helpers import ensure_unique_name, upsert_app_configs
from infrastructure.database.models import (
    AppConfig,
    Competence,
    Domain,
    CompetenceScaleValue,
)
from schemas.config import (
    CompetenceRead,
    CompetenceList,
    CompetenceCreate,
    CompetenceUpdate,
    DomainRead,
    DomainList,
    DomainCreate,
    DomainUpdate,
    ScaleValueRead,
    ScaleRead,
    ScaleValueCreate,
    ScaleValueUpdate,
    AppConfigRead,
    AppConfigUpdate,
    FullConfigRead,
    ConfigResponse,
    SMTPConfigRead,
    SMTPConfigUpdate,
    SMTPTestRequest,
    SMTPTestResponse,
    NotesGradientRead,
    NotesGradientUpdate,
    ScaleBatchUpdate,
    DatabaseConfigRead,
    DatabaseConfigUpdate,
    DatabaseConfigResponse,
)
from infrastructure.external.email_service import EmailService

router = APIRouter(prefix="/config", tags=["Configuration"])

# Default competence scale, as (value, label, color, included_in_total).
_DEFAULT_SCALE_VALUES = (
    ("0", "Non acquis", "#dc2626", True),
    ("1", "En cours d'acquisition", "#ea580c", True),
    ("2", "Acquis", "#059669", True),
    ("3", "Expert", "#2563eb", True),
    (".", "Non évalué", "#6b7280", True),
)

# Scale values that can never be deleted: exactly the default ones, so the
# protected set and the reset defaults cannot drift apart.
_BASE_SCALE_VALUES = frozenset(value for value, *_ in _DEFAULT_SCALE_VALUES)

# Escape character used for user-supplied LIKE/ILIKE search terms.
_LIKE_ESCAPE = "\\"

# Port reported when no usable SMTP port is stored.
_DEFAULT_SMTP_PORT = 587


# === Private helpers ===


def _escape_like(term: str, escape_char: str = _LIKE_ESCAPE) -> str:
    """Escape LIKE wildcards in *term* so that it is matched literally."""
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", f"{escape_char}%")
        .replace("_", f"{escape_char}_")
    )


def _parse_port(raw, default: int = _DEFAULT_SMTP_PORT) -> int:
    """Convert a stored port value to ``int``, returning *default* if invalid."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


def _is_true(raw) -> bool:
    """Return True when the stored flag spells "true" (case-insensitive)."""
    return str(raw).lower() == "true"


def _competence_read(competence: Competence) -> CompetenceRead:
    """Build the API representation of a competence row."""
    return CompetenceRead(
        id=competence.id,
        name=competence.name,
        color=competence.color,
        icon=competence.icon,
        order_index=competence.order_index,
    )


def _domain_read(domain: Domain) -> DomainRead:
    """Build the API representation of a domain row."""
    return DomainRead(
        id=domain.id,
        name=domain.name,
        color=domain.color,
        description=domain.description,
    )


def _scale_value_read(scale_value: CompetenceScaleValue) -> ScaleValueRead:
    """Build the API representation of a scale value row."""
    return ScaleValueRead(
        value=scale_value.value,
        label=scale_value.label,
        color=scale_value.color,
        included_in_total=scale_value.included_in_total,
    )


def _app_config_read(config_dict: Dict[str, str]) -> AppConfigRead:
    """Build the general settings payload, applying defaults for missing keys."""
    return AppConfigRead(
        school_year=config_dict.get("school_year", "2024-2025"),
        school_name=config_dict.get("school_name", ""),
        default_grading_system=config_dict.get("default_grading_system", "notes"),
    )


async def _load_config_dict(
    session: AsyncSessionDep,
    like_pattern: Optional[str] = None,
) -> Dict[str, str]:
    """Load ``AppConfig`` rows as a ``{key: value}`` dict.

    When *like_pattern* is given, only rows whose key matches that SQL LIKE
    pattern are loaded.
    """
    query = select(AppConfig)
    if like_pattern is not None:
        query = query.where(AppConfig.key.like(like_pattern))
    rows = (await session.execute(query)).scalars().all()
    return {row.key: row.value for row in rows}


async def _fetch_competences(session: AsyncSessionDep) -> List[CompetenceRead]:
    """Return all competences ordered by ``order_index``."""
    query = select(Competence).order_by(Competence.order_index)
    rows = (await session.execute(query)).scalars().all()
    return [_competence_read(row) for row in rows]


async def _fetch_domains(session: AsyncSessionDep) -> List[DomainRead]:
    """Return all domains ordered by name."""
    query = select(Domain).order_by(Domain.name)
    rows = (await session.execute(query)).scalars().all()
    return [_domain_read(row) for row in rows]


async def _fetch_scale(session: AsyncSessionDep) -> ScaleRead:
    """Return the whole competence scale ordered by value."""
    query = select(CompetenceScaleValue).order_by(CompetenceScaleValue.value)
    rows = (await session.execute(query)).scalars().all()
    return ScaleRead(values=[_scale_value_read(row) for row in rows])


async def _get_or_404(session: AsyncSessionDep, query, detail: str):
    """Return the single row selected by *query* or raise a 404 with *detail*."""
    entity = (await session.execute(query)).scalar_one_or_none()
    if entity is None:
        raise HTTPException(status_code=404, detail=detail)
    return entity


async def _ensure_scale_value_free(
    session: AsyncSessionDep,
    value: str,
    detail: str,
) -> None:
    """Raise a 400 with *detail* when a scale value *value* already exists."""
    query = select(CompetenceScaleValue).where(CompetenceScaleValue.value == value)
    if (await session.execute(query)).scalar_one_or_none() is not None:
        raise HTTPException(status_code=400, detail=detail)


def _set_env_variable(content: str, key: str, value: str) -> str:
    """Return *content* (a ``.env`` file body) with ``key=value`` set.

    Every line starting with ``key=`` is replaced; when there is none, the
    assignment is appended on its own line.
    """
    assignment = f"{key}={value}"
    pattern = re.compile(rf"^{re.escape(key)}=.*$", re.MULTILINE)
    # A callable replacement keeps backslashes in *value* (Windows paths)
    # from being interpreted as regex escapes or group references.
    updated, replaced = pattern.subn(lambda _match: assignment, content)
    if replaced:
        return updated
    if content and not content.endswith("\n"):
        content += "\n"
    return f"{content}{assignment}\n"


# === Read routes ===


@router.get("/competences", response_model=CompetenceList)
async def get_competences(session: AsyncSessionDep):
    """
    Return the list of all competences, ordered by ``order_index``.
    """
    return CompetenceList(competences=await _fetch_competences(session))


@router.get("/domains", response_model=DomainList)
async def get_domains(session: AsyncSessionDep):
    """
    Return the list of all domains, ordered by name.
    """
    return DomainList(domains=await _fetch_domains(session))


@router.get("/domains/search", response_model=DomainList)
async def search_domains(
    q: str,
    session: AsyncSessionDep,
):
    """
    Search domains by name (autocompletion).

    Performs a case-insensitive substring match on the domain name and
    returns at most 10 results ordered by name. ``%`` and ``_`` in *q* are
    matched literally rather than as wildcards.
    """
    pattern = f"%{_escape_like(q)}%"
    query = (
        select(Domain)
        .where(Domain.name.ilike(pattern, escape=_LIKE_ESCAPE))
        .order_by(Domain.name)
        .limit(10)
    )
    rows = (await session.execute(query)).scalars().all()
    return DomainList(domains=[_domain_read(row) for row in rows])


@router.get("/scale", response_model=ScaleRead)
async def get_scale(session: AsyncSessionDep):
    """
    Return the competence scale (values 0, 1, 2, 3, ., d, etc.).
    """
    return await _fetch_scale(session)


@router.get("", response_model=FullConfigRead)
async def get_full_config(session: AsyncSessionDep):
    """
    Return the complete application configuration.

    Combines the general settings, the competences, the domains and the
    competence scale in a single payload.
    """
    return FullConfigRead(
        app_config=_app_config_read(await _load_config_dict(session)),
        competences=await _fetch_competences(session),
        domains=await _fetch_domains(session),
        scale=await _fetch_scale(session),
    )


# === Competences CRUD ===


@router.post("/competences", response_model=CompetenceRead, status_code=201)
async def create_competence(
    data: CompetenceCreate,
    session: AsyncSessionDep,
):
    """
    Create a new competence.

    The name is checked for uniqueness through ``ensure_unique_name``. When
    no ``order_index`` is supplied, the competence is placed after the
    current maximum.
    """
    await ensure_unique_name(session, Competence, data.name, entity_label="compétence")

    if data.order_index is None:
        max_order_result = await session.execute(select(func.max(Competence.order_index)))
        # ``scalar()`` is None on an empty table; start numbering at 1.
        order_index = (max_order_result.scalar() or 0) + 1
    else:
        order_index = data.order_index

    competence = Competence(
        name=data.name,
        color=data.color,
        icon=data.icon,
        order_index=order_index,
    )
    session.add(competence)
    await session.commit()
    await session.refresh(competence)

    return _competence_read(competence)


@router.put("/competences/{competence_id}", response_model=CompetenceRead)
async def update_competence(
    competence_id: int,
    data: CompetenceUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing competence.

    Only the fields that are not ``None`` in *data* are applied.

    Raises:
        HTTPException: 404 when the competence does not exist.
    """
    competence = await _get_or_404(
        session,
        select(Competence).where(Competence.id == competence_id),
        "Compétence non trouvée",
    )

    # Only check uniqueness when the name actually changes.
    if data.name and data.name != competence.name:
        await ensure_unique_name(
            session,
            Competence,
            data.name,
            exclude_id=competence_id,
            entity_label="compétence",
        )

    if data.name is not None:
        competence.name = data.name
    if data.color is not None:
        competence.color = data.color
    if data.icon is not None:
        competence.icon = data.icon
    if data.order_index is not None:
        competence.order_index = data.order_index

    await session.commit()
    await session.refresh(competence)

    return _competence_read(competence)


@router.delete("/competences/{competence_id}", response_model=ConfigResponse)
async def delete_competence(
    competence_id: int,
    session: AsyncSessionDep,
):
    """
    Delete a competence.

    Raises:
        HTTPException: 404 when the competence does not exist.
    """
    competence = await _get_or_404(
        session,
        select(Competence).where(Competence.id == competence_id),
        "Compétence non trouvée",
    )

    # Capture the name before the row is deleted.
    name = competence.name
    await session.delete(competence)
    await session.commit()

    return ConfigResponse(
        success=True,
        message=f"Compétence '{name}' supprimée",
    )


# === Domains CRUD ===


@router.post("/domains", response_model=DomainRead, status_code=201)
async def create_domain(
    data: DomainCreate,
    session: AsyncSessionDep,
):
    """
    Create a new domain.

    The name is checked for uniqueness through ``ensure_unique_name``.
    """
    await ensure_unique_name(session, Domain, data.name, entity_label="domaine")

    domain = Domain(
        name=data.name,
        color=data.color,
        description=data.description,
    )
    session.add(domain)
    await session.commit()
    await session.refresh(domain)

    return _domain_read(domain)


@router.put("/domains/{domain_id}", response_model=DomainRead)
async def update_domain(
    domain_id: int,
    data: DomainUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing domain.

    Only the fields that are not ``None`` in *data* are applied.

    Raises:
        HTTPException: 404 when the domain does not exist.
    """
    domain = await _get_or_404(
        session,
        select(Domain).where(Domain.id == domain_id),
        "Domaine non trouvé",
    )

    # Only check uniqueness when the name actually changes.
    if data.name and data.name != domain.name:
        await ensure_unique_name(
            session,
            Domain,
            data.name,
            exclude_id=domain_id,
            entity_label="domaine",
        )

    if data.name is not None:
        domain.name = data.name
    if data.color is not None:
        domain.color = data.color
    if data.description is not None:
        domain.description = data.description

    await session.commit()
    await session.refresh(domain)

    return _domain_read(domain)


@router.delete("/domains/{domain_id}", response_model=ConfigResponse)
async def delete_domain(
    domain_id: int,
    session: AsyncSessionDep,
):
    """
    Delete a domain.

    Raises:
        HTTPException: 404 when the domain does not exist.
    """
    domain = await _get_or_404(
        session,
        select(Domain).where(Domain.id == domain_id),
        "Domaine non trouvé",
    )

    # Capture the name before the row is deleted.
    name = domain.name
    await session.delete(domain)
    await session.commit()

    return ConfigResponse(
        success=True,
        message=f"Domaine '{name}' supprimé",
    )


# === Grading scale CRUD ===


@router.post("/scale", response_model=ScaleValueRead, status_code=201)
async def create_scale_value(
    data: ScaleValueCreate,
    session: AsyncSessionDep,
):
    """
    Create a new scale value.

    Raises:
        HTTPException: 400 when a scale value with the same ``value`` exists.
    """
    await _ensure_scale_value_free(
        session,
        data.value,
        f"Une valeur d'échelle '{data.value}' existe déjà",
    )

    scale_value = CompetenceScaleValue(
        value=data.value,
        label=data.label,
        color=data.color,
        included_in_total=data.included_in_total,
    )
    session.add(scale_value)
    await session.commit()
    await session.refresh(scale_value)

    return _scale_value_read(scale_value)


# The batch and reset routes must be declared BEFORE the parameterised
# "/scale/{scale_value}" routes, otherwise "batch" would be captured as a
# scale value by the PUT route.
@router.put("/scale/batch", response_model=ScaleRead)
async def update_scale_batch(
    data: ScaleBatchUpdate,
    session: AsyncSessionDep,
):
    """
    Update several scale values in a single request.

    Each item updates the existing scale value with the same ``value``, or
    creates it when it does not exist yet. Everything is committed at once
    and the resulting scale is returned.
    """
    for item in data.values:
        query = select(CompetenceScaleValue).where(
            CompetenceScaleValue.value == item.value
        )
        existing = (await session.execute(query)).scalar_one_or_none()

        if existing is not None:
            existing.label = item.label
            existing.color = item.color
            existing.included_in_total = item.included_in_total
        else:
            session.add(
                CompetenceScaleValue(
                    value=item.value,
                    label=item.label,
                    color=item.color,
                    included_in_total=item.included_in_total,
                )
            )

    await session.commit()

    return await get_scale(session)


@router.post("/scale/reset", response_model=ScaleRead)
async def reset_scale(session: AsyncSessionDep):
    """
    Reset the scale to its default values.

    Deletes every existing scale value, then recreates the defaults.
    """
    await session.execute(delete(CompetenceScaleValue))

    session.add_all(
        [
            CompetenceScaleValue(
                value=value,
                label=label,
                color=color,
                included_in_total=included,
            )
            for value, label, color, included in _DEFAULT_SCALE_VALUES
        ]
    )

    await session.commit()

    return await get_scale(session)


@router.put("/scale/{scale_value}", response_model=ScaleValueRead)
async def update_scale_value(
    scale_value: str,
    data: ScaleValueUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing scale value.

    Only the fields that are not ``None`` in *data* are applied.

    Raises:
        HTTPException: 404 when the scale value does not exist; 400 when it
            is renamed to a value that is already used.
    """
    row = await _get_or_404(
        session,
        select(CompetenceScaleValue).where(CompetenceScaleValue.value == scale_value),
        "Valeur d'échelle non trouvée",
    )

    # Only check uniqueness when the value actually changes.
    if data.value and data.value != row.value:
        await _ensure_scale_value_free(
            session,
            data.value,
            f"Une autre valeur d'échelle '{data.value}' existe déjà",
        )

    if data.value is not None:
        row.value = data.value
    if data.label is not None:
        row.label = data.label
    if data.color is not None:
        row.color = data.color
    if data.included_in_total is not None:
        row.included_in_total = data.included_in_total

    await session.commit()
    await session.refresh(row)

    return _scale_value_read(row)


@router.delete("/scale/{scale_value}", response_model=ConfigResponse)
async def delete_scale_value(
    scale_value: str,
    session: AsyncSessionDep,
):
    """
    Delete a scale value.

    Note: the base values (0, 1, 2, 3, .) cannot be deleted.

    Raises:
        HTTPException: 400 for a base value; 404 when the value does not
            exist.
    """
    if scale_value in _BASE_SCALE_VALUES:
        raise HTTPException(
            status_code=400,
            detail=f"La valeur '{scale_value}' est une valeur de base et ne peut pas être supprimée",
        )

    row = await _get_or_404(
        session,
        select(CompetenceScaleValue).where(CompetenceScaleValue.value == scale_value),
        "Valeur d'échelle non trouvée",
    )

    # Capture the value before the row is deleted.
    value = row.value
    await session.delete(row)
    await session.commit()

    return ConfigResponse(
        success=True,
        message=f"Valeur d'échelle '{value}' supprimée",
    )


# === General configuration ===


@router.put("", response_model=AppConfigRead)
async def update_app_config(
    data: AppConfigUpdate,
    session: AsyncSessionDep,
):
    """
    Update the general application configuration.

    Only the fields that are not ``None`` in *data* are written; the
    resulting configuration is read back and returned.
    """
    updates = {}
    if data.school_year is not None:
        updates["school_year"] = data.school_year
    if data.school_name is not None:
        updates["school_name"] = data.school_name
    if data.default_grading_system is not None:
        updates["default_grading_system"] = data.default_grading_system

    await upsert_app_configs(session, updates)
    await session.commit()

    return _app_config_read(await _load_config_dict(session))


# === SMTP e-mail configuration ===


@router.get("/smtp", response_model=SMTPConfigRead)
async def get_smtp_config(session: AsyncSessionDep):
    """
    Return the SMTP configuration used to send e-mails.

    Note: the password is never returned, for security reasons. A missing or
    malformed stored port is reported as 587 instead of failing the request.
    """
    config_dict = await _load_config_dict(session, "email.%")

    # The service is only built to report whether the setup is complete.
    email_service = EmailService.from_database_config(config_dict)

    return SMTPConfigRead(
        host=config_dict.get("email.smtp_host", ""),
        port=_parse_port(config_dict.get("email.smtp_port", _DEFAULT_SMTP_PORT)),
        username=config_dict.get("email.username", ""),
        use_tls=_is_true(config_dict.get("email.use_tls", "true")),
        from_name=config_dict.get("email.from_name", "Notytex"),
        from_address=config_dict.get("email.from_address", ""),
        is_configured=email_service.is_configured(),
    )


@router.put("/smtp", response_model=SMTPConfigRead)
async def update_smtp_config(
    data: SMTPConfigUpdate,
    session: AsyncSessionDep,
):
    """
    Update the SMTP configuration.

    All fields are optional - only the ones provided are updated. Values are
    stored as strings (port as digits, ``use_tls`` as "true"/"false").
    """
    updates = {}
    if data.host is not None:
        updates["email.smtp_host"] = data.host
    if data.port is not None:
        updates["email.smtp_port"] = str(data.port)
    if data.username is not None:
        updates["email.username"] = data.username
    if data.password is not None:
        updates["email.password"] = data.password
    if data.use_tls is not None:
        updates["email.use_tls"] = "true" if data.use_tls else "false"
    if data.from_name is not None:
        updates["email.from_name"] = data.from_name
    if data.from_address is not None:
        updates["email.from_address"] = data.from_address

    await upsert_app_configs(session, updates)
    await session.commit()

    return await get_smtp_config(session)


@router.post("/smtp/test", response_model=SMTPTestResponse)
async def test_smtp_config(
    data: SMTPTestRequest,
    session: AsyncSessionDep,
):
    """
    Test the SMTP configuration by sending a test e-mail.

    Returns an unsuccessful response (not an HTTP error) when the stored
    configuration is incomplete or when sending fails.
    """
    config_dict = await _load_config_dict(session, "email.%")
    email_service = EmailService.from_database_config(config_dict)

    if not email_service.is_configured():
        return SMTPTestResponse(
            success=False,
            message="Configuration SMTP incomplète. Vérifiez les paramètres.",
        )

    # ``send_test_email`` is a synchronous call (presumably blocking SMTP
    # I/O); run it in the threadpool so it cannot stall the event loop.
    outcome = await run_in_threadpool(email_service.send_test_email, data.test_email)

    return SMTPTestResponse(
        success=outcome.get("success", False),
        message=outcome.get("message", outcome.get("error", "Erreur inconnue")),
    )


# === Notes gradient configuration ===


@router.get("/notes-gradient", response_model=NotesGradientRead)
async def get_notes_gradient(session: AsyncSessionDep):
    """
    Return the colour gradient configuration used for notes.
    """
    config_dict = await _load_config_dict(session, "grading.notes_gradient.%")

    return NotesGradientRead(
        min_color=config_dict.get("grading.notes_gradient.min_color", "#dc2626"),
        max_color=config_dict.get("grading.notes_gradient.max_color", "#059669"),
        enabled=_is_true(config_dict.get("grading.notes_gradient.enabled", "false")),
    )


@router.put("/notes-gradient", response_model=NotesGradientRead)
async def update_notes_gradient(
    data: NotesGradientUpdate,
    session: AsyncSessionDep,
):
    """
    Update the colour gradient configuration used for notes.

    Only the fields that are not ``None`` in *data* are written.
    """
    updates = {}
    if data.min_color is not None:
        updates["grading.notes_gradient.min_color"] = data.min_color
    if data.max_color is not None:
        updates["grading.notes_gradient.max_color"] = data.max_color
    if data.enabled is not None:
        updates["grading.notes_gradient.enabled"] = "true" if data.enabled else "false"

    await upsert_app_configs(session, updates)
    await session.commit()

    return await get_notes_gradient(session)


# === Database configuration ===


def _format_size(size_bytes: int) -> str:
    """Format a size in bytes as a human-readable string (B, KB, MB, GB, TB)."""
    size = float(size_bytes)
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"


@router.get("/database", response_model=DatabaseConfigRead)
async def get_database_config():
    """
    Return the current database configuration.

    Returns the configured path, the resolved path, and information about
    the file (existence, size).
    """
    from core.config import settings

    resolved_path = settings.database_path_resolved
    exists = resolved_path.exists()

    size_bytes = None
    size_human = None
    if exists:
        size_bytes = resolved_path.stat().st_size
        size_human = _format_size(size_bytes)

    return DatabaseConfigRead(
        path=settings.database_path_configured,
        resolved_path=str(resolved_path),
        exists=exists,
        size_bytes=size_bytes,
        size_human=size_human,
    )


@router.put("/database", response_model=DatabaseConfigResponse)
async def update_database_config(
    data: DatabaseConfigUpdate,
):
    """
    Update the database path.

    IMPORTANT: this change rewrites the ``DATABASE_PATH`` entry of the .env
    file and requires an application restart to take effect.

    The path can be:
    - Relative to the backend directory (e.g. "data/school.db")
    - Absolute (e.g. "/var/data/notytex/school.db")

    Raises:
        HTTPException: 400 when the path is blank, contains a line break or
            NUL character, or when its parent directory cannot be created;
            500 when the .env file cannot be read or written.
    """
    raw_path = str(data.path)

    # A line break would let the caller inject extra variables into .env.
    if not raw_path.strip() or any(char in raw_path for char in "\n\r\0"):
        raise HTTPException(
            status_code=400,
            detail="Chemin de base de données invalide",
        )

    backend_dir = Path(__file__).parent.parent.parent
    candidate = Path(raw_path)
    resolved = candidate if candidate.is_absolute() else backend_dir / candidate

    # Make sure the parent directory exists, creating it when needed.
    parent_dir = resolved.parent
    if not parent_dir.exists():
        try:
            parent_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Impossible de créer le répertoire parent: {str(e)}",
            ) from e

    env_file = backend_dir / ".env"

    # Read the current content, or start from an empty file.
    env_content = ""
    if env_file.exists():
        try:
            env_content = env_file.read_text()
        except (OSError, UnicodeError) as e:
            raise HTTPException(
                status_code=500,
                detail=f"Erreur lors de la lecture du fichier .env: {str(e)}",
            ) from e

    env_content = _set_env_variable(env_content, "DATABASE_PATH", raw_path)

    try:
        env_file.write_text(env_content)
    except (OSError, UnicodeError) as e:
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lors de l'écriture du fichier .env: {str(e)}",
        ) from e

    return DatabaseConfigResponse(
        success=True,
        message="Chemin de base de données mis à jour. Redémarrez l'application pour appliquer les changements.",
        requires_restart=True,
        new_path=data.path,
    )