Files
notytex/backend/api/routes/assessments.py
Bertrand Benjamin f76b033d55
All checks were successful
Build and Publish Docker Images / Build Frontend Image (push) Successful in 2m56s
Build and Publish Docker Images / Build Backend Image (push) Successful in 3m5s
Build and Publish Docker Images / Build Summary (push) Successful in 3s
feat(mail): restauration de l'envoie de mail
2025-12-04 06:04:13 +01:00

1344 lines
48 KiB
Python

"""
Routes API pour les évaluations (Assessment).
"""
from typing import Optional, List
from datetime import date
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import HTMLResponse
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from api.dependencies import AsyncSessionDep
from infrastructure.database.models import (
Assessment,
Exercise,
GradingElement,
Grade,
ClassGroup,
Student,
StudentEnrollment,
)
from schemas.assessment import (
AssessmentRead,
AssessmentWithProgress,
AssessmentDetail,
AssessmentList,
ExerciseRead,
GradingElementRead,
AssessmentResults,
AssessmentStatistics,
StudentScore,
AssessmentCreate,
AssessmentUpdate,
AssessmentResponse,
SendReportsRequest,
SendReportResult,
SendReportsResponse,
HeatmapCell,
HeatmapData,
)
from schemas.grading import BulkGradeCreate, BulkGradeResponse, GradeRead
from domain.services import GradingCalculator, StatisticsService, StudentReportService, generate_report_html, ConfigService
from infrastructure.external.email_service import EmailService, SMTPConfig
from infrastructure.database.models import AppConfig
# Router under which every endpoint of this module is registered; all paths
# declared below are relative to the "/assessments" prefix.
router = APIRouter(prefix="/assessments", tags=["Assessments"])
def calculate_grading_progress(
    assessment: "Assessment",
    grades_count: int,
    total_elements: int,
    eligible_students_count: int
) -> dict:
    """Compute the grading progress of an assessment.

    The expected number of grades is ``total_elements * eligible_students_count``.
    The status is derived from the raw counts rather than from the rounded
    percentage, so a single grade out of a thousand is reported as
    "in_progress" (not "not_started") and 999 out of 1000 is not reported as
    "completed".

    Args:
        assessment: The assessment being measured. Not read by this function;
            kept for interface compatibility with existing callers.
        grades_count: Number of grades already entered.
        total_elements: Number of grading elements in the assessment.
        eligible_students_count: Number of students expected to be graded.

    Returns:
        A dict with the keys ``percentage`` (int, 0-100), ``completed``,
        ``total``, ``status`` ("not_started", "in_progress" or "completed")
        and ``students_count``.
    """
    total = total_elements * eligible_students_count
    completed = grades_count

    if total <= 0 or completed <= 0:
        # Nothing to grade, or nothing graded yet.
        percentage = 0
        status = "not_started"
    elif completed >= total:
        # Cap at 100 so surplus rows can never display more than 100%.
        percentage = 100
        status = "completed"
    else:
        # Keep the displayed percentage consistent with the status: rounding
        # must never show 0% or 100% while grading is only partially done.
        percentage = min(99, max(1, round((completed / total) * 100)))
        status = "in_progress"

    return {
        "percentage": percentage,
        "completed": completed,
        "total": total,
        "status": status,
        "students_count": eligible_students_count
    }
@router.get("", response_model=AssessmentList)
async def get_assessments(
    session: AsyncSessionDep,
    class_id: Optional[int] = Query(None, description="Filtrer par classe"),
    trimester: Optional[int] = Query(None, ge=1, le=3, description="Filtrer par trimestre"),
    sort: Optional[str] = Query("date_desc", description="Tri: date_desc, date_asc, title"),
):
    """
    List assessments with their grading progress and total points.

    Optional filters restrict the list to one class and/or one trimester.
    ``sort`` accepts "date_asc" or "title"; any other value sorts by date,
    newest first.
    """
    stmt = select(Assessment).options(
        selectinload(Assessment.class_group),
        selectinload(Assessment.exercises).selectinload(Exercise.grading_elements)
    )
    if class_id:
        stmt = stmt.where(Assessment.class_group_id == class_id)
    if trimester:
        stmt = stmt.where(Assessment.trimester == trimester)

    # Pick the ordering clause first, then apply it once.
    if sort == "date_asc":
        ordering = Assessment.date.asc()
    elif sort == "title":
        ordering = Assessment.title
    else:  # "date_desc" is the default
        ordering = Assessment.date.desc()
    stmt = stmt.order_by(ordering)

    fetched = (await session.execute(stmt)).scalars().all()

    assessments_list = []
    for assessment in fetched:
        # Flatten the grading elements to derive both totals.
        elements = [
            element
            for exercise in assessment.exercises
            for element in exercise.grading_elements
        ]
        total_max_points = sum(element.max_points for element in elements)
        total_elements = len(elements)

        # A student is eligible when enrolled in the class on the assessment
        # date: enrolled on or before it, and not departed before it.
        enrolled_on_date = (
            StudentEnrollment.class_group_id == assessment.class_group_id,
            StudentEnrollment.enrollment_date <= assessment.date,
            (StudentEnrollment.departure_date.is_(None) |
             (StudentEnrollment.departure_date >= assessment.date)),
        )

        count_stmt = select(func.count(StudentEnrollment.id)).where(*enrolled_on_date)
        eligible_students_count = (await session.execute(count_stmt)).scalar() or 0

        # Count only non-null grades of eligible students on this
        # assessment's elements.
        eligible_student_ids = select(StudentEnrollment.student_id).where(*enrolled_on_date)
        assessment_element_ids = (
            select(GradingElement.id)
            .join(Exercise)
            .where(Exercise.assessment_id == assessment.id)
        )
        grades_stmt = select(func.count(Grade.id)).where(
            Grade.grading_element_id.in_(assessment_element_ids),
            Grade.student_id.in_(eligible_student_ids),
            Grade.value.isnot(None)
        )
        grades_count = (await session.execute(grades_stmt)).scalar() or 0

        class_group = assessment.class_group
        assessments_list.append(
            AssessmentWithProgress(
                id=assessment.id,
                title=assessment.title,
                description=assessment.description,
                date=assessment.date,
                trimester=assessment.trimester,
                coefficient=assessment.coefficient,
                class_group_id=assessment.class_group_id,
                class_name=class_group.name if class_group else None,
                grading_progress=calculate_grading_progress(
                    assessment, grades_count, total_elements, eligible_students_count
                ),
                total_max_points=total_max_points
            )
        )

    return AssessmentList(
        assessments=assessments_list,
        total=len(assessments_list)
    )
@router.get("/{assessment_id}", response_model=AssessmentDetail)
async def get_assessment(
    assessment_id: int,
    session: AsyncSessionDep,
):
    """
    Return one assessment with its exercises, grading elements and progress.

    Raises a 404 HTTPException when no assessment has the given id.
    """
    stmt = (
        select(Assessment)
        .options(
            selectinload(Assessment.class_group),
            selectinload(Assessment.exercises)
            .selectinload(Exercise.grading_elements)
            .selectinload(GradingElement.domain)
        )
        .where(Assessment.id == assessment_id)
    )
    assessment = (await session.execute(stmt)).scalar_one_or_none()
    if not assessment:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Serialise exercises in display order while accumulating the totals.
    exercises_list = []
    total_max_points = 0
    total_elements = 0
    ordered_exercises = sorted(assessment.exercises, key=lambda e: e.order)
    for exercise in ordered_exercises:
        elements_list = []
        for element in exercise.grading_elements:
            total_max_points += element.max_points
            total_elements += 1
            domain = element.domain
            elements_list.append(
                GradingElementRead(
                    id=element.id,
                    exercise_id=element.exercise_id,
                    label=element.label,
                    description=element.description,
                    skill=element.skill,
                    max_points=element.max_points,
                    grading_type=element.grading_type,
                    domain_id=element.domain_id,
                    domain_name=domain.name if domain else None,
                    domain_color=domain.color if domain else None
                )
            )
        exercises_list.append(
            ExerciseRead(
                id=exercise.id,
                assessment_id=exercise.assessment_id,
                title=exercise.title,
                description=exercise.description,
                order=exercise.order,
                grading_elements=elements_list
            )
        )

    # Students enrolled in the class on the assessment date are eligible.
    enrolled_on_date = (
        StudentEnrollment.class_group_id == assessment.class_group_id,
        StudentEnrollment.enrollment_date <= assessment.date,
        (StudentEnrollment.departure_date.is_(None) |
         (StudentEnrollment.departure_date >= assessment.date)),
    )
    count_stmt = select(func.count(StudentEnrollment.id)).where(*enrolled_on_date)
    eligible_students_count = (await session.execute(count_stmt)).scalar() or 0

    # Only non-null grades of eligible students count toward the progress.
    eligible_student_ids = select(StudentEnrollment.student_id).where(*enrolled_on_date)
    assessment_element_ids = (
        select(GradingElement.id)
        .join(Exercise)
        .where(Exercise.assessment_id == assessment.id)
    )
    grades_stmt = select(func.count(Grade.id)).where(
        Grade.grading_element_id.in_(assessment_element_ids),
        Grade.student_id.in_(eligible_student_ids),
        Grade.value.isnot(None)
    )
    grades_count = (await session.execute(grades_stmt)).scalar() or 0

    class_group = assessment.class_group
    return AssessmentDetail(
        id=assessment.id,
        title=assessment.title,
        description=assessment.description,
        date=assessment.date,
        trimester=assessment.trimester,
        coefficient=assessment.coefficient,
        class_group_id=assessment.class_group_id,
        class_name=class_group.name if class_group else None,
        grading_progress=calculate_grading_progress(
            assessment, grades_count, total_elements, eligible_students_count
        ),
        total_max_points=total_max_points,
        exercises=exercises_list
    )
@router.get("/{assessment_id}/results", response_model=AssessmentResults)
async def get_assessment_results(
    assessment_id: int,
    session: AsyncSessionDep,
):
    """
    Return the results of an assessment: statistics, per-student scores,
    histogram data and optional heatmaps per skill and per domain.

    Score computation is delegated to GradingCalculator and the statistics /
    histogram to StatisticsService. Only students enrolled in the class on
    the assessment date are included.

    Raises a 404 HTTPException when no assessment has the given id.
    """
    # Instantiate the domain services
    grading_calc = GradingCalculator()
    stats_service = StatisticsService()
    # Load the assessment with its elements, their grades and their domains
    query = (
        select(Assessment)
        .options(
            selectinload(Assessment.class_group),
            selectinload(Assessment.exercises)
            .selectinload(Exercise.grading_elements)
            .selectinload(GradingElement.grades),
            selectinload(Assessment.exercises)
            .selectinload(Exercise.grading_elements)
            .selectinload(GradingElement.domain)
        )
        .where(Assessment.id == assessment_id)
    )
    result = await session.execute(query)
    assessment = result.scalar_one_or_none()
    if not assessment:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")
    # Fetch the eligible students (enrolled on the assessment date)
    eligible_query = (
        select(StudentEnrollment)
        .options(selectinload(StudentEnrollment.student))
        .where(
            StudentEnrollment.class_group_id == assessment.class_group_id,
            StudentEnrollment.enrollment_date <= assessment.date,
            (StudentEnrollment.departure_date.is_(None) |
             (StudentEnrollment.departure_date >= assessment.date))
        )
    )
    eligible_result = await session.execute(eligible_query)
    enrollments = eligible_result.scalars().all()
    # Sum the maximum points over every grading element
    total_max_points = 0
    for exercise in assessment.exercises:
        for element in exercise.grading_elements:
            total_max_points += element.max_points
    # Compute each student's scores with GradingCalculator
    student_scores = {}
    for enrollment in enrollments:
        student = enrollment.student
        student_id = student.id
        total_score = 0.0
        counted_max = 0.0
        exercise_scores = {}
        has_any_grade = False  # Tracks whether the student has at least one grade
        for exercise in assessment.exercises:
            ex_score = 0.0
            ex_max = 0.0
            for element in exercise.grading_elements:
                # Find this student's grade for the element (first match wins)
                grade = None
                for g in element.grades:
                    if g.student_id == student_id:
                        grade = g
                        break
                if grade and grade.value:
                    value = grade.value.strip()
                    if value:  # The value is not blank
                        has_any_grade = True
                        # Delegate the score computation to GradingCalculator
                        calculated_score = grading_calc.calculate_score(
                            value, element.grading_type, element.max_points
                        )
                        # Only values that count toward the total contribute
                        if grading_calc.is_counted_in_total(value):
                            if calculated_score is not None:  # Not exempted
                                ex_score += calculated_score
                            # NOTE(review): the element's max points are added
                            # for every counted value, even when the score is
                            # None. Confirm this nesting against the original
                            # file's indentation.
                            ex_max += element.max_points
            exercise_scores[exercise.id] = {
                "score": round(ex_score, 2),
                "max": ex_max
            }
            total_score += ex_score
            counted_max += ex_max
        # Percentage is relative to the points actually counted for the student
        percentage = round((total_score / counted_max * 100) if counted_max > 0 else 0, 1)
        student_scores[student_id] = StudentScore(
            student_id=student_id,
            student_name=f"{student.last_name} {student.first_name}",
            email=student.email,  # Expose the student's email
            total_score=round(total_score, 2),
            total_max_points=counted_max,
            percentage=percentage,
            exercise_scores=exercise_scores,
            has_grades=has_any_grade
        )
    # Statistics are computed by StatisticsService
    # Only students with at least one counted grade are included
    scores = [s.total_score for s in student_scores.values() if s.total_max_points > 0 and s.has_grades]
    stats_result = stats_service.calculate_statistics(scores)
    stats = AssessmentStatistics(
        count=stats_result.count,
        mean=stats_result.mean,
        median=stats_result.median,
        min=stats_result.min,
        max=stats_result.max,
        std_dev=stats_result.std_dev
    )
    # The histogram is also produced by StatisticsService
    histogram_data = stats_service.create_simple_histogram(scores, total_max_points)
    # Sort the scores by student name (case-insensitive)
    sorted_scores = sorted(
        student_scores.values(),
        key=lambda s: s.student_name.lower()
    )
    # Heatmaps per skill and per domain; left as None when the assessment has none
    competences_heatmap = None
    domains_heatmap = None
    # Collect the distinct skills and domains
    all_competences = set()
    all_domains = {}  # domain name -> color
    for exercise in assessment.exercises:
        for element in exercise.grading_elements:
            if element.skill:
                all_competences.add(element.skill)
            if element.domain:
                all_domains[element.domain.name] = element.domain.color
    # Build the skills heatmap when at least one skill is present
    if all_competences:
        competences_cells = []
        for enrollment in enrollments:
            student = enrollment.student
            student_name = f"{student.last_name} {student.first_name}"
            # Accumulate this student's score per skill
            competence_scores = {}
            for comp in all_competences:
                competence_scores[comp] = {"score": 0.0, "max": 0.0}
            for exercise in assessment.exercises:
                for element in exercise.grading_elements:
                    if element.skill and element.skill in competence_scores:
                        # Find the student's grade (first one with a value)
                        for g in element.grades:
                            if g.student_id == student.id and g.value:
                                calc_score = grading_calc.calculate_score(
                                    g.value.strip(), element.grading_type, element.max_points
                                )
                                # NOTE(review): unlike the per-student totals,
                                # is_counted_in_total is not consulted here.
                                # Confirm this is intended.
                                if calc_score is not None:
                                    competence_scores[element.skill]["score"] += calc_score
                                    competence_scores[element.skill]["max"] += element.max_points
                                break
            # Emit one cell per skill that has counted points
            for comp, data in competence_scores.items():
                if data["max"] > 0:
                    pct = round((data["score"] / data["max"]) * 100, 1)
                    competences_cells.append(HeatmapCell(
                        student_id=student.id,
                        student_name=student_name,
                        item_name=comp,
                        score=round(data["score"], 2),
                        max_points=data["max"],
                        percentage=pct
                    ))
        competences_heatmap = HeatmapData(
            items=sorted(list(all_competences)),
            students=[s.student_name for s in sorted_scores],
            cells=competences_cells
        )
    # Build the domains heatmap when at least one domain is present
    if all_domains:
        domains_cells = []
        for enrollment in enrollments:
            student = enrollment.student
            student_name = f"{student.last_name} {student.first_name}"
            # Accumulate this student's score per domain
            domain_scores = {}
            for dom in all_domains:
                domain_scores[dom] = {"score": 0.0, "max": 0.0}
            for exercise in assessment.exercises:
                for element in exercise.grading_elements:
                    if element.domain and element.domain.name in domain_scores:
                        # Find the student's grade (first one with a value)
                        for g in element.grades:
                            if g.student_id == student.id and g.value:
                                calc_score = grading_calc.calculate_score(
                                    g.value.strip(), element.grading_type, element.max_points
                                )
                                if calc_score is not None:
                                    domain_scores[element.domain.name]["score"] += calc_score
                                    domain_scores[element.domain.name]["max"] += element.max_points
                                break
            # Emit one cell per domain that has counted points
            for dom, data in domain_scores.items():
                if data["max"] > 0:
                    pct = round((data["score"] / data["max"]) * 100, 1)
                    domains_cells.append(HeatmapCell(
                        student_id=student.id,
                        student_name=student_name,
                        item_name=dom,
                        score=round(data["score"], 2),
                        max_points=data["max"],
                        percentage=pct,
                        color=all_domains.get(dom)
                    ))
        domains_heatmap = HeatmapData(
            items=sorted(list(all_domains.keys())),
            students=[s.student_name for s in sorted_scores],
            cells=domains_cells
        )
    return AssessmentResults(
        assessment_id=assessment.id,
        assessment_title=assessment.title,
        statistics=stats,
        students_scores=sorted_scores,
        histogram_data=histogram_data,
        competences_heatmap=competences_heatmap,
        domains_heatmap=domains_heatmap
    )
@router.post("", response_model=AssessmentDetail, status_code=201)
async def create_assessment(
    assessment_data: AssessmentCreate,
    session: AsyncSessionDep,
):
    """
    Create an assessment together with its exercises and grading elements.

    A single request creates the assessment, its exercises and the grading
    elements of each exercise. The response carries the database ids of
    every created row, including the grading elements.

    Raises a 404 HTTPException when the target class does not exist.
    """
    # The class must exist before anything is created.
    class_query = select(ClassGroup).where(ClassGroup.id == assessment_data.class_group_id)
    class_result = await session.execute(class_query)
    class_group = class_result.scalar_one_or_none()
    if not class_group:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    assessment = Assessment(
        title=assessment_data.title,
        description=assessment_data.description,
        date=assessment_data.date,
        trimester=assessment_data.trimester,
        coefficient=assessment_data.coefficient,
        class_group_id=assessment_data.class_group_id
    )
    session.add(assessment)
    await session.flush()  # Assigns assessment.id

    exercises_list = []
    total_max_points = 0
    for ex_idx, ex_data in enumerate(assessment_data.exercises):
        exercise = Exercise(
            assessment_id=assessment.id,
            title=ex_data.title,
            description=ex_data.description,
            # Fall back to the 1-based position when no order is supplied.
            order=ex_data.order if ex_data.order else ex_idx + 1
        )
        session.add(exercise)
        await session.flush()  # Assigns exercise.id

        created_elements = []
        for elem_data in ex_data.grading_elements:
            element = GradingElement(
                exercise_id=exercise.id,
                label=elem_data.label,
                description=elem_data.description,
                skill=elem_data.skill,
                max_points=elem_data.max_points,
                grading_type=elem_data.grading_type,
                domain_id=elem_data.domain_id
            )
            session.add(element)
            created_elements.append((element, elem_data))
            total_max_points += elem_data.max_points

        # Flush so each element gets its real primary key; the response
        # previously reported id=0 for every grading element.
        if created_elements:
            await session.flush()

        elements_list = [
            GradingElementRead(
                id=element.id,
                exercise_id=exercise.id,
                label=elem_data.label,
                description=elem_data.description,
                skill=elem_data.skill,
                max_points=elem_data.max_points,
                grading_type=elem_data.grading_type,
                domain_id=elem_data.domain_id,
                domain_name=None,
                domain_color=None
            )
            for element, elem_data in created_elements
        ]
        exercises_list.append(
            ExerciseRead(
                id=exercise.id,
                assessment_id=assessment.id,
                title=exercise.title,
                description=exercise.description,
                order=exercise.order,
                grading_elements=elements_list
            )
        )

    # A new assessment has no grades yet, so the progress is reported as zero.
    progress = {
        "percentage": 0,
        "completed": 0,
        "total": 0,
        "status": "not_started",
        "students_count": 0
    }
    # Build the response before committing so no ORM attribute is read after
    # the commit, when instances may be expired.
    response = AssessmentDetail(
        id=assessment.id,
        title=assessment.title,
        description=assessment.description,
        date=assessment.date,
        trimester=assessment.trimester,
        coefficient=assessment.coefficient,
        class_group_id=assessment.class_group_id,
        class_name=class_group.name,
        grading_progress=progress,
        total_max_points=total_max_points,
        exercises=exercises_list
    )
    await session.commit()
    return response
@router.put("/{assessment_id}", response_model=AssessmentDetail)
async def update_assessment(
    assessment_id: int,
    assessment_data: AssessmentUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing assessment.

    Base fields are only overwritten when a non-null value is supplied. When
    ``exercises`` is supplied it becomes the full list: exercises and grading
    elements whose ids are absent are deleted, those with an id are updated
    in place, and those without an id are created.

    Raises:
        HTTPException 404: the assessment does not exist.
        HTTPException 400: the payload references an exercise or grading
            element id that does not belong to this assessment.
        HTTPException 500: the assessment cannot be reloaded after commit.
    """
    query = (
        select(Assessment)
        .options(
            selectinload(Assessment.class_group),
            selectinload(Assessment.exercises).selectinload(Exercise.grading_elements)
        )
        .where(Assessment.id == assessment_id)
    )
    result = await session.execute(query)
    assessment = result.scalar_one_or_none()
    if not assessment:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Base fields: None means "leave unchanged".
    if assessment_data.title is not None:
        assessment.title = assessment_data.title
    if assessment_data.description is not None:
        assessment.description = assessment_data.description
    if assessment_data.date is not None:
        # The schema may deliver the date as an ISO string.
        if isinstance(assessment_data.date, str):
            from datetime import date as date_type
            assessment.date = date_type.fromisoformat(assessment_data.date)
        else:
            assessment.date = assessment_data.date
    if assessment_data.trimester is not None:
        assessment.trimester = assessment_data.trimester
    if assessment_data.coefficient is not None:
        assessment.coefficient = assessment_data.coefficient

    if assessment_data.exercises is not None:
        # Index the rows that belong to THIS assessment. Ids are resolved
        # only through these maps, so a payload can never touch another
        # assessment's exercises or elements.
        existing_exercises = {ex.id: ex for ex in assessment.exercises}
        existing_elements = {
            el.id: el
            for ex in assessment.exercises
            for el in ex.grading_elements
        }

        # Validate every referenced id before changing anything.
        for ex_data in assessment_data.exercises:
            if ex_data.id and ex_data.id not in existing_exercises:
                raise HTTPException(
                    status_code=400,
                    detail=f"L'exercice {ex_data.id} "
                           f"n'appartient pas à cette évaluation"
                )
            for elem_data in ex_data.grading_elements:
                if elem_data.id and elem_data.id not in existing_elements:
                    raise HTTPException(
                        status_code=400,
                        detail=f"L'élément de notation {elem_data.id} "
                               f"n'appartient pas à cette évaluation"
                    )

        kept_exercise_ids = {ex.id for ex in assessment_data.exercises if ex.id}
        kept_element_ids = {
            el.id
            for ex in assessment_data.exercises
            for el in ex.grading_elements
            if el.id
        }

        # Delete whatever is no longer listed.
        for exercise in assessment.exercises:
            if exercise.id not in kept_exercise_ids:
                await session.delete(exercise)
            else:
                for element in exercise.grading_elements:
                    if element.id not in kept_element_ids:
                        await session.delete(element)
        await session.flush()

        # Update or create exercises and their grading elements.
        for ex_idx, ex_data in enumerate(assessment_data.exercises):
            # Fall back to the 1-based position when no order is supplied.
            order = ex_data.order if ex_data.order else ex_idx + 1
            if ex_data.id:
                exercise = existing_exercises[ex_data.id]
                exercise.title = ex_data.title
                exercise.description = ex_data.description
                exercise.order = order
            else:
                exercise = Exercise(
                    assessment_id=assessment.id,
                    title=ex_data.title,
                    description=ex_data.description,
                    order=order
                )
                session.add(exercise)
                await session.flush()  # Assigns exercise.id for new elements

            for elem_data in ex_data.grading_elements:
                if elem_data.id:
                    element = existing_elements[elem_data.id]
                    if element.exercise_id not in kept_exercise_ids:
                        # The element's own exercise was removed above, so
                        # there is nothing left to update.
                        continue
                    # Updating in place preserves the grades attached to it.
                    element.label = elem_data.label
                    element.description = elem_data.description
                    element.skill = elem_data.skill
                    element.max_points = elem_data.max_points
                    element.grading_type = elem_data.grading_type
                    element.domain_id = elem_data.domain_id
                else:
                    session.add(GradingElement(
                        exercise_id=exercise.id,
                        label=elem_data.label,
                        description=elem_data.description,
                        skill=elem_data.skill,
                        max_points=elem_data.max_points,
                        grading_type=elem_data.grading_type,
                        domain_id=elem_data.domain_id
                    ))

    await session.commit()

    # Reload from the database. populate_existing forces already-loaded
    # instances and collections to be refreshed, so created and deleted rows
    # are reflected even if the session does not expire objects on commit.
    query = (
        select(Assessment)
        .options(
            selectinload(Assessment.class_group),
            selectinload(Assessment.exercises).selectinload(Exercise.grading_elements)
        )
        .where(Assessment.id == assessment_id)
        .execution_options(populate_existing=True)
    )
    result = await session.execute(query)
    assessment = result.scalar_one_or_none()
    # Safety net: should never trigger right after a successful commit.
    if not assessment:
        raise HTTPException(status_code=500, detail="Erreur lors de la mise à jour")

    # Build the response.
    exercises_list = []
    total_max_points = 0
    total_elements = 0
    for exercise in sorted(assessment.exercises, key=lambda e: e.order):
        elements_list = []
        for element in exercise.grading_elements:
            total_max_points += element.max_points
            total_elements += 1
            elements_list.append(
                GradingElementRead(
                    id=element.id,
                    exercise_id=element.exercise_id,
                    label=element.label,
                    description=element.description,
                    skill=element.skill,
                    max_points=element.max_points,
                    grading_type=element.grading_type,
                    domain_id=element.domain_id,
                    domain_name=None,
                    domain_color=None
                )
            )
        exercises_list.append(
            ExerciseRead(
                id=exercise.id,
                assessment_id=exercise.assessment_id,
                title=exercise.title,
                description=exercise.description,
                order=exercise.order,
                grading_elements=elements_list
            )
        )

    # Grading progress: students enrolled on the assessment date are eligible.
    enrolled_on_date = (
        StudentEnrollment.class_group_id == assessment.class_group_id,
        StudentEnrollment.enrollment_date <= assessment.date,
        (StudentEnrollment.departure_date.is_(None) |
         (StudentEnrollment.departure_date >= assessment.date)),
    )
    eligible_query = select(func.count(StudentEnrollment.id)).where(*enrolled_on_date)
    eligible_result = await session.execute(eligible_query)
    eligible_students_count = eligible_result.scalar() or 0

    # Count only the non-null grades of eligible students.
    eligible_student_ids = select(StudentEnrollment.student_id).where(*enrolled_on_date)
    grades_query = select(func.count(Grade.id)).where(
        Grade.grading_element_id.in_(
            select(GradingElement.id)
            .join(Exercise)
            .where(Exercise.assessment_id == assessment.id)
        ),
        Grade.student_id.in_(eligible_student_ids),
        Grade.value.isnot(None)
    )
    grades_result = await session.execute(grades_query)
    grades_count = grades_result.scalar() or 0
    progress = calculate_grading_progress(
        assessment, grades_count, total_elements, eligible_students_count
    )

    return AssessmentDetail(
        id=assessment.id,
        title=assessment.title,
        description=assessment.description,
        date=assessment.date,
        trimester=assessment.trimester,
        coefficient=assessment.coefficient,
        class_group_id=assessment.class_group_id,
        class_name=assessment.class_group.name if assessment.class_group else None,
        grading_progress=progress,
        total_max_points=total_max_points,
        exercises=exercises_list
    )
@router.delete("/{assessment_id}", status_code=204)
async def delete_assessment(
    assessment_id: int,
    session: AsyncSessionDep,
):
    """
    Delete an assessment.

    Dependent exercises, grading elements and grades are expected to be
    removed by cascade. Raises a 404 HTTPException when the assessment does
    not exist.
    """
    lookup = await session.execute(
        select(Assessment).where(Assessment.id == assessment_id)
    )
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Removing the parent row is enough; children follow by cascade.
    await session.delete(target)
    await session.commit()
    return None
@router.get("/{assessment_id}/grades", response_model=List[GradeRead])
async def get_grades(
    assessment_id: int,
    session: AsyncSessionDep,
):
    """
    Return every grade recorded for an assessment.

    Each item carries the grade id, the student id, the grading element id,
    the value and the comment. Raises a 404 HTTPException when the
    assessment does not exist.
    """
    # Make sure the assessment exists.
    lookup = await session.execute(
        select(Assessment).where(Assessment.id == assessment_id)
    )
    if not lookup.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Grades are reached through their grading element and its exercise.
    stmt = (
        select(Grade)
        .join(GradingElement)
        .join(Exercise)
        .where(Exercise.assessment_id == assessment_id)
    )
    rows = (await session.execute(stmt)).scalars().all()

    # Convert to the shape expected by the frontend.
    payload = []
    for row in rows:
        payload.append(
            GradeRead(
                id=row.id,
                student_id=row.student_id,
                grading_element_id=row.grading_element_id,
                value=row.value,
                comment=row.comment
            )
        )
    return payload
@router.post("/{assessment_id}/grades", response_model=BulkGradeResponse)
async def save_grades(
    assessment_id: int,
    grades_data: BulkGradeCreate,
    session: AsyncSessionDep,
):
    """
    Save grades for an assessment (upsert).

    For each submitted (student, grading element) pair the grade is created
    when missing and updated otherwise. On update the comment is only
    overwritten when a non-null comment is supplied. Existing grades are
    fetched with one query rather than one query per submitted grade.

    Raises:
        HTTPException 404: the assessment does not exist.
        HTTPException 400: a submitted grading element does not belong to
            this assessment. Nothing is saved in that case.
    """
    # Make sure the assessment exists.
    assessment_query = select(Assessment).where(Assessment.id == assessment_id)
    assessment_result = await session.execute(assessment_query)
    assessment = assessment_result.scalar_one_or_none()
    if not assessment:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Ids of the grading elements that belong to this assessment.
    elements_query = (
        select(GradingElement.id)
        .join(Exercise)
        .where(Exercise.assessment_id == assessment_id)
    )
    elements_result = await session.execute(elements_query)
    valid_element_ids = {row[0] for row in elements_result.fetchall()}

    submitted = grades_data.grades

    # Reject the whole batch up front if any element is foreign.
    for grade_data in submitted:
        if grade_data.grading_element_id not in valid_element_ids:
            raise HTTPException(
                status_code=400,
                detail=f"L'élément de notation {grade_data.grading_element_id} "
                       f"n'appartient pas à cette évaluation"
            )

    # Prefetch the grades that may already exist, indexed by
    # (student_id, grading_element_id).
    existing_by_key = {}
    if submitted:
        element_ids = {g.grading_element_id for g in submitted}
        student_ids = {g.student_id for g in submitted}
        existing_result = await session.execute(
            select(Grade).where(
                Grade.grading_element_id.in_(list(element_ids)),
                Grade.student_id.in_(list(student_ids))
            )
        )
        for grade in existing_result.scalars().all():
            existing_by_key[(grade.student_id, grade.grading_element_id)] = grade

    created_count = 0
    updated_count = 0
    for grade_data in submitted:
        key = (grade_data.student_id, grade_data.grading_element_id)
        existing_grade = existing_by_key.get(key)
        if existing_grade is not None:
            existing_grade.value = grade_data.value
            if grade_data.comment is not None:
                existing_grade.comment = grade_data.comment
            updated_count += 1
        else:
            new_grade = Grade(
                student_id=grade_data.student_id,
                grading_element_id=grade_data.grading_element_id,
                value=grade_data.value,
                comment=grade_data.comment
            )
            session.add(new_grade)
            # Register it so a repeated pair later in the same payload
            # updates this row instead of inserting a duplicate.
            existing_by_key[key] = new_grade
            created_count += 1

    await session.commit()
    return BulkGradeResponse(
        saved_count=created_count + updated_count,
        updated_count=updated_count,
        created_count=created_count,
        message=f"{created_count + updated_count} note(s) enregistrée(s) "
                f"({created_count} créée(s), {updated_count} mise(s) à jour)"
    )
@router.post("/{assessment_id}/send-reports", response_model=SendReportsResponse)
async def send_reports(
    assessment_id: int,
    request: SendReportsRequest,
    session: AsyncSessionDep,
):
    """
    Email the assessment report to each selected student.

    The SMTP settings are read from the AppConfig table. When the email
    service is not configured, a failed response is returned without raising.
    Each student is handled independently: a missing address or a failed
    send is recorded in ``results`` and does not stop the others.

    Raises:
        HTTPException 404: the assessment does not exist.
        HTTPException 400: one or more requested students do not exist.
    """
    # Local import: only this endpoint needs to offload blocking work.
    from fastapi.concurrency import run_in_threadpool

    # Load the SMTP configuration stored in the database.
    config_query = select(AppConfig)
    config_result = await session.execute(config_query)
    config_items = config_result.scalars().all()
    db_config = {item.key: item.value for item in config_items}

    email_service = EmailService.from_database_config(db_config)
    if not email_service.is_configured():
        return SendReportsResponse(
            success=False,
            total_sent=0,
            total_failed=len(request.student_ids),
            results=[],
            message="Service d'email non configuré. Configurez SMTP dans les paramètres."
        )

    # Load the assessment with every relationship the report needs.
    assessment_query = (
        select(Assessment)
        .options(
            selectinload(Assessment.class_group),
            selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.grades),
            selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.domain)
        )
        .where(Assessment.id == assessment_id)
    )
    assessment_result = await session.execute(assessment_query)
    assessment = assessment_result.scalar_one_or_none()
    if not assessment:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Fetch the requested students and make sure all of them exist.
    students_query = select(Student).where(Student.id.in_(request.student_ids))
    students_result = await session.execute(students_query)
    students = students_result.scalars().all()
    found_ids = {s.id for s in students}
    missing_ids = set(request.student_ids) - found_ids
    if missing_ids:
        raise HTTPException(
            status_code=400,
            detail=f"Élèves non trouvés: {list(missing_ids)}"
        )

    # Assessment header passed to the report service. The class group is
    # guarded, as in the other endpoints, so a missing class cannot raise
    # AttributeError.
    assessment_data = {
        'title': assessment.title,
        'description': assessment.description,
        'date': assessment.date.isoformat() if assessment.date else '',
        'trimester': assessment.trimester,
        'class_name': assessment.class_group.name if assessment.class_group else '',
        'coefficient': assessment.coefficient
    }

    # Exercises/elements description, plus the grades of ALL students
    # grouped by student id.
    exercises_data = []
    all_students_grades = {}  # {student_id: [grades]}
    for exercise in sorted(assessment.exercises, key=lambda x: x.order):
        elements_data = []
        for element in exercise.grading_elements:
            elements_data.append({
                'id': element.id,
                'exercise_id': exercise.id,
                'label': element.label,
                'description': element.description,
                'skill': element.skill,
                'domain_name': element.domain.name if element.domain else None,
                'domain_color': element.domain.color if element.domain else None,
                'grading_type': element.grading_type,
                'max_points': element.max_points
            })
            for grade in element.grades:
                all_students_grades.setdefault(grade.student_id, []).append({
                    'element_id': element.id,
                    'value': grade.value,
                    'comment': grade.comment
                })
        exercises_data.append({
            'id': exercise.id,
            'title': exercise.title,
            'description': exercise.description,
            'order': exercise.order,
            'elements': elements_data
        })

    report_service = StudentReportService()
    results = []
    total_sent = 0
    total_failed = 0
    subject = f"Bilan d'évaluation : {assessment.title}"

    for student in students:
        student_name = f"{student.first_name} {student.last_name}"

        # A student without an address is reported as a failure.
        if not student.email:
            results.append(SendReportResult(
                student_id=student.id,
                student_name=student_name,
                email=None,
                success=False,
                error_message="Pas d'adresse email configurée"
            ))
            total_failed += 1
            continue

        try:
            student_data = {
                'id': student.id,
                'first_name': student.first_name,
                'last_name': student.last_name,
                'email': student.email
            }
            report_data = report_service.generate_student_report(
                assessment_data,
                student_data,
                all_students_grades,
                exercises_data
            )
            html_body = generate_report_html(report_data, request.custom_message or "")

            # send_email is synchronous; run it in the threadpool so the
            # event loop is not blocked while the message is being sent.
            send_result = await run_in_threadpool(
                email_service.send_email,
                to_emails=[student.email],
                subject=subject,
                html_body=html_body
            )
            if send_result['success']:
                results.append(SendReportResult(
                    student_id=student.id,
                    student_name=student_name,
                    email=student.email,
                    success=True,
                    error_message=None
                ))
                total_sent += 1
            else:
                results.append(SendReportResult(
                    student_id=student.id,
                    student_name=student_name,
                    email=student.email,
                    success=False,
                    error_message=send_result.get('error', 'Erreur inconnue')
                ))
                total_failed += 1
        except Exception as e:
            # Deliberate per-student boundary: one failure must not abort the
            # batch, so the error is reported in the result list instead.
            results.append(SendReportResult(
                student_id=student.id,
                student_name=student_name,
                email=student.email,
                success=False,
                error_message=str(e)
            ))
            total_failed += 1

    # Summary message.
    if total_sent > 0 and total_failed == 0:
        message = f"{total_sent} bilan(s) envoyé(s) avec succès."
    elif total_sent > 0:
        message = f"{total_sent} bilan(s) envoyé(s), {total_failed} échec(s)."
    else:
        message = f"Échec de l'envoi de {total_failed} bilan(s)."

    return SendReportsResponse(
        success=total_sent > 0,
        total_sent=total_sent,
        total_failed=total_failed,
        results=results,
        message=message
    )
@router.get("/{assessment_id}/preview-report/{student_id}", response_class=HTMLResponse)
async def preview_report(
    assessment_id: int,
    student_id: int,
    session: AsyncSessionDep,
):
    """
    Render an HTML preview of a student's report without sending it.

    Useful to check how the report looks before a bulk send. The response is
    the raw HTML produced by ``generate_report_html`` (with an empty custom
    message), directly displayable in a browser.

    Args:
        assessment_id: Identifier of the assessment to report on.
        student_id: Identifier of the student whose report is rendered.
        session: Async database session.

    Returns:
        An ``HTMLResponse`` containing the rendered report.

    Raises:
        HTTPException: 404 if the assessment or the student does not exist,
            400 if the student has no grade for this assessment,
            500 if report generation or HTML rendering fails.
    """
    # Load the assessment together with every relation read below
    # (class group, exercises, grading elements, their grades and domains).
    assessment_query = (
        select(Assessment)
        .options(
            selectinload(Assessment.class_group),
            selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.grades),
            selectinload(Assessment.exercises).selectinload(Exercise.grading_elements).selectinload(GradingElement.domain)
        )
        .where(Assessment.id == assessment_id)
    )
    assessment_result = await session.execute(assessment_query)
    assessment = assessment_result.scalar_one_or_none()
    if not assessment:
        raise HTTPException(status_code=404, detail="Évaluation non trouvée")

    # Load the student.
    student_query = select(Student).where(Student.id == student_id)
    student_result = await session.execute(student_query)
    student = student_result.scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Élève non trouvé")

    # Build the plain-dict inputs expected by the report service
    # (same preparation as send_reports).
    # NOTE(review): assumes every assessment has a class_group — confirm the
    # relation is non-nullable, otherwise `.name` raises here.
    assessment_data = {
        'title': assessment.title,
        'description': assessment.description,
        'date': assessment.date.isoformat() if assessment.date else '',
        'trimester': assessment.trimester,
        'class_name': assessment.class_group.name,
        'coefficient': assessment.coefficient
    }

    # Exercises (sorted by their `order`) with their grading elements, plus
    # the grades of *all* students, which are all handed to the report service.
    exercises_data = []
    all_students_grades = {}  # {student_id: [grades]}
    for exercise in sorted(assessment.exercises, key=lambda x: x.order):
        elements_data = []
        for element in exercise.grading_elements:
            elements_data.append({
                'id': element.id,
                'exercise_id': exercise.id,
                'label': element.label,
                'description': element.description,
                'skill': element.skill,
                'domain_name': element.domain.name if element.domain else None,
                'domain_color': element.domain.color if element.domain else None,
                'grading_type': element.grading_type,
                'max_points': element.max_points
            })
            # Group this element's grades by student.
            for grade in element.grades:
                all_students_grades.setdefault(grade.student_id, []).append({
                    'element_id': element.id,
                    'value': grade.value,
                    'comment': grade.comment
                })
        exercises_data.append({
            'id': exercise.id,
            'title': exercise.title,
            'description': exercise.description,
            'order': exercise.order,
            'elements': elements_data
        })

    # A preview only makes sense when the student has at least one grade.
    if student_id not in all_students_grades:
        raise HTTPException(
            status_code=400,
            detail=f"L'élève {student.first_name} {student.last_name} n'a pas de notes pour cette évaluation"
        )

    report_service = StudentReportService()
    student_data = {
        'id': student.id,
        'first_name': student.first_name,
        'last_name': student.last_name,
        'email': student.email
    }

    try:
        report_data = report_service.generate_student_report(
            assessment_data,
            student_data,
            all_students_grades,
            exercises_data
        )
        # No custom message for the preview.
        html_body = generate_report_html(report_data, "")
        # Return the HTML as-is so the browser can display it directly.
        return HTMLResponse(content=html_body)
    except Exception as e:
        # Route-level boundary: surface any generation failure as a 500 while
        # keeping the original exception as the explicit cause for tracebacks.
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lors de la génération du rapport: {e}"
        ) from e