Files
notytex/backend/api/routes/classes.py
Bertrand Benjamin a0ab7224e1 refactor: extract duplicated patterns into shared helpers
Backend: create api/helpers.py with eligible_enrollment_filter,
count_eligible_students, get_active_enrollment, ensure_unique_name,
upsert_app_configs, and build_heatmap. Add full_name properties to
Student model. Apply across all route files (-481/+184 lines).

Frontend: create stores/helpers.js with withLoading composable,
apply to assessments and classes Pinia stores.

96/96 tests pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 14:05:10 +01:00

810 lines
27 KiB
Python

"""
Routes API pour les classes (ClassGroup).
"""
import csv
from io import StringIO
from typing import Optional, List, Tuple
from datetime import date
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from api.dependencies import AsyncSessionDep
from api.helpers import ensure_unique_name
from infrastructure.database.models import (
ClassGroup,
Student,
StudentEnrollment,
Assessment,
Exercise,
GradingElement,
Grade,
Domain,
Competence,
)
from schemas.class_group import (
ClassGroupRead,
ClassGroupDetail,
ClassGroupList,
ClassGroupCreate,
ClassGroupUpdate,
ClassGroupResponse,
ClassDashboardStats,
StudentAverage,
HistogramBin,
DomainStats,
CompetenceStats,
AssessmentScore,
DomainStudentStats,
CompetenceStudentStats,
)
from domain.services.grading_calculator import GradingCalculator
from domain.services.class_statistics_service import ClassStatisticsService
from schemas.student import StudentWithClass, StudentList, StudentWithEnrollmentInfo, StudentEnrollmentList
from schemas.csv_import import (
CSVImportResponse,
ImportedStudentInfo,
SkippedStudentInfo,
ImportErrorInfo,
)
router = APIRouter(prefix="/classes", tags=["Classes"])
@router.get("", response_model=ClassGroupList)
async def get_classes(
session: AsyncSessionDep,
year: Optional[str] = Query(None, description="Filtrer par année scolaire"),
):
"""
Récupère la liste de toutes les classes.
"""
query = select(ClassGroup)
if year:
query = query.where(ClassGroup.year == year)
query = query.order_by(ClassGroup.year.desc(), ClassGroup.name)
result = await session.execute(query)
classes = result.scalars().all()
# Calculer le nombre d'étudiants et d'évaluations pour chaque classe
classes_with_count = []
for cls in classes:
# Compter les étudiants actuellement inscrits
students_count_query = select(func.count(StudentEnrollment.id)).where(
StudentEnrollment.class_group_id == cls.id,
StudentEnrollment.departure_date.is_(None)
)
students_count_result = await session.execute(students_count_query)
students_count = students_count_result.scalar() or 0
# Compter les évaluations de cette classe
assessments_count_query = select(func.count(Assessment.id)).where(
Assessment.class_group_id == cls.id
)
assessments_count_result = await session.execute(assessments_count_query)
assessments_count = assessments_count_result.scalar() or 0
classes_with_count.append(
ClassGroupRead(
id=cls.id,
name=cls.name,
description=cls.description,
year=cls.year,
students_count=students_count,
assessments_count=assessments_count
)
)
return ClassGroupList(
classes=classes_with_count,
total=len(classes_with_count)
)
@router.get("/{class_id}", response_model=ClassGroupDetail)
async def get_class(
class_id: int,
session: AsyncSessionDep,
):
"""
Récupère les détails d'une classe spécifique.
"""
query = select(ClassGroup).where(ClassGroup.id == class_id)
result = await session.execute(query)
cls = result.scalar_one_or_none()
if not cls:
raise HTTPException(status_code=404, detail="Classe non trouvée")
# Compter les étudiants
count_query = select(func.count(StudentEnrollment.id)).where(
StudentEnrollment.class_group_id == cls.id,
StudentEnrollment.departure_date.is_(None)
)
count_result = await session.execute(count_query)
students_count = count_result.scalar() or 0
# Compter les évaluations
assessments_query = select(func.count(Assessment.id)).where(
Assessment.class_group_id == cls.id
)
assessments_result = await session.execute(assessments_query)
assessments_count = assessments_result.scalar() or 0
return ClassGroupDetail(
id=cls.id,
name=cls.name,
description=cls.description,
year=cls.year,
students_count=students_count,
assessments_count=assessments_count
)
@router.get("/{class_id}/students", response_model=StudentEnrollmentList)
async def get_class_students(
class_id: int,
session: AsyncSessionDep,
include_departed: bool = Query(False, description="Inclure les élèves partis"),
at_date: Optional[str] = Query(None, description="Filtrer les élèves inscrits à cette date (YYYY-MM-DD)"),
):
"""
Récupère la liste des étudiants d'une classe avec leurs informations d'inscription.
Si at_date est fourni, retourne uniquement les élèves qui étaient inscrits à cette date.
"""
# Vérifier que la classe existe
class_query = select(ClassGroup).where(ClassGroup.id == class_id)
class_result = await session.execute(class_query)
cls = class_result.scalar_one_or_none()
if not cls:
raise HTTPException(status_code=404, detail="Classe non trouvée")
# Récupérer les inscriptions
query = (
select(StudentEnrollment)
.options(selectinload(StudentEnrollment.student))
.where(StudentEnrollment.class_group_id == class_id)
)
# Filtrer par date si spécifié
if at_date:
from datetime import date as date_type
try:
filter_date = date_type.fromisoformat(at_date)
except ValueError:
raise HTTPException(status_code=400, detail="Format de date invalide (YYYY-MM-DD)")
query = query.where(
StudentEnrollment.enrollment_date <= filter_date,
(StudentEnrollment.departure_date.is_(None) |
(StudentEnrollment.departure_date >= filter_date))
)
elif not include_departed:
query = query.where(StudentEnrollment.departure_date.is_(None))
result = await session.execute(query)
enrollments = result.scalars().all()
students = []
for enrollment in enrollments:
student = enrollment.student
is_active = enrollment.departure_date is None
students.append(
StudentWithEnrollmentInfo(
id=student.id,
last_name=student.last_name,
first_name=student.first_name,
email=student.email,
full_name=student.full_name,
current_class_id=class_id if is_active else None,
current_class_name=cls.name if is_active else None,
enrollment_id=enrollment.id,
enrollment_date=enrollment.enrollment_date,
departure_date=enrollment.departure_date,
enrollment_reason=enrollment.enrollment_reason,
departure_reason=enrollment.departure_reason,
is_active=is_active
)
)
# Trier par nom de famille puis prénom
students.sort(key=lambda s: (s.last_name.lower(), s.first_name.lower()))
return StudentEnrollmentList(
students=students,
total=len(students)
)
@router.get("/{class_id}/stats", response_model=ClassDashboardStats)
async def get_class_stats(
class_id: int,
session: AsyncSessionDep,
trimester: Optional[int] = Query(None, description="Trimestre (1, 2, 3) ou None pour vision annuelle"),
):
"""
Récupère les statistiques complètes d'une classe pour un trimestre ou toute l'année.
Inclut:
- Moyennes par élève avec détail par évaluation
- Statistiques globales (moyenne, médiane, écart-type)
- Histogramme des moyennes
- Analyse par domaines et compétences (nombre d'évaluations + points)
Args:
class_id: ID de la classe
trimester: Trimestre spécifique (1, 2, 3) ou None pour vision annuelle (toutes évaluations)
"""
# Vérifier que la classe existe
class_query = select(ClassGroup).where(ClassGroup.id == class_id)
class_result = await session.execute(class_query)
cls = class_result.scalar_one_or_none()
if not cls:
raise HTTPException(status_code=404, detail="Classe non trouvée")
# Récupérer les élèves actuellement inscrits
students_query = (
select(Student)
.join(StudentEnrollment, Student.id == StudentEnrollment.student_id)
.where(
StudentEnrollment.class_group_id == class_id,
StudentEnrollment.departure_date.is_(None)
)
.order_by(Student.last_name, Student.first_name)
)
students_result = await session.execute(students_query)
students = students_result.scalars().all()
# Récupérer les évaluations (trimestre spécifique ou toutes)
assessments_query = (
select(Assessment)
.options(
selectinload(Assessment.exercises).selectinload(Exercise.grading_elements)
)
.where(Assessment.class_group_id == class_id)
)
# Filtrer par trimestre seulement si spécifié
if trimester is not None:
assessments_query = assessments_query.where(Assessment.trimester == trimester)
assessments_query = assessments_query.order_by(Assessment.date)
assessments_result = await session.execute(assessments_query)
assessments = assessments_result.scalars().all()
# Récupérer les domaines et compétences
domains_query = select(Domain).order_by(Domain.name)
domains_result = await session.execute(domains_query)
domains = domains_result.scalars().all()
competences_query = select(Competence).order_by(Competence.order_index)
competences_result = await session.execute(competences_query)
competences = competences_result.scalars().all()
# Récupérer toutes les notes en une seule requête pour optimiser
grades_by_student_assessment = {}
for student in students:
for assessment in assessments:
grades_query = (
select(Grade, GradingElement)
.join(GradingElement, Grade.grading_element_id == GradingElement.id)
.join(Exercise, GradingElement.exercise_id == Exercise.id)
.where(
Exercise.assessment_id == assessment.id,
Grade.student_id == student.id
)
)
grades_result = await session.execute(grades_query)
grades_by_student_assessment[(student.id, assessment.id)] = grades_result.all()
# Utiliser le service pour calculer les statistiques
stats_service = ClassStatisticsService()
student_averages = await stats_service.calculate_student_statistics(
students=students,
assessments=assessments,
grades_by_student_assessment=grades_by_student_assessment,
domains=domains,
competences=competences,
)
# Calculer les statistiques domaines/compétences depuis les éléments de notation
# Perspective enseignant : ce qui a été évalué, pas les résultats des élèves
domains_stats, competences_stats = stats_service.calculate_domain_competence_from_elements(
assessments=assessments,
domains=domains,
competences=competences,
)
# Calculer les statistiques globales
all_averages = [s.average for s in student_averages if s.average is not None]
mean = median = std_dev = min_score = max_score = None
if all_averages:
mean = round(sum(all_averages) / len(all_averages), 2)
sorted_averages = sorted(all_averages)
n = len(sorted_averages)
if n % 2 == 0:
median = (sorted_averages[n // 2 - 1] + sorted_averages[n // 2]) / 2
else:
median = sorted_averages[n // 2]
median = round(median, 2)
min_score = min(all_averages)
max_score = max(all_averages)
# Écart-type
variance = sum((x - mean) ** 2 for x in all_averages) / len(all_averages)
std_dev = round(variance ** 0.5, 2)
# Créer l'histogramme (bins de 2 points)
histogram = []
for i in range(0, 20, 2):
count = sum(1 for avg in all_averages if i <= avg < i + 2)
histogram.append(HistogramBin(
range_start=float(i),
range_end=float(i + 2),
label=f"{i}-{i + 2}",
count=count
))
# Ajouter le dernier bin pour 20
if histogram:
count_20 = sum(1 for avg in all_averages if avg == 20)
if count_20 > 0:
histogram[-1].count += count_20
# Compter les évaluations par statut
assessments_completed = 0
assessments_in_progress = 0
for assessment in assessments:
# Compter les notes pour cette évaluation
grades_count_query = (
select(func.count(Grade.id))
.join(GradingElement, Grade.grading_element_id == GradingElement.id)
.join(Exercise, GradingElement.exercise_id == Exercise.id)
.where(Exercise.assessment_id == assessment.id)
)
grades_count_result = await session.execute(grades_count_query)
grades_count = grades_count_result.scalar() or 0
# Compter le total attendu
elements_count_query = (
select(func.count(GradingElement.id))
.join(Exercise, GradingElement.exercise_id == Exercise.id)
.where(Exercise.assessment_id == assessment.id)
)
elements_count_result = await session.execute(elements_count_query)
elements_count = elements_count_result.scalar() or 0
expected_grades = elements_count * len(students)
if expected_grades > 0:
if grades_count >= expected_grades:
assessments_completed += 1
elif grades_count > 0:
assessments_in_progress += 1
return ClassDashboardStats(
class_id=class_id,
class_name=cls.name,
trimester=trimester,
students_count=len(students),
mean=mean,
median=median,
std_dev=std_dev,
min_score=min_score,
max_score=max_score,
assessments_total=len(assessments),
assessments_completed=assessments_completed,
assessments_in_progress=assessments_in_progress,
student_averages=student_averages,
histogram=histogram,
domains_stats=domains_stats,
competences_stats=competences_stats
)
@router.post("", response_model=ClassGroupRead, status_code=201)
async def create_class(
class_data: ClassGroupCreate,
session: AsyncSessionDep,
):
"""
Crée une nouvelle classe.
"""
# Vérifier l'unicité du nom
await ensure_unique_name(session, ClassGroup, class_data.name, entity_label="classe")
# Créer la nouvelle classe
new_class = ClassGroup(
name=class_data.name,
description=class_data.description,
year=class_data.year
)
session.add(new_class)
await session.commit()
await session.refresh(new_class)
return ClassGroupRead(
id=new_class.id,
name=new_class.name,
description=new_class.description,
year=new_class.year,
students_count=0
)
@router.put("/{class_id}", response_model=ClassGroupRead)
async def update_class(
class_id: int,
class_data: ClassGroupUpdate,
session: AsyncSessionDep,
):
"""
Modifie une classe existante.
"""
# Récupérer la classe
query = select(ClassGroup).where(ClassGroup.id == class_id)
result = await session.execute(query)
cls = result.scalar_one_or_none()
if not cls:
raise HTTPException(status_code=404, detail="Classe non trouvée")
# Vérifier l'unicité du nouveau nom si modifié
if class_data.name and class_data.name != cls.name:
await ensure_unique_name(
session, ClassGroup, class_data.name,
exclude_id=class_id, entity_label="classe"
)
# Appliquer les modifications
if class_data.name is not None:
cls.name = class_data.name
if class_data.description is not None:
cls.description = class_data.description
if class_data.year is not None:
cls.year = class_data.year
await session.commit()
await session.refresh(cls)
# Compter les étudiants
count_query = select(func.count(StudentEnrollment.id)).where(
StudentEnrollment.class_group_id == cls.id,
StudentEnrollment.departure_date.is_(None)
)
count_result = await session.execute(count_query)
students_count = count_result.scalar() or 0
return ClassGroupRead(
id=cls.id,
name=cls.name,
description=cls.description,
year=cls.year,
students_count=students_count
)
@router.delete("/{class_id}", status_code=204)
async def delete_class(
class_id: int,
session: AsyncSessionDep,
):
"""
Supprime une classe.
La suppression échoue si la classe contient des élèves ou des évaluations.
"""
# Récupérer la classe
query = select(ClassGroup).where(ClassGroup.id == class_id)
result = await session.execute(query)
cls = result.scalar_one_or_none()
if not cls:
raise HTTPException(status_code=404, detail="Classe non trouvée")
# Vérifier les dépendances : élèves inscrits
students_query = select(func.count(StudentEnrollment.id)).where(
StudentEnrollment.class_group_id == class_id
)
students_result = await session.execute(students_query)
students_count = students_result.scalar() or 0
# Vérifier les dépendances : évaluations
assessments_query = select(func.count(Assessment.id)).where(
Assessment.class_group_id == class_id
)
assessments_result = await session.execute(assessments_query)
assessments_count = assessments_result.scalar() or 0
if students_count > 0 or assessments_count > 0:
raise HTTPException(
status_code=400,
detail=f"Impossible de supprimer la classe '{cls.name}'. "
f"Elle contient {students_count} inscription(s) et {assessments_count} évaluation(s). "
f"Supprimez d'abord ces éléments."
)
# Supprimer la classe
await session.delete(cls)
await session.commit()
return None
def extract_name_parts(full_name: str) -> Tuple[str, str]:
"""
Extrait le nom et prénom depuis un nom complet.
Stratégie pour "AABIDA LAHDILI Fatima Zahra":
- Les mots en MAJUSCULES → nom de famille
- Les autres mots → prénoms
Args:
full_name: Nom complet de l'élève
Returns:
Tuple (nom_de_famille, prenoms)
"""
if not full_name or not full_name.strip():
return "", ""
# Nettoyer et séparer les mots
parts = full_name.strip().split()
if not parts:
return "", ""
# Séparer les mots en majuscules des autres
uppercase_words = []
other_words = []
for part in parts:
# Un mot est considéré en majuscules s'il contient au moins 2 caractères
# et que tous ses caractères alphabétiques sont en majuscules
if len(part) >= 2 and part.isupper():
uppercase_words.append(part)
else:
other_words.append(part)
# Le nom de famille = mots en majuscules
last_name = " ".join(uppercase_words) if uppercase_words else parts[0]
# Les prénoms = autres mots, ou le reste si pas de majuscules détectées
if uppercase_words and other_words:
first_name = " ".join(other_words)
elif len(parts) > 1:
# Fallback: premier mot = nom, reste = prénoms
last_name = parts[0]
first_name = " ".join(parts[1:])
else:
# Un seul mot : considérer comme prénom
first_name = parts[0]
last_name = ""
return last_name.strip(), first_name.strip()
@router.post("/{class_id}/import-csv", response_model=CSVImportResponse)
async def import_csv(
class_id: int,
session: AsyncSessionDep,
file: UploadFile = File(..., description="Fichier CSV à importer"),
enrollment_date: Optional[date] = Form(None, description="Date d'inscription (défaut: aujourd'hui)"),
skip_duplicates: bool = Form(True, description="Ignorer les doublons (True) ou échouer (False)"),
):
"""
Importe des élèves depuis un fichier CSV dans une classe.
Format CSV attendu:
- Séparateur: ;
- Première colonne: "NOM Prénoms" (ex: "DUPONT Marie Claire")
- Première ligne: headers (ignorée)
L'extraction du nom/prénom utilise la logique suivante:
- Mots en MAJUSCULES → nom de famille
- Autres mots → prénoms
"""
# Vérifier que la classe existe
class_query = select(ClassGroup).where(ClassGroup.id == class_id)
class_result = await session.execute(class_query)
cls = class_result.scalar_one_or_none()
if not cls:
raise HTTPException(status_code=404, detail="Classe non trouvée")
# Utiliser la date du jour si non spécifiée
if enrollment_date is None:
enrollment_date = date.today()
# Lire le contenu du fichier
try:
content = await file.read()
csv_content = content.decode('utf-8')
except UnicodeDecodeError:
# Essayer avec latin-1
try:
csv_content = content.decode('latin-1')
except Exception as e:
return CSVImportResponse(
success=False,
total_lines=0,
imported_count=0,
skipped_count=0,
error_count=1,
imported_students=[],
skipped_students=[],
errors=[ImportErrorInfo(line=0, error=f"Erreur de décodage du fichier: {str(e)}")],
message="Erreur de décodage du fichier"
)
# Parser le CSV
parsed_students = []
try:
csv_reader = csv.reader(StringIO(csv_content), delimiter=';')
# Ignorer la première ligne (headers)
headers = next(csv_reader, None)
if not headers:
return CSVImportResponse(
success=True,
total_lines=0,
imported_count=0,
skipped_count=0,
error_count=0,
imported_students=[],
skipped_students=[],
errors=[],
message="Fichier CSV vide"
)
for line_number, row in enumerate(csv_reader, start=2):
if not row or len(row) < 1:
continue
# Prendre la première colonne (nom complet)
full_name = row[0].strip().strip('"')
if not full_name or full_name.lower() in ['undefined', 'null', '']:
continue
# Extraire nom et prénom
last_name, first_name = extract_name_parts(full_name)
if not first_name and not last_name:
continue
parsed_students.append({
'first_name': first_name or "Prénom",
'last_name': last_name or "Nom",
'line_number': line_number,
'raw_name': full_name
})
except Exception as e:
return CSVImportResponse(
success=False,
total_lines=0,
imported_count=0,
skipped_count=0,
error_count=1,
imported_students=[],
skipped_students=[],
errors=[ImportErrorInfo(line=0, error=f"Erreur lors de l'analyse du fichier CSV: {str(e)}")],
message=f"Erreur lors de l'analyse du fichier CSV: {str(e)}"
)
if not parsed_students:
return CSVImportResponse(
success=True,
total_lines=0,
imported_count=0,
skipped_count=0,
error_count=0,
imported_students=[],
skipped_students=[],
errors=[],
message="Aucun élève trouvé dans le fichier CSV"
)
# Importer les élèves
imported_students = []
skipped_students = []
errors = []
for student_data in parsed_students:
try:
# Vérifier si l'élève existe déjà
existing_query = select(Student).where(
func.lower(Student.first_name) == student_data['first_name'].lower(),
func.lower(Student.last_name) == student_data['last_name'].lower()
)
existing_result = await session.execute(existing_query)
existing_student = existing_result.scalar_one_or_none()
if existing_student:
if skip_duplicates:
skipped_students.append(SkippedStudentInfo(
line=student_data['line_number'],
name=f"{student_data['first_name']} {student_data['last_name']}",
reason="Élève déjà existant"
))
continue
else:
errors.append(ImportErrorInfo(
line=student_data['line_number'],
error=f"Élève déjà existant: {student_data['first_name']} {student_data['last_name']}"
))
continue
# Créer le nouvel élève
new_student = Student(
first_name=student_data['first_name'],
last_name=student_data['last_name'],
email=None
)
session.add(new_student)
await session.flush() # Pour obtenir l'ID
# Créer l'inscription dans la classe
enrollment = StudentEnrollment(
student_id=new_student.id,
class_group_id=class_id,
enrollment_date=enrollment_date,
enrollment_reason="Import CSV"
)
session.add(enrollment)
imported_students.append(ImportedStudentInfo(
first_name=student_data['first_name'],
last_name=student_data['last_name'],
email=None,
line_number=student_data['line_number'],
raw_name=student_data['raw_name']
))
except Exception as e:
errors.append(ImportErrorInfo(
line=student_data['line_number'],
error=f"Erreur création élève {student_data['first_name']} {student_data['last_name']}: {str(e)}"
))
# Gérer les erreurs et le commit
if errors and not skip_duplicates:
await session.rollback()
return CSVImportResponse(
success=False,
total_lines=len(parsed_students),
imported_count=0,
skipped_count=len(skipped_students),
error_count=len(errors),
imported_students=[],
skipped_students=skipped_students,
errors=errors,
message=f"Import échoué: {len(errors)} erreur(s)"
)
await session.commit()
return CSVImportResponse(
success=True,
total_lines=len(parsed_students),
imported_count=len(imported_students),
skipped_count=len(skipped_students),
error_count=len(errors),
imported_students=imported_students,
skipped_students=skipped_students,
errors=errors,
message=f"Import réussi: {len(imported_students)} élève(s) importé(s), {len(skipped_students)} ignoré(s)"
)