""" Routes API pour les évaluations (Assessment). """ from typing import Optional, List from datetime import date from fastapi import APIRouter, HTTPException, Query from fastapi.responses import HTMLResponse from sqlalchemy import select, func from sqlalchemy.orm import selectinload from api.dependencies import AsyncSessionDep from api.helpers import ( count_eligible_students, eligible_student_ids_subquery, get_eligible_enrollments, build_heatmap, ) from infrastructure.database.models import ( Assessment, Exercise, GradingElement, Grade, ClassGroup, Student, StudentEnrollment, ) from schemas.assessment import ( AssessmentRead, AssessmentWithProgress, AssessmentDetail, AssessmentList, ExerciseRead, GradingElementRead, AssessmentResults, AssessmentStatistics, StudentScore, AssessmentCreate, AssessmentUpdate, AssessmentResponse, SendReportsRequest, SendReportResult, SendReportsResponse, ) from schemas.grading import BulkGradeCreate, BulkGradeResponse, GradeRead from domain.services import GradingCalculator, StatisticsService, StudentReportService, generate_report_html, ConfigService from infrastructure.external.email_service import EmailService, SMTPConfig from infrastructure.database.models import AppConfig router = APIRouter(prefix="/assessments", tags=["Assessments"]) def calculate_grading_progress( assessment: Assessment, grades_count: int, total_elements: int, eligible_students_count: int ) -> dict: """Calcule la progression de correction d'une évaluation.""" total = total_elements * eligible_students_count completed = grades_count if total == 0: percentage = 0 status = "not_started" else: percentage = round((completed / total) * 100) if percentage == 0: status = "not_started" elif percentage == 100: status = "completed" else: status = "in_progress" return { "percentage": percentage, "completed": completed, "total": total, "status": status, "students_count": eligible_students_count } @router.get("", response_model=AssessmentList) async def get_assessments( session: AsyncSessionDep, class_id: Optional[int] = Query(None, description="Filtrer par classe"), trimester: Optional[int] = Query(None, ge=1, le=3, description="Filtrer par trimestre"), sort: Optional[str] = Query("date_desc", description="Tri: date_desc, date_asc, title"), ): """ Récupère la liste de toutes les évaluations. """ query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements) ) ) if class_id: query = query.where(Assessment.class_group_id == class_id) if trimester: query = query.where(Assessment.trimester == trimester) # Tri if sort == "date_asc": query = query.order_by(Assessment.date.asc()) elif sort == "title": query = query.order_by(Assessment.title) else: # date_desc par défaut query = query.order_by(Assessment.date.desc()) result = await session.execute(query) assessments = result.scalars().all() assessments_list = [] for assessment in assessments: # Calculer le total des points total_max_points = 0 total_elements = 0 for exercise in assessment.exercises: for element in exercise.grading_elements: total_max_points += element.max_points total_elements += 1 # Compter les élèves éligibles (inscrits à la date de l'évaluation) eligible_students_count = await count_eligible_students( session, assessment.class_group_id, assessment.date ) # Compter les notes saisies uniquement pour les élèves éligibles eligible_student_ids = eligible_student_ids_subquery( assessment.class_group_id, assessment.date ) grades_query = select(func.count(Grade.id)).where( Grade.grading_element_id.in_( select(GradingElement.id) .join(Exercise) .where(Exercise.assessment_id == assessment.id) ), Grade.student_id.in_(eligible_student_ids), Grade.value.isnot(None) ) grades_result = await session.execute(grades_query) grades_count = grades_result.scalar() or 0 progress = calculate_grading_progress( assessment, grades_count, total_elements, eligible_students_count ) assessments_list.append( AssessmentWithProgress( id=assessment.id, title=assessment.title, description=assessment.description, date=assessment.date, trimester=assessment.trimester, coefficient=assessment.coefficient, class_group_id=assessment.class_group_id, class_name=assessment.class_group.name if assessment.class_group else None, grading_progress=progress, total_max_points=total_max_points ) ) return AssessmentList( assessments=assessments_list, total=len(assessments_list) ) @router.get("/{assessment_id}", response_model=AssessmentDetail) async def get_assessment( assessment_id: int, session: AsyncSessionDep, ): """ Récupère les détails d'une évaluation avec ses exercices et éléments de notation. """ query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises) .selectinload(Exercise.grading_elements) .selectinload(GradingElement.domain) ) .where(Assessment.id == assessment_id) ) result = await session.execute(query) assessment = result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Construire la liste des exercices exercises_list = [] total_max_points = 0 total_elements = 0 for exercise in sorted(assessment.exercises, key=lambda e: e.order): elements_list = [] for element in exercise.grading_elements: total_max_points += element.max_points total_elements += 1 elements_list.append( GradingElementRead( id=element.id, exercise_id=element.exercise_id, label=element.label, description=element.description, skill=element.skill, max_points=element.max_points, grading_type=element.grading_type, domain_id=element.domain_id, domain_name=element.domain.name if element.domain else None, domain_color=element.domain.color if element.domain else None ) ) exercises_list.append( ExerciseRead( id=exercise.id, assessment_id=exercise.assessment_id, title=exercise.title, description=exercise.description, order=exercise.order, grading_elements=elements_list ) ) # Calculer la progression eligible_students_count = await count_eligible_students( session, assessment.class_group_id, assessment.date ) # Compter les notes uniquement pour les élèves éligibles eligible_student_ids = eligible_student_ids_subquery( assessment.class_group_id, assessment.date ) grades_query = select(func.count(Grade.id)).where( Grade.grading_element_id.in_( select(GradingElement.id) .join(Exercise) .where(Exercise.assessment_id == assessment.id) ), Grade.student_id.in_(eligible_student_ids), Grade.value.isnot(None) ) grades_result = await session.execute(grades_query) grades_count = grades_result.scalar() or 0 progress = calculate_grading_progress( assessment, grades_count, total_elements, eligible_students_count ) return AssessmentDetail( id=assessment.id, title=assessment.title, description=assessment.description, date=assessment.date, trimester=assessment.trimester, coefficient=assessment.coefficient, class_group_id=assessment.class_group_id, class_name=assessment.class_group.name if assessment.class_group else None, grading_progress=progress, total_max_points=total_max_points, exercises=exercises_list ) @router.get("/{assessment_id}/results", response_model=AssessmentResults) async def get_assessment_results( assessment_id: int, session: AsyncSessionDep, ): """ Récupère les résultats d'une évaluation avec statistiques et scores par élève. Utilise les services domain pour les calculs de scores et statistiques. """ # Initialiser les services domain grading_calc = GradingCalculator() stats_service = StatisticsService() # Récupérer l'évaluation avec ses éléments et domaines query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises) .selectinload(Exercise.grading_elements) .selectinload(GradingElement.grades), selectinload(Assessment.exercises) .selectinload(Exercise.grading_elements) .selectinload(GradingElement.domain) ) .where(Assessment.id == assessment_id) ) result = await session.execute(query) assessment = result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Récupérer les élèves éligibles enrollments = await get_eligible_enrollments( session, assessment.class_group_id, assessment.date ) # Calculer le total des points maximum total_max_points = 0 for exercise in assessment.exercises: for element in exercise.grading_elements: total_max_points += element.max_points # Calculer les scores par élève en utilisant GradingCalculator student_scores = {} for enrollment in enrollments: student = enrollment.student student_id = student.id total_score = 0.0 counted_max = 0.0 exercise_scores = {} has_any_grade = False # Tracker si l'élève a au moins une note for exercise in assessment.exercises: ex_score = 0.0 ex_max = 0.0 for element in exercise.grading_elements: # Trouver la note de l'élève pour cet élément grade = None for g in element.grades: if g.student_id == student_id: grade = g break if grade and grade.value: value = grade.value.strip() if value: # La valeur n'est pas vide has_any_grade = True # Utiliser GradingCalculator pour le calcul calculated_score = grading_calc.calculate_score( value, element.grading_type, element.max_points ) # Vérifier si compte dans le total if grading_calc.is_counted_in_total(value): if calculated_score is not None: # Pas dispensé ex_score += calculated_score ex_max += element.max_points exercise_scores[exercise.id] = { "score": round(ex_score, 2), "max": ex_max } total_score += ex_score counted_max += ex_max percentage = round((total_score / counted_max * 100) if counted_max > 0 else 0, 1) student_scores[student_id] = StudentScore( student_id=student_id, student_name=student.full_name_reversed, email=student.email, # Ajouter l'email de l'élève total_score=round(total_score, 2), total_max_points=counted_max, percentage=percentage, exercise_scores=exercise_scores, has_grades=has_any_grade ) # Utiliser StatisticsService pour les calculs statistiques # Inclure uniquement les élèves qui ont des notes scores = [s.total_score for s in student_scores.values() if s.total_max_points > 0 and s.has_grades] stats_result = stats_service.calculate_statistics(scores) stats = AssessmentStatistics( count=stats_result.count, mean=stats_result.mean, median=stats_result.median, min=stats_result.min, max=stats_result.max, std_dev=stats_result.std_dev ) # Utiliser StatisticsService pour l'histogramme histogram_data = stats_service.create_simple_histogram(scores, total_max_points) # Trier les scores par nom sorted_scores = sorted( student_scores.values(), key=lambda s: s.student_name.lower() ) # Calculer les heatmaps par compétence et domaine competences_heatmap = None domains_heatmap = None # Collecter les compétences et domaines uniques all_competences = set() all_domains = {} # name -> color for exercise in assessment.exercises: for element in exercise.grading_elements: if element.skill: all_competences.add(element.skill) if element.domain: all_domains[element.domain.name] = element.domain.color # Calculer heatmap des compétences si présentes competences_heatmap = build_heatmap( enrollments=enrollments, assessment=assessment, items=all_competences, item_extractor=lambda el: el.skill, grading_calc=grading_calc, sorted_student_names=[s.student_name for s in sorted_scores], ) # Calculer heatmap des domaines si présents domains_heatmap = build_heatmap( enrollments=enrollments, assessment=assessment, items=all_domains, item_extractor=lambda el: el.domain.name if el.domain else None, grading_calc=grading_calc, sorted_student_names=[s.student_name for s in sorted_scores], color_map=all_domains, ) return AssessmentResults( assessment_id=assessment.id, assessment_title=assessment.title, statistics=stats, students_scores=sorted_scores, histogram_data=histogram_data, competences_heatmap=competences_heatmap, domains_heatmap=domains_heatmap ) @router.post("", response_model=AssessmentDetail, status_code=201) async def create_assessment( assessment_data: AssessmentCreate, session: AsyncSessionDep, ): """ Crée une évaluation complète avec ses exercices et éléments de notation. C'est une création unifiée qui permet de créer en une seule requête: - L'évaluation - Ses exercices - Les éléments de notation de chaque exercice """ # Vérifier que la classe existe class_query = select(ClassGroup).where(ClassGroup.id == assessment_data.class_group_id) class_result = await session.execute(class_query) class_group = class_result.scalar_one_or_none() if not class_group: raise HTTPException(status_code=404, detail="Classe non trouvée") # Créer l'évaluation assessment = Assessment( title=assessment_data.title, description=assessment_data.description, date=assessment_data.date, trimester=assessment_data.trimester, coefficient=assessment_data.coefficient, class_group_id=assessment_data.class_group_id ) session.add(assessment) await session.flush() # Pour obtenir l'ID # Créer les exercices et leurs éléments exercises_list = [] total_max_points = 0 total_elements = 0 for ex_idx, ex_data in enumerate(assessment_data.exercises): exercise = Exercise( assessment_id=assessment.id, title=ex_data.title, description=ex_data.description, order=ex_data.order if ex_data.order else ex_idx + 1 ) session.add(exercise) await session.flush() elements_list = [] for elem_data in ex_data.grading_elements: element = GradingElement( exercise_id=exercise.id, label=elem_data.label, description=elem_data.description, skill=elem_data.skill, max_points=elem_data.max_points, grading_type=elem_data.grading_type, domain_id=elem_data.domain_id ) session.add(element) total_max_points += elem_data.max_points total_elements += 1 elements_list.append( GradingElementRead( id=0, # Sera mis à jour après flush exercise_id=exercise.id, label=elem_data.label, description=elem_data.description, skill=elem_data.skill, max_points=elem_data.max_points, grading_type=elem_data.grading_type, domain_id=elem_data.domain_id, domain_name=None, domain_color=None ) ) exercises_list.append( ExerciseRead( id=exercise.id, assessment_id=assessment.id, title=exercise.title, description=exercise.description, order=exercise.order, grading_elements=elements_list ) ) await session.commit() # Calculer la progression (0% car nouvelle évaluation) progress = { "percentage": 0, "completed": 0, "total": 0, "status": "not_started", "students_count": 0 } return AssessmentDetail( id=assessment.id, title=assessment.title, description=assessment.description, date=assessment.date, trimester=assessment.trimester, coefficient=assessment.coefficient, class_group_id=assessment.class_group_id, class_name=class_group.name, grading_progress=progress, total_max_points=total_max_points, exercises=exercises_list ) @router.put("/{assessment_id}", response_model=AssessmentDetail) async def update_assessment( assessment_id: int, assessment_data: AssessmentUpdate, session: AsyncSessionDep, ): """ Modifie une évaluation existante. Si des exercices sont fournis, ils remplacent entièrement les exercices existants. """ # Récupérer l'évaluation avec ses exercices query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements) ) .where(Assessment.id == assessment_id) ) result = await session.execute(query) assessment = result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Mettre à jour les champs de base if assessment_data.title is not None: assessment.title = assessment_data.title if assessment_data.description is not None: assessment.description = assessment_data.description if assessment_data.date is not None: # Convertir string en date si nécessaire if isinstance(assessment_data.date, str): from datetime import date as date_type assessment.date = date_type.fromisoformat(assessment_data.date) else: assessment.date = assessment_data.date if assessment_data.trimester is not None: assessment.trimester = assessment_data.trimester if assessment_data.coefficient is not None: assessment.coefficient = assessment_data.coefficient # Si des exercices sont fournis, mise à jour intelligente if assessment_data.exercises is not None: # Collecter les IDs des exercices et éléments à conserver new_exercise_ids = {ex.id for ex in assessment_data.exercises if ex.id} new_element_ids = set() for ex in assessment_data.exercises: for el in ex.grading_elements: if el.id: new_element_ids.add(el.id) # Supprimer les exercices qui ne sont plus dans la liste for exercise in assessment.exercises: if exercise.id not in new_exercise_ids: await session.delete(exercise) else: # Supprimer les éléments qui ne sont plus dans la liste for element in exercise.grading_elements: if element.id not in new_element_ids: await session.delete(element) await session.flush() # Mettre à jour ou créer les exercices for ex_idx, ex_data in enumerate(assessment_data.exercises): if ex_data.id: # Mise à jour d'un exercice existant ex_query = select(Exercise).where(Exercise.id == ex_data.id) ex_result = await session.execute(ex_query) exercise = ex_result.scalar_one_or_none() if exercise: exercise.title = ex_data.title exercise.description = ex_data.description exercise.order = ex_data.order if ex_data.order else ex_idx + 1 else: # Création d'un nouvel exercice exercise = Exercise( assessment_id=assessment.id, title=ex_data.title, description=ex_data.description, order=ex_data.order if ex_data.order else ex_idx + 1 ) session.add(exercise) await session.flush() # Mettre à jour ou créer les éléments de notation for elem_data in ex_data.grading_elements: if elem_data.id: # Mise à jour d'un élément existant (préserve les notes) el_query = select(GradingElement).where(GradingElement.id == elem_data.id) el_result = await session.execute(el_query) element = el_result.scalar_one_or_none() if element: element.label = elem_data.label element.description = elem_data.description element.skill = elem_data.skill element.max_points = elem_data.max_points element.grading_type = elem_data.grading_type element.domain_id = elem_data.domain_id else: # Création d'un nouvel élément element = GradingElement( exercise_id=exercise.id, label=elem_data.label, description=elem_data.description, skill=elem_data.skill, max_points=elem_data.max_points, grading_type=elem_data.grading_type, domain_id=elem_data.domain_id ) session.add(element) await session.commit() # Recharger l'évaluation avec les nouvelles données query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements) ) .where(Assessment.id == assessment_id) ) result = await session.execute(query) assessment = result.scalar_one_or_none() # Vérification de sécurité (ne devrait jamais arriver après commit) if not assessment: raise HTTPException(status_code=500, detail="Erreur lors de la mise à jour") # Construire la réponse exercises_list = [] total_max_points = 0 total_elements = 0 for exercise in sorted(assessment.exercises, key=lambda e: e.order): elements_list = [] for element in exercise.grading_elements: total_max_points += element.max_points total_elements += 1 elements_list.append( GradingElementRead( id=element.id, exercise_id=element.exercise_id, label=element.label, description=element.description, skill=element.skill, max_points=element.max_points, grading_type=element.grading_type, domain_id=element.domain_id, domain_name=None, domain_color=None ) ) exercises_list.append( ExerciseRead( id=exercise.id, assessment_id=exercise.assessment_id, title=exercise.title, description=exercise.description, order=exercise.order, grading_elements=elements_list ) ) # Calculer la progression eligible_students_count = await count_eligible_students( session, assessment.class_group_id, assessment.date ) # Compter les notes uniquement pour les élèves éligibles eligible_student_ids = eligible_student_ids_subquery( assessment.class_group_id, assessment.date ) grades_query = select(func.count(Grade.id)).where( Grade.grading_element_id.in_( select(GradingElement.id) .join(Exercise) .where(Exercise.assessment_id == assessment.id) ), Grade.student_id.in_(eligible_student_ids), Grade.value.isnot(None) ) grades_result = await session.execute(grades_query) grades_count = grades_result.scalar() or 0 progress = calculate_grading_progress( assessment, grades_count, total_elements, eligible_students_count ) return AssessmentDetail( id=assessment.id, title=assessment.title, description=assessment.description, date=assessment.date, trimester=assessment.trimester, coefficient=assessment.coefficient, class_group_id=assessment.class_group_id, class_name=assessment.class_group.name if assessment.class_group else None, grading_progress=progress, total_max_points=total_max_points, exercises=exercises_list ) @router.delete("/{assessment_id}", status_code=204) async def delete_assessment( assessment_id: int, session: AsyncSessionDep, ): """ Supprime une évaluation avec tous ses exercices, éléments et notes. """ # Récupérer l'évaluation query = select(Assessment).where(Assessment.id == assessment_id) result = await session.execute(query) assessment = result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Supprimer l'évaluation (cascade supprimera exercices, éléments et notes) await session.delete(assessment) await session.commit() return None @router.get("/{assessment_id}/grades", response_model=List[GradeRead]) async def get_grades( assessment_id: int, session: AsyncSessionDep, ): """ Récupère toutes les notes d'une évaluation. Retourne la liste des notes avec student_id, element_id et value. """ # Vérifier que l'évaluation existe assessment_query = select(Assessment).where(Assessment.id == assessment_id) assessment_result = await session.execute(assessment_query) assessment = assessment_result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Récupérer toutes les notes pour les éléments de cette évaluation grades_query = ( select(Grade) .join(GradingElement) .join(Exercise) .where(Exercise.assessment_id == assessment_id) ) grades_result = await session.execute(grades_query) grades = grades_result.scalars().all() # Convertir en format attendu par le frontend return [ GradeRead( id=grade.id, student_id=grade.student_id, grading_element_id=grade.grading_element_id, value=grade.value, comment=grade.comment ) for grade in grades ] @router.post("/{assessment_id}/grades", response_model=BulkGradeResponse) async def save_grades( assessment_id: int, grades_data: BulkGradeCreate, session: AsyncSessionDep, ): """ Sauvegarde des notes pour une évaluation. Effectue un upsert : crée la note si elle n'existe pas, la met à jour sinon. """ # Vérifier que l'évaluation existe assessment_query = select(Assessment).where(Assessment.id == assessment_id) assessment_result = await session.execute(assessment_query) assessment = assessment_result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Récupérer les IDs des éléments de notation de cette évaluation elements_query = ( select(GradingElement.id) .join(Exercise) .where(Exercise.assessment_id == assessment_id) ) elements_result = await session.execute(elements_query) valid_element_ids = {row[0] for row in elements_result.fetchall()} created_count = 0 updated_count = 0 for grade_data in grades_data.grades: # Vérifier que l'élément appartient bien à cette évaluation if grade_data.grading_element_id not in valid_element_ids: raise HTTPException( status_code=400, detail=f"L'élément de notation {grade_data.grading_element_id} " f"n'appartient pas à cette évaluation" ) # Chercher une note existante existing_query = select(Grade).where( Grade.student_id == grade_data.student_id, Grade.grading_element_id == grade_data.grading_element_id ) existing_result = await session.execute(existing_query) existing_grade = existing_result.scalar_one_or_none() if existing_grade: # Mettre à jour existing_grade.value = grade_data.value if grade_data.comment is not None: existing_grade.comment = grade_data.comment updated_count += 1 else: # Créer new_grade = Grade( student_id=grade_data.student_id, grading_element_id=grade_data.grading_element_id, value=grade_data.value, comment=grade_data.comment ) session.add(new_grade) created_count += 1 await session.commit() return BulkGradeResponse( saved_count=created_count + updated_count, updated_count=updated_count, created_count=created_count, message=f"{created_count + updated_count} note(s) enregistrée(s) " f"({created_count} créée(s), {updated_count} mise(s) à jour)" ) @router.post("/{assessment_id}/send-reports", response_model=SendReportsResponse) async def send_reports( assessment_id: int, request: SendReportsRequest, session: AsyncSessionDep, ): """ Envoie les bilans d'évaluation par email aux élèves sélectionnés. Cette fonctionnalité nécessite la configuration du service d'email (SMTP). """ # Récupérer la configuration SMTP depuis la base de données config_query = select(AppConfig) config_result = await session.execute(config_query) config_items = config_result.scalars().all() # Convertir en dict db_config = {item.key: item.value for item in config_items} # Créer le service d'email email_service = EmailService.from_database_config(db_config) if not email_service.is_configured(): return SendReportsResponse( success=False, total_sent=0, total_failed=len(request.student_ids), results=[], message="Service d'email non configuré. Configurez SMTP dans les paramètres." ) # Vérifier que l'évaluation existe avec toutes les relations nécessaires assessment_query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.grades), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.domain) ) .where(Assessment.id == assessment_id) ) assessment_result = await session.execute(assessment_query) assessment = assessment_result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Récupérer les élèves demandés students_query = select(Student).where(Student.id.in_(request.student_ids)) students_result = await session.execute(students_query) students = students_result.scalars().all() # Vérifier que tous les élèves existent found_ids = {s.id for s in students} missing_ids = set(request.student_ids) - found_ids if missing_ids: raise HTTPException( status_code=400, detail=f"Élèves non trouvés: {list(missing_ids)}" ) # Préparer les données pour le service de rapport assessment_data = { 'title': assessment.title, 'description': assessment.description, 'date': assessment.date.isoformat() if assessment.date else '', 'trimester': assessment.trimester, 'class_name': assessment.class_group.name, 'coefficient': assessment.coefficient } # Préparer les données des exercices et éléments exercises_data = [] all_students_grades = {} # {student_id: [grades]} for exercise in sorted(assessment.exercises, key=lambda x: x.order): elements_data = [] for element in exercise.grading_elements: element_data = { 'id': element.id, 'exercise_id': exercise.id, 'label': element.label, 'description': element.description, 'skill': element.skill, 'domain_name': element.domain.name if element.domain else None, 'domain_color': element.domain.color if element.domain else None, 'grading_type': element.grading_type, 'max_points': element.max_points } elements_data.append(element_data) # Collecter les notes for grade in element.grades: if grade.student_id not in all_students_grades: all_students_grades[grade.student_id] = [] all_students_grades[grade.student_id].append({ 'element_id': element.id, 'value': grade.value, 'comment': grade.comment }) exercises_data.append({ 'id': exercise.id, 'title': exercise.title, 'description': exercise.description, 'order': exercise.order, 'elements': elements_data }) # Créer le service de rapport report_service = StudentReportService() # Préparer les résultats results = [] total_sent = 0 total_failed = 0 for student in students: student_name = student.full_name # Vérifier que l'élève a une adresse email if not student.email: results.append(SendReportResult( student_id=student.id, student_name=student_name, email=None, success=False, error_message="Pas d'adresse email configurée" )) total_failed += 1 continue try: # Générer le rapport student_data = { 'id': student.id, 'first_name': student.first_name, 'last_name': student.last_name, 'email': student.email } report_data = report_service.generate_student_report( assessment_data, student_data, all_students_grades, exercises_data ) # Générer le HTML html_body = generate_report_html(report_data, request.custom_message or "") # Envoyer l'email subject = f"Bilan d'évaluation : {assessment.title}" send_result = email_service.send_email( to_emails=[student.email], subject=subject, html_body=html_body ) if send_result['success']: results.append(SendReportResult( student_id=student.id, student_name=student_name, email=student.email, success=True, error_message=None )) total_sent += 1 else: results.append(SendReportResult( student_id=student.id, student_name=student_name, email=student.email, success=False, error_message=send_result.get('error', 'Erreur inconnue') )) total_failed += 1 except Exception as e: results.append(SendReportResult( student_id=student.id, student_name=student_name, email=student.email, success=False, error_message=str(e) )) total_failed += 1 # Message de résultat if total_sent > 0 and total_failed == 0: message = f"{total_sent} bilan(s) envoyé(s) avec succès." elif total_sent > 0: message = f"{total_sent} bilan(s) envoyé(s), {total_failed} échec(s)." else: message = f"Échec de l'envoi de {total_failed} bilan(s)." return SendReportsResponse( success=total_sent > 0, total_sent=total_sent, total_failed=total_failed, results=results, message=message ) @router.get("/{assessment_id}/preview-report/{student_id}", response_class=HTMLResponse) async def preview_report( assessment_id: int, student_id: int, session: AsyncSessionDep, ): """ Génère un aperçu HTML du bilan d'un élève sans l'envoyer. Utile pour vérifier le rendu du bilan avant l'envoi groupé. Retourne le HTML brut qui serait envoyé par email, directement affichable dans le navigateur. """ # Récupérer l'évaluation avec toutes les relations nécessaires assessment_query = ( select(Assessment) .options( selectinload(Assessment.class_group), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.grades), selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.domain) ) .where(Assessment.id == assessment_id) ) assessment_result = await session.execute(assessment_query) assessment = assessment_result.scalar_one_or_none() if not assessment: raise HTTPException(status_code=404, detail="Évaluation non trouvée") # Récupérer l'élève student_query = select(Student).where(Student.id == student_id) student_result = await session.execute(student_query) student = student_result.scalar_one_or_none() if not student: raise HTTPException(status_code=404, detail="Élève non trouvé") # Préparer les données pour le service de rapport (même logique que send_reports) assessment_data = { 'title': assessment.title, 'description': assessment.description, 'date': assessment.date.isoformat() if assessment.date else '', 'trimester': assessment.trimester, 'class_name': assessment.class_group.name, 'coefficient': assessment.coefficient } # Préparer les données des exercices et éléments exercises_data = [] all_students_grades = {} # {student_id: [grades]} for exercise in sorted(assessment.exercises, key=lambda x: x.order): elements_data = [] for element in exercise.grading_elements: element_data = { 'id': element.id, 'exercise_id': exercise.id, 'label': element.label, 'description': element.description, 'skill': element.skill, 'domain_name': element.domain.name if element.domain else None, 'domain_color': element.domain.color if element.domain else None, 'grading_type': element.grading_type, 'max_points': element.max_points } elements_data.append(element_data) # Collecter les notes for grade in element.grades: if grade.student_id not in all_students_grades: all_students_grades[grade.student_id] = [] all_students_grades[grade.student_id].append({ 'element_id': element.id, 'value': grade.value, 'comment': grade.comment }) exercises_data.append({ 'id': exercise.id, 'title': exercise.title, 'description': exercise.description, 'order': exercise.order, 'elements': elements_data }) # Vérifier que l'élève a des notes pour cette évaluation if student_id not in all_students_grades: raise HTTPException( status_code=400, detail=f"L'élève {student.full_name} n'a pas de notes pour cette évaluation" ) # Créer le service de rapport report_service = StudentReportService() # Préparer les données de l'élève student_data = { 'id': student.id, 'first_name': student.first_name, 'last_name': student.last_name, 'email': student.email } try: # Générer le rapport report_data = report_service.generate_student_report( assessment_data, student_data, all_students_grades, exercises_data ) # Générer le HTML (sans message personnalisé pour la prévisualisation) html_body = generate_report_html(report_data, "") # Retourner directement le HTML pour affichage dans le navigateur return HTMLResponse(content=html_body) except Exception as e: raise HTTPException( status_code=500, detail=f"Erreur lors de la génération du rapport: {str(e)}" )