Files
notytex/backend/api/routes/classes.py
Bertrand Benjamin 2b08eb534a Migration v1 (Flask) -> v2 (FastAPI + Vue.js) complétée
 Changements majeurs:
- Suppression complète du code Flask legacy
- Migration backend FastAPI vers racine /backend
- Migration frontend Vue.js vers racine /frontend
- Suppression de notytex-v2/ (code monté à la racine)

 Validations:
- Backend démarre correctement (port 8000)
- API /api/v2/health répond healthy
- 99/99 tests unitaires passent
- Frontend configuré avec proxy Vite

📝 Documentation:
- README.md réécrit pour v2
- Instructions de démarrage mises à jour
- .gitignore adapté pour backend/frontend/

🎯 Architecture finale:
notytex/
├── backend/     # FastAPI + SQLAlchemy + Pydantic
├── frontend/    # Vue 3 + Vite + TailwindCSS
├── docs/        # Documentation
└── school_management.db  # Base de données (inchangée)

Jalon 6 complété: Application v2 prête pour utilisation!
2025-11-25 21:09:47 +01:00

840 lines
28 KiB
Python

"""
Routes API pour les classes (ClassGroup).
"""
import csv
from io import StringIO
from typing import Optional, List, Tuple
from datetime import date
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from api.dependencies import AsyncSessionDep
from infrastructure.database.models import (
ClassGroup,
Student,
StudentEnrollment,
Assessment,
Exercise,
GradingElement,
Grade,
Domain,
Competence,
)
from schemas.class_group import (
ClassGroupRead,
ClassGroupDetail,
ClassGroupList,
ClassGroupCreate,
ClassGroupUpdate,
ClassGroupResponse,
ClassDashboardStats,
StudentAverage,
HistogramBin,
DomainStats,
CompetenceStats,
)
from domain.services.grading_calculator import GradingCalculator
from schemas.student import StudentWithClass, StudentList
from schemas.csv_import import (
CSVImportResponse,
ImportedStudentInfo,
SkippedStudentInfo,
ImportErrorInfo,
)
# Router collecting the class-group endpoints; every route below is mounted under "/classes".
router = APIRouter(prefix="/classes", tags=["Classes"])
@router.get("", response_model=ClassGroupList)
async def get_classes(
    session: AsyncSessionDep,
    year: Optional[str] = Query(None, description="Filtrer par année scolaire"),
):
    """
    Return every class group, optionally restricted to one school year.

    Classes are ordered by year (descending), then by name. Each entry carries
    the number of currently enrolled students (enrollments without a departure
    date) and the number of assessments attached to the class.

    Args:
        session: Async database session injected by FastAPI.
        year: When provided (and non-empty), only classes of this year are returned.

    Returns:
        A ``ClassGroupList`` with the classes and their total count.
    """
    query = select(ClassGroup)
    if year:
        query = query.where(ClassGroup.year == year)
    query = query.order_by(ClassGroup.year.desc(), ClassGroup.name)
    classes = (await session.execute(query)).scalars().all()

    # Fetch each counter with a single grouped query instead of issuing two
    # COUNT queries per class. Classes without any matching row are simply
    # absent from the mappings and default to 0 below.
    students_query = (
        select(StudentEnrollment.class_group_id, func.count(StudentEnrollment.id))
        .where(StudentEnrollment.departure_date.is_(None))
        .group_by(StudentEnrollment.class_group_id)
    )
    students_by_class = {
        group_id: count
        for group_id, count in (await session.execute(students_query)).all()
    }

    assessments_query = (
        select(Assessment.class_group_id, func.count(Assessment.id))
        .group_by(Assessment.class_group_id)
    )
    assessments_by_class = {
        group_id: count
        for group_id, count in (await session.execute(assessments_query)).all()
    }

    classes_with_count = [
        ClassGroupRead(
            id=cls.id,
            name=cls.name,
            description=cls.description,
            year=cls.year,
            students_count=students_by_class.get(cls.id, 0),
            assessments_count=assessments_by_class.get(cls.id, 0),
        )
        for cls in classes
    ]

    return ClassGroupList(
        classes=classes_with_count,
        total=len(classes_with_count),
    )
@router.get("/{class_id}", response_model=ClassGroupDetail)
async def get_class(
    class_id: int,
    session: AsyncSessionDep,
):
    """
    Return one class group together with its student and assessment counts.

    Args:
        class_id: Identifier of the class to fetch.
        session: Async database session injected by FastAPI.

    Returns:
        A ``ClassGroupDetail`` for the requested class.

    Raises:
        HTTPException: 404 when no class has this identifier.
    """
    lookup = await session.execute(
        select(ClassGroup).where(ClassGroup.id == class_id)
    )
    class_group = lookup.scalar_one_or_none()
    if not class_group:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    # Only enrollments without a departure date count as current students.
    enrolled_result = await session.execute(
        select(func.count(StudentEnrollment.id)).where(
            StudentEnrollment.class_group_id == class_group.id,
            StudentEnrollment.departure_date.is_(None),
        )
    )
    enrolled_total = enrolled_result.scalar() or 0

    assessment_result = await session.execute(
        select(func.count(Assessment.id)).where(
            Assessment.class_group_id == class_group.id
        )
    )
    assessment_total = assessment_result.scalar() or 0

    return ClassGroupDetail(
        id=class_group.id,
        name=class_group.name,
        description=class_group.description,
        year=class_group.year,
        students_count=enrolled_total,
        assessments_count=assessment_total,
    )
@router.get("/{class_id}/students", response_model=StudentList)
async def get_class_students(
    class_id: int,
    session: AsyncSessionDep,
    include_departed: bool = Query(False, description="Inclure les élèves partis"),
    at_date: Optional[str] = Query(None, description="Filtrer les élèves inscrits à cette date (YYYY-MM-DD)"),
):
    """
    Return the students of a class, sorted by last name then first name.

    Filtering rules:
    - ``at_date`` given: only enrollments active on that date are kept
      (enrolled on or before it, and not departed before it). In that case
      ``include_departed`` is not consulted.
    - otherwise, departed students are excluded unless ``include_departed``
      is true.

    Args:
        class_id: Identifier of the class.
        session: Async database session injected by FastAPI.
        include_departed: Whether to keep enrollments that have a departure date.
        at_date: ISO date (YYYY-MM-DD) at which the enrollment must be active.

    Returns:
        A ``StudentList`` with the matching students and their count.

    Raises:
        HTTPException: 404 when the class does not exist, 400 when ``at_date``
            is not a valid ISO date.
    """
    class_lookup = await session.execute(
        select(ClassGroup).where(ClassGroup.id == class_id)
    )
    cls = class_lookup.scalar_one_or_none()
    if not cls:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    enrollments_query = (
        select(StudentEnrollment)
        .options(selectinload(StudentEnrollment.student))
        .where(StudentEnrollment.class_group_id == class_id)
    )

    not_departed = StudentEnrollment.departure_date.is_(None)
    if at_date:
        try:
            filter_date = date.fromisoformat(at_date)
        except ValueError:
            raise HTTPException(status_code=400, detail="Format de date invalide (YYYY-MM-DD)")
        enrollments_query = enrollments_query.where(
            StudentEnrollment.enrollment_date <= filter_date,
            not_departed | (StudentEnrollment.departure_date >= filter_date),
        )
    elif not include_departed:
        enrollments_query = enrollments_query.where(not_departed)

    enrollments = (await session.execute(enrollments_query)).scalars().all()

    def to_schema(enrollment):
        # The class is reported as "current" only while the enrollment is open.
        pupil = enrollment.student
        is_current = enrollment.departure_date is None
        return StudentWithClass(
            id=pupil.id,
            last_name=pupil.last_name,
            first_name=pupil.first_name,
            email=pupil.email,
            full_name=f"{pupil.first_name} {pupil.last_name}",
            current_class_id=class_id if is_current else None,
            current_class_name=cls.name if is_current else None,
        )

    # Case-insensitive ordering on (last name, first name).
    students = sorted(
        (to_schema(enrollment) for enrollment in enrollments),
        key=lambda s: (s.last_name.lower(), s.first_name.lower()),
    )

    return StudentList(students=students, total=len(students))
def _summarize_averages(
    averages: List[float],
) -> Tuple[Optional[float], Optional[float], Optional[float], Optional[float], Optional[float]]:
    """
    Compute descriptive statistics over a list of student averages.

    Args:
        averages: Student averages on 20.

    Returns:
        Tuple ``(mean, median, std_dev, min_score, max_score)``. Mean, median
        and standard deviation are rounded to 2 decimals. Every entry is
        ``None`` when ``averages`` is empty.
    """
    if not averages:
        return None, None, None, None, None

    from statistics import fmean, median, pstdev

    # pstdev is the population standard deviation (division by n) and works
    # from the exact mean rather than from a mean already rounded to 2 decimals.
    return (
        round(fmean(averages), 2),
        round(median(averages), 2),
        round(pstdev(averages), 2),
        min(averages),
        max(averages),
    )


def _build_histogram(averages: List[float]) -> List[HistogramBin]:
    """
    Distribute averages into ten 2-point wide bins covering 0 to 20.

    Bins are half-open (``start <= avg < end``) except the last one, which
    also includes an average of exactly 20.
    """
    bins = []
    for start in range(0, 20, 2):
        end = start + 2
        count = sum(
            1 for avg in averages
            if start <= avg < end or (end == 20 and avg == 20)
        )
        bins.append(HistogramBin(
            range_start=float(start),
            range_end=float(end),
            label=f"{start}-{end}",
            count=count,
        ))
    return bins


@router.get("/{class_id}/stats", response_model=ClassDashboardStats)
async def get_class_stats(
    class_id: int,
    trimester: int,
    session: AsyncSessionDep,
):
    """
    Return the dashboard statistics of a class for one trimester.

    The response contains:
    - the weighted average on 20 of each currently enrolled student,
    - global statistics over those averages (mean, median, standard
      deviation, min, max),
    - a histogram of the averages in 2-point bins,
    - how many assessments are completed / in progress,
    - one entry per domain and per competence; these are placeholders whose
      ``mean`` is ``None`` and ``elements_count`` is 0.

    Args:
        class_id: Identifier of the class.
        trimester: Trimester whose assessments are taken into account.
        session: Async database session injected by FastAPI.

    Returns:
        A ``ClassDashboardStats`` for the class and trimester.

    Raises:
        HTTPException: 404 when the class does not exist.
    """
    class_result = await session.execute(
        select(ClassGroup).where(ClassGroup.id == class_id)
    )
    cls = class_result.scalar_one_or_none()
    if not cls:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    # Currently enrolled students (no departure date).
    students_query = (
        select(Student)
        .join(StudentEnrollment, Student.id == StudentEnrollment.student_id)
        .where(
            StudentEnrollment.class_group_id == class_id,
            StudentEnrollment.departure_date.is_(None)
        )
        .order_by(Student.last_name, Student.first_name)
    )
    students = (await session.execute(students_query)).scalars().all()

    # Assessments of the requested trimester.
    assessments_query = select(Assessment).where(
        Assessment.class_group_id == class_id,
        Assessment.trimester == trimester
    )
    assessments = (await session.execute(assessments_query)).scalars().all()

    # Load every grade of these assessments in ONE query (instead of one query
    # per student and per assessment), then index the rows in memory.
    grades_query = (
        select(Grade, GradingElement, Exercise.assessment_id)
        .select_from(Grade)
        .join(GradingElement, Grade.grading_element_id == GradingElement.id)
        .join(Exercise, GradingElement.exercise_id == Exercise.id)
        .join(Assessment, Exercise.assessment_id == Assessment.id)
        .where(
            Assessment.class_group_id == class_id,
            Assessment.trimester == trimester,
        )
    )
    grades_by_pair = {}   # (student_id, assessment_id) -> [(grade, element), ...]
    grades_count_by_assessment = {}   # assessment_id -> number of grade rows
    for grade, element, assessment_id in (await session.execute(grades_query)).all():
        grades_by_pair.setdefault((grade.student_id, assessment_id), []).append(
            (grade, element)
        )
        grades_count_by_assessment[assessment_id] = (
            grades_count_by_assessment.get(assessment_id, 0) + 1
        )

    # Number of grading elements per assessment, in one grouped query.
    elements_query = (
        select(Exercise.assessment_id, func.count(GradingElement.id))
        .select_from(GradingElement)
        .join(Exercise, GradingElement.exercise_id == Exercise.id)
        .join(Assessment, Exercise.assessment_id == Assessment.id)
        .where(
            Assessment.class_group_id == class_id,
            Assessment.trimester == trimester,
        )
        .group_by(Exercise.assessment_id)
    )
    elements_count_by_assessment = {
        assessment_id: count
        for assessment_id, count in (await session.execute(elements_query)).all()
    }

    # Weighted average of each student.
    calculator = GradingCalculator()
    student_averages = []
    all_averages = []

    for student in students:
        weighted_sum = 0.0
        total_coefficient = 0.0
        assessment_count = 0

        for assessment in assessments:
            total_score = 0.0
            total_max_points = 0.0

            for grade, element in grades_by_pair.get((student.id, assessment.id), ()):
                if not grade.value:
                    # Empty grade: nothing to score.
                    continue
                score = calculator.calculate_score(
                    grade.value, element.grading_type, element.max_points
                )
                if score is not None and calculator.is_counted_in_total(grade.value):
                    total_score += score
                    total_max_points += element.max_points

            # An assessment only counts when at least one grade contributed points.
            if total_max_points > 0:
                score_on_20 = total_score / total_max_points * 20
                weighted_sum += score_on_20 * assessment.coefficient
                total_coefficient += assessment.coefficient
                assessment_count += 1

        average = None
        if total_coefficient > 0:
            average = round(weighted_sum / total_coefficient, 2)
            all_averages.append(average)

        student_averages.append(StudentAverage(
            student_id=student.id,
            first_name=student.first_name,
            last_name=student.last_name,
            full_name=f"{student.first_name} {student.last_name}",
            average=average,
            assessment_count=assessment_count
        ))

    mean, median, std_dev, min_score, max_score = _summarize_averages(all_averages)
    histogram = _build_histogram(all_averages)

    # Assessment progress: "completed" when the number of grade rows reaches
    # elements x enrolled students, "in progress" when at least one grade exists.
    assessments_completed = 0
    assessments_in_progress = 0
    for assessment in assessments:
        grades_count = grades_count_by_assessment.get(assessment.id, 0)
        elements_count = elements_count_by_assessment.get(assessment.id, 0)
        expected_grades = elements_count * len(students)

        if expected_grades > 0:
            if grades_count >= expected_grades:
                assessments_completed += 1
            elif grades_count > 0:
                assessments_in_progress += 1

    # Domain / competence entries are listed without computed values.
    domains = (
        await session.execute(select(Domain).order_by(Domain.name))
    ).scalars().all()
    domains_stats = [
        DomainStats(
            id=domain.id,
            name=domain.name,
            color=domain.color,
            mean=None,
            elements_count=0
        )
        for domain in domains
    ]

    competences = (
        await session.execute(select(Competence).order_by(Competence.order_index))
    ).scalars().all()
    competences_stats = [
        CompetenceStats(
            id=competence.id,
            name=competence.name,
            color=competence.color,
            mean=None,
            elements_count=0
        )
        for competence in competences
    ]

    return ClassDashboardStats(
        class_id=class_id,
        class_name=cls.name,
        trimester=trimester,
        students_count=len(students),
        mean=mean,
        median=median,
        std_dev=std_dev,
        min_score=min_score,
        max_score=max_score,
        assessments_total=len(assessments),
        assessments_completed=assessments_completed,
        assessments_in_progress=assessments_in_progress,
        student_averages=student_averages,
        histogram=histogram,
        domains_stats=domains_stats,
        competences_stats=competences_stats
    )
@router.post("", response_model=ClassGroupRead, status_code=201)
async def create_class(
    class_data: ClassGroupCreate,
    session: AsyncSessionDep,
):
    """
    Create a new class group.

    Args:
        class_data: Name, description and year of the class to create.
        session: Async database session injected by FastAPI.

    Returns:
        The created class as ``ClassGroupRead`` with ``students_count`` set to 0.

    Raises:
        HTTPException: 400 when a class with the same name already exists.
    """
    # Class names must be unique.
    name_lookup = await session.execute(
        select(ClassGroup).where(ClassGroup.name == class_data.name)
    )
    if name_lookup.scalar_one_or_none():
        raise HTTPException(
            status_code=400,
            detail=f"Une classe avec le nom '{class_data.name}' existe déjà",
        )

    created = ClassGroup(
        name=class_data.name,
        description=class_data.description,
        year=class_data.year,
    )
    session.add(created)
    await session.commit()
    # Reload the row so the generated identifier is available.
    await session.refresh(created)

    return ClassGroupRead(
        id=created.id,
        name=created.name,
        description=created.description,
        year=created.year,
        students_count=0,
    )
@router.put("/{class_id}", response_model=ClassGroupRead)
async def update_class(
    class_id: int,
    class_data: ClassGroupUpdate,
    session: AsyncSessionDep,
):
    """
    Update an existing class group.

    Only the fields of ``class_data`` that are not ``None`` are applied. The
    response reports the current student and assessment counts, like the read
    endpoints do.

    Args:
        class_id: Identifier of the class to update.
        class_data: New values; ``None`` fields are left untouched.
        session: Async database session injected by FastAPI.

    Returns:
        The updated class as ``ClassGroupRead``.

    Raises:
        HTTPException: 404 when the class does not exist, 400 when the new
            name is already used by another class.
    """
    result = await session.execute(
        select(ClassGroup).where(ClassGroup.id == class_id)
    )
    cls = result.scalar_one_or_none()
    if not cls:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    # Enforce name uniqueness only when the name actually changes.
    if class_data.name and class_data.name != cls.name:
        existing_result = await session.execute(
            select(ClassGroup).where(
                ClassGroup.name == class_data.name,
                ClassGroup.id != class_id
            )
        )
        if existing_result.scalar_one_or_none():
            raise HTTPException(
                status_code=400,
                detail=f"Une autre classe avec le nom '{class_data.name}' existe déjà"
            )

    if class_data.name is not None:
        cls.name = class_data.name
    if class_data.description is not None:
        cls.description = class_data.description
    if class_data.year is not None:
        cls.year = class_data.year

    await session.commit()
    await session.refresh(cls)

    # Currently enrolled students (no departure date).
    count_result = await session.execute(
        select(func.count(StudentEnrollment.id)).where(
            StudentEnrollment.class_group_id == cls.id,
            StudentEnrollment.departure_date.is_(None)
        )
    )
    students_count = count_result.scalar() or 0

    # Previously omitted: without it the response carried the schema default
    # instead of the real number of assessments.
    assessments_result = await session.execute(
        select(func.count(Assessment.id)).where(
            Assessment.class_group_id == cls.id
        )
    )
    assessments_count = assessments_result.scalar() or 0

    return ClassGroupRead(
        id=cls.id,
        name=cls.name,
        description=cls.description,
        year=cls.year,
        students_count=students_count,
        assessments_count=assessments_count
    )
@router.delete("/{class_id}", status_code=204)
async def delete_class(
    class_id: int,
    session: AsyncSessionDep,
):
    """
    Delete a class group.

    Deletion is refused while the class still has enrollments (current or
    past) or assessments.

    Args:
        class_id: Identifier of the class to delete.
        session: Async database session injected by FastAPI.

    Raises:
        HTTPException: 404 when the class does not exist, 400 when it still
            has enrollments or assessments.
    """
    lookup = await session.execute(
        select(ClassGroup).where(ClassGroup.id == class_id)
    )
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    # Every enrollment counts here, including those with a departure date.
    enrollment_result = await session.execute(
        select(func.count(StudentEnrollment.id)).where(
            StudentEnrollment.class_group_id == class_id
        )
    )
    enrollment_total = enrollment_result.scalar() or 0

    assessment_result = await session.execute(
        select(func.count(Assessment.id)).where(
            Assessment.class_group_id == class_id
        )
    )
    assessment_total = assessment_result.scalar() or 0

    has_dependencies = enrollment_total > 0 or assessment_total > 0
    if has_dependencies:
        raise HTTPException(
            status_code=400,
            detail=f"Impossible de supprimer la classe '{target.name}'. "
                   f"Elle contient {enrollment_total} inscription(s) et {assessment_total} évaluation(s). "
                   f"Supprimez d'abord ces éléments."
        )

    await session.delete(target)
    await session.commit()
    return None
def extract_name_parts(full_name: str) -> Tuple[str, str]:
    """
    Split a full name into (last name, first names).

    Strategy, e.g. for "AABIDA LAHDILI Fatima Zahra":
    - words written in UPPERCASE (at least 2 characters) form the last name,
    - the remaining words form the first names.

    When the words are not a mix of both kinds (all uppercase, or none), the
    first word is taken as the last name and the rest as first names. A single
    word is returned as the first name with an empty last name.

    Args:
        full_name: Full name of the student.

    Returns:
        Tuple (last_name, first_names); two empty strings for blank input.
    """
    words = full_name.split() if full_name else []
    if not words:
        return "", ""
    if len(words) == 1:
        # A lone word is treated as a first name.
        return "", words[0]

    def is_surname_word(word: str) -> bool:
        # Single letters (initials) are never considered part of the last name.
        return len(word) >= 2 and word.isupper()

    surname_words = [word for word in words if is_surname_word(word)]
    given_words = [word for word in words if not is_surname_word(word)]

    if surname_words and given_words:
        return " ".join(surname_words), " ".join(given_words)

    # Case alone cannot separate the parts: fall back to word position.
    return words[0], " ".join(words[1:])
def _empty_import_response(message: str) -> CSVImportResponse:
    """Build a successful import response reporting that nothing was imported."""
    return CSVImportResponse(
        success=True,
        total_lines=0,
        imported_count=0,
        skipped_count=0,
        error_count=0,
        imported_students=[],
        skipped_students=[],
        errors=[],
        message=message
    )


def _parse_student_rows(csv_content: str) -> Optional[List[dict]]:
    """
    Extract student names from ';'-separated CSV text.

    The first row is treated as headers and ignored. Only the first column of
    each following row is read; empty cells and the literal values
    "undefined" / "null" are skipped.

    Args:
        csv_content: Decoded text of the uploaded file.

    Returns:
        ``None`` when the content has no header row, otherwise a list of dicts
        with the keys ``first_name``, ``last_name``, ``line_number`` and
        ``raw_name``.

    Raises:
        csv.Error: When the csv module cannot parse the content.
    """
    csv_reader = csv.reader(StringIO(csv_content), delimiter=';')

    headers = next(csv_reader, None)
    if not headers:
        return None

    parsed_students = []
    # Line numbers start at 2 because line 1 is the header row.
    for line_number, row in enumerate(csv_reader, start=2):
        if not row:
            continue

        full_name = row[0].strip().strip('"')
        if not full_name or full_name.lower() in ('undefined', 'null'):
            continue

        last_name, first_name = extract_name_parts(full_name)
        if not first_name and not last_name:
            continue

        parsed_students.append({
            # Placeholder used when one of the two parts could not be extracted.
            'first_name': first_name or "Prénom",
            'last_name': last_name or "Nom",
            'line_number': line_number,
            'raw_name': full_name
        })
    return parsed_students


@router.post("/{class_id}/import-csv", response_model=CSVImportResponse)
async def import_csv(
    class_id: int,
    session: AsyncSessionDep,
    file: UploadFile = File(..., description="Fichier CSV à importer"),
    enrollment_date: Optional[date] = Form(None, description="Date d'inscription (défaut: aujourd'hui)"),
    skip_duplicates: bool = Form(True, description="Ignorer les doublons (True) ou échouer (False)"),
):
    """
    Import students from a CSV file and enroll them in a class.

    Expected CSV format:
    - separator: ``;``
    - first column: "LASTNAME Firstnames" (e.g. "DUPONT Marie Claire")
    - first line: headers (ignored)

    Names are split with ``extract_name_parts``: UPPERCASE words become the
    last name, the other words the first names. The file is decoded as UTF-8,
    falling back to latin-1.

    A student is a duplicate when a student with the same first and last name
    (case-insensitive) already exists. Duplicates are skipped when
    ``skip_duplicates`` is true; otherwise they are reported as errors and the
    whole import is rolled back.

    Args:
        class_id: Identifier of the target class.
        session: Async database session injected by FastAPI.
        file: Uploaded CSV file.
        enrollment_date: Enrollment date of the imported students; defaults to today.
        skip_duplicates: Skip duplicates (True) or fail the import (False).

    Returns:
        A ``CSVImportResponse`` describing imported, skipped and failed lines.

    Raises:
        HTTPException: 404 when the class does not exist.
    """
    class_result = await session.execute(
        select(ClassGroup).where(ClassGroup.id == class_id)
    )
    cls = class_result.scalar_one_or_none()
    if not cls:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    if enrollment_date is None:
        enrollment_date = date.today()

    content = await file.read()
    try:
        csv_content = content.decode('utf-8')
    except UnicodeDecodeError:
        # latin-1 maps every byte value to a character, so this fallback
        # cannot raise and needs no error branch of its own.
        csv_content = content.decode('latin-1')

    try:
        parsed_students = _parse_student_rows(csv_content)
    except csv.Error as e:
        failure = f"Erreur lors de l'analyse du fichier CSV: {e}"
        return CSVImportResponse(
            success=False,
            total_lines=0,
            imported_count=0,
            skipped_count=0,
            error_count=1,
            imported_students=[],
            skipped_students=[],
            errors=[ImportErrorInfo(line=0, error=failure)],
            message=failure
        )

    if parsed_students is None:
        return _empty_import_response("Fichier CSV vide")
    if not parsed_students:
        return _empty_import_response("Aucun élève trouvé dans le fichier CSV")

    imported_students = []
    skipped_students = []
    errors = []

    for student_data in parsed_students:
        first_name = student_data['first_name']
        last_name = student_data['last_name']
        line_number = student_data['line_number']
        try:
            existing_result = await session.execute(
                select(Student).where(
                    func.lower(Student.first_name) == first_name.lower(),
                    func.lower(Student.last_name) == last_name.lower()
                )
            )
            # first() rather than scalar_one_or_none(): the latter raises when
            # several students already share this name, which would report a
            # plain duplicate as a creation error.
            existing_student = existing_result.scalars().first()

            if existing_student:
                if skip_duplicates:
                    skipped_students.append(SkippedStudentInfo(
                        line=line_number,
                        name=f"{first_name} {last_name}",
                        reason="Élève déjà existant"
                    ))
                else:
                    errors.append(ImportErrorInfo(
                        line=line_number,
                        error=f"Élève déjà existant: {first_name} {last_name}"
                    ))
                continue

            new_student = Student(
                first_name=first_name,
                last_name=last_name,
                email=None
            )
            session.add(new_student)
            await session.flush()  # assigns new_student.id before the enrollment is built

            session.add(StudentEnrollment(
                student_id=new_student.id,
                class_group_id=class_id,
                enrollment_date=enrollment_date,
                enrollment_reason="Import CSV"
            ))

            imported_students.append(ImportedStudentInfo(
                first_name=first_name,
                last_name=last_name,
                email=None,
                line_number=line_number,
                raw_name=student_data['raw_name']
            ))
        except Exception as e:
            # Best effort per line: record the failure and keep going.
            # NOTE(review): after a failed flush the session may require a
            # rollback before the final commit can succeed — confirm.
            errors.append(ImportErrorInfo(
                line=line_number,
                error=f"Erreur création élève {first_name} {last_name}: {e}"
            ))

    # In strict mode any error cancels the whole import.
    if errors and not skip_duplicates:
        await session.rollback()
        return CSVImportResponse(
            success=False,
            total_lines=len(parsed_students),
            imported_count=0,
            skipped_count=len(skipped_students),
            error_count=len(errors),
            imported_students=[],
            skipped_students=skipped_students,
            errors=errors,
            message=f"Import échoué: {len(errors)} erreur(s)"
        )

    await session.commit()

    return CSVImportResponse(
        success=True,
        total_lines=len(parsed_students),
        imported_count=len(imported_students),
        skipped_count=len(skipped_students),
        error_count=len(errors),
        imported_students=imported_students,
        skipped_students=skipped_students,
        errors=errors,
        message=f"Import réussi: {len(imported_students)} élève(s) importé(s), {len(skipped_students)} ignoré(s)"
    )