Files
notytex/backend/api/routes/students.py
Bertrand Benjamin 08c8ee4931
All checks were successful
Build and Publish Docker Images / Build Backend Image (push) Successful in 3m1s
Build and Publish Docker Images / Build Frontend Image (push) Successful in 3m0s
Build and Publish Docker Images / Build Summary (push) Successful in 3s
feat(class): improve class/id/student
2025-12-03 06:32:16 +01:00

574 lines
19 KiB
Python

"""
Routes API pour les étudiants (Student).
"""
from typing import Optional
from datetime import date
from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from api.dependencies import AsyncSessionDep
from infrastructure.database.models import (
Student,
StudentEnrollment,
ClassGroup,
)
from schemas.student import (
StudentRead,
StudentWithClass,
StudentDetail,
StudentList,
EnrollmentRead,
StudentCreate,
StudentUpdate,
EnrollRequest,
TransferRequest,
DepartureRequest,
EnrollmentResponse,
TransferResponse,
DepartureResponse,
)
# Router collecting the student endpoints defined in this module; every route
# registered on it is served under the "/students" prefix with the "Students" tag.
router = APIRouter(prefix="/students", tags=["Students"])
@router.get("", response_model=StudentList)
async def get_students(
    session: AsyncSessionDep,
    search: Optional[str] = Query(None, description="Recherche par nom ou prénom"),
    class_id: Optional[int] = Query(None, description="Filtrer par classe actuelle"),
):
    """
    List students, optionally filtered by name and by current class.

    Args:
        session: Database session provided by the API dependency.
        search: Optional substring matched case-insensitively against the
            last name or the first name. ``%`` and ``_`` typed by the user
            are matched literally rather than acting as LIKE wildcards.
        class_id: Optional class id. When given, only students whose active
            enrollment (the first one without a departure date) belongs to
            that class are returned.

    Returns:
        A ``StudentList`` with the matching students, ordered by last name
        then first name, and their count.
    """
    query = select(Student).options(
        selectinload(Student.enrollments).selectinload(StudentEnrollment.class_group)
    )

    if search:
        # Escape the LIKE metacharacters (and the escape character itself) so
        # that a search for "%" or "_" does not silently match every row.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        pattern = f"%{escaped}%"
        query = query.where(
            Student.last_name.ilike(pattern, escape="/")
            | Student.first_name.ilike(pattern, escape="/")
        )

    query = query.order_by(Student.last_name, Student.first_name)
    result = await session.execute(query)
    students = result.scalars().all()

    students_list = []
    for student in students:
        # The active enrollment is the first one that has no departure date.
        current_enrollment = next(
            (
                enrollment
                for enrollment in student.enrollments
                if enrollment.departure_date is None
            ),
            None,
        )

        # Compare against None explicitly: a plain truthiness test would
        # ignore the filter for class_id=0 and return every student.
        if class_id is not None and (
            current_enrollment is None
            or current_enrollment.class_group_id != class_id
        ):
            continue

        students_list.append(
            StudentWithClass(
                id=student.id,
                last_name=student.last_name,
                first_name=student.first_name,
                email=student.email,
                full_name=f"{student.first_name} {student.last_name}",
                current_class_id=current_enrollment.class_group_id if current_enrollment else None,
                current_class_name=current_enrollment.class_group.name if current_enrollment else None,
            )
        )

    return StudentList(
        students=students_list,
        total=len(students_list),
    )
@router.get("/{student_id}", response_model=StudentDetail)
async def get_student(
    student_id: int,
    session: AsyncSessionDep,
):
    """
    Return one student together with the full enrollment history.

    Args:
        student_id: Identifier of the student to fetch.
        session: Database session provided by the API dependency.

    Returns:
        A ``StudentDetail`` whose enrollments are sorted by enrollment date,
        most recent first. The current class comes from the last enrollment
        (in relationship order) that has no departure date.

    Raises:
        HTTPException: 404 when no student has this id.
    """
    stmt = (
        select(Student)
        .where(Student.id == student_id)
        .options(
            selectinload(Student.enrollments).selectinload(StudentEnrollment.class_group)
        )
    )
    student = (await session.execute(stmt)).scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Étudiant non trouvé")

    # Walk the history once: remember the open enrollment and build the
    # serialisable view of each entry.
    active = None
    history = []
    for item in student.enrollments:
        still_open = item.departure_date is None
        if still_open:
            active = item
        history.append(
            EnrollmentRead(
                id=item.id,
                student_id=item.student_id,
                class_group_id=item.class_group_id,
                class_name=item.class_group.name,
                enrollment_date=item.enrollment_date,
                departure_date=item.departure_date,
                enrollment_reason=item.enrollment_reason,
                departure_reason=item.departure_reason,
                is_active=still_open,
                created_at=item.created_at,
                updated_at=item.updated_at,
            )
        )

    # Most recent enrollment first.
    history = sorted(history, key=lambda entry: entry.enrollment_date, reverse=True)

    if active:
        class_id = active.class_group_id
        class_name = active.class_group.name
    else:
        class_id = None
        class_name = None

    return StudentDetail(
        id=student.id,
        last_name=student.last_name,
        first_name=student.first_name,
        email=student.email,
        full_name=f"{student.first_name} {student.last_name}",
        current_class_id=class_id,
        current_class_name=class_name,
        enrollments=history,
    )
@router.post("", response_model=StudentWithClass, status_code=201)
async def create_student(
    student_data: StudentCreate,
    session: AsyncSessionDep,
):
    """
    Create a student and optionally enroll them in a class.

    All validation (email uniqueness, existence of the target class) happens
    before anything is written, so a rejected request never leaves a flushed
    but uncommitted student row in the session.

    Args:
        student_data: Payload with the student's names, optional email and
            optional enrollment information (class id, date, reason).
        session: Database session provided by the API dependency.

    Returns:
        A ``StudentWithClass`` describing the new student and, when an
        enrollment was created, the class they were enrolled in.

    Raises:
        HTTPException: 400 when another student already uses the email,
            404 when ``class_group_id`` does not match an existing class.
    """
    # Reject duplicate emails (only checked when an email is provided).
    if student_data.email:
        existing_query = select(Student).where(Student.email == student_data.email)
        existing_result = await session.execute(existing_query)
        if existing_result.scalar_one_or_none():
            raise HTTPException(
                status_code=400,
                detail=f"Un élève avec l'email '{student_data.email}' existe déjà"
            )

    # Resolve the target class up front so a 404 is raised before any insert.
    class_group = None
    if student_data.class_group_id:
        class_query = select(ClassGroup).where(ClassGroup.id == student_data.class_group_id)
        class_result = await session.execute(class_query)
        class_group = class_result.scalar_one_or_none()
        if not class_group:
            raise HTTPException(status_code=404, detail="Classe non trouvée")

    new_student = Student(
        last_name=student_data.last_name,
        first_name=student_data.first_name,
        email=student_data.email
    )
    session.add(new_student)
    await session.flush()  # Needed to obtain the generated student id.

    current_class_id = None
    current_class_name = None

    if class_group is not None:
        enrollment = StudentEnrollment(
            student_id=new_student.id,
            class_group_id=student_data.class_group_id,
            # Default the enrollment date to today when the payload omits it.
            enrollment_date=student_data.enrollment_date or date.today(),
            enrollment_reason=student_data.enrollment_reason
        )
        session.add(enrollment)
        current_class_id = class_group.id
        current_class_name = class_group.name

    await session.commit()

    return StudentWithClass(
        id=new_student.id,
        last_name=new_student.last_name,
        first_name=new_student.first_name,
        email=new_student.email,
        full_name=f"{new_student.first_name} {new_student.last_name}",
        current_class_id=current_class_id,
        current_class_name=current_class_name
    )
@router.put("/{student_id}", response_model=StudentWithClass)
async def update_student(
    student_id: int,
    student_data: StudentUpdate,
    session: AsyncSessionDep,
):
    """
    Update a student's last name, first name and/or email.

    Only fields that are not ``None`` in the payload are applied. An empty
    email string clears the stored email (it is saved as ``None``).

    Args:
        student_id: Identifier of the student to modify.
        student_data: Payload with the new values.
        session: Database session provided by the API dependency.

    Returns:
        A ``StudentWithClass`` reflecting the saved student and the class of
        the first enrollment that has no departure date, if any.

    Raises:
        HTTPException: 404 when the student does not exist, 400 when the new
            email is already used by a different student.
    """
    with_enrollments = selectinload(Student.enrollments).selectinload(
        StudentEnrollment.class_group
    )
    stmt = select(Student).options(with_enrollments).where(Student.id == student_id)
    student = (await session.execute(stmt)).scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Étudiant non trouvé")

    # A changed, non-empty email must not collide with another student's.
    if student_data.email and student_data.email != student.email:
        clash_stmt = select(Student).where(
            Student.email == student_data.email,
            Student.id != student_id,
        )
        clash = (await session.execute(clash_stmt)).scalar_one_or_none()
        if clash:
            raise HTTPException(
                status_code=400,
                detail=f"Un autre élève avec l'email '{student_data.email}' existe déjà"
            )

    # Copy over the name fields that were actually supplied.
    for field in ("last_name", "first_name"):
        value = getattr(student_data, field)
        if value is not None:
            setattr(student, field, value)
    if student_data.email is not None:
        # An empty string is stored as None.
        student.email = student_data.email or None

    await session.commit()
    await session.refresh(student)

    # First enrollment without a departure date is the current one.
    active = next(
        (item for item in student.enrollments if item.departure_date is None),
        None,
    )
    if active:
        class_id = active.class_group_id
        class_name = active.class_group.name
    else:
        class_id = None
        class_name = None

    return StudentWithClass(
        id=student.id,
        last_name=student.last_name,
        first_name=student.first_name,
        email=student.email,
        full_name=f"{student.first_name} {student.last_name}",
        current_class_id=class_id,
        current_class_name=class_name,
    )
@router.patch("/{student_id}/email", response_model=StudentWithClass)
async def update_student_email(
    student_id: int,
    email: str,
    session: AsyncSessionDep,
):
    """
    Change only the email of a student (intended for inline editing).

    Args:
        student_id: Identifier of the student to modify.
        email: New email address; an empty string clears the stored email
            (it is saved as ``None``).
        session: Database session provided by the API dependency.

    Returns:
        A ``StudentWithClass`` reflecting the saved student and the class of
        the first enrollment that has no departure date, if any.

    Raises:
        HTTPException: 404 when the student does not exist, 400 when the
            email is already used by a different student.
    """
    stmt = (
        select(Student)
        .where(Student.id == student_id)
        .options(
            selectinload(Student.enrollments).selectinload(StudentEnrollment.class_group)
        )
    )
    student = (await session.execute(stmt)).scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Étudiant non trouvé")

    # Uniqueness only matters for a non-empty email that actually changes.
    if email and email != student.email:
        clash_stmt = select(Student).where(
            Student.email == email,
            Student.id != student_id,
        )
        clash = (await session.execute(clash_stmt)).scalar_one_or_none()
        if clash:
            raise HTTPException(
                status_code=400,
                detail=f"Un autre élève avec l'email '{email}' existe déjà"
            )

    # An empty string clears the email.
    student.email = email or None
    await session.commit()
    await session.refresh(student)

    # First enrollment without a departure date is the current one.
    active = next(
        (item for item in student.enrollments if item.departure_date is None),
        None,
    )
    if active:
        class_id = active.class_group_id
        class_name = active.class_group.name
    else:
        class_id = None
        class_name = None

    return StudentWithClass(
        id=student.id,
        last_name=student.last_name,
        first_name=student.first_name,
        email=student.email,
        full_name=f"{student.first_name} {student.last_name}",
        current_class_id=class_id,
        current_class_name=class_name,
    )
@router.delete("/{student_id}", status_code=204)
async def delete_student(
    student_id: int,
    session: AsyncSessionDep,
):
    """
    Delete a student.

    The student's enrollments are expected to be removed by cascade; that
    cascade is configured on the models, not in this function.

    Args:
        student_id: Identifier of the student to delete.
        session: Database session provided by the API dependency.

    Raises:
        HTTPException: 404 when no student has this id.
    """
    lookup = await session.execute(
        select(Student).where(Student.id == student_id)
    )
    student = lookup.scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Étudiant non trouvé")

    await session.delete(student)
    await session.commit()
    return None
@router.post("/enroll", response_model=EnrollmentResponse, status_code=201)
async def enroll_student(
    enroll_data: EnrollRequest,
    session: AsyncSessionDep,
):
    """
    Enroll a student in a class, creating the student first if needed.

    When ``enroll_data.student_id`` is set, the existing student is enrolled
    provided they have no active enrollment (one without a departure date).
    Otherwise a new student is created from the first name, last name and
    optional email in the payload.

    Args:
        enroll_data: Payload with the target class, either an existing
            student id or the new student's identity, and the enrollment
            date and reason.
        session: Database session provided by the API dependency.

    Returns:
        An ``EnrollmentResponse`` with the enrollment id, the student id and
        name, the class name, a confirmation message and whether the student
        was created by this call.

    Raises:
        HTTPException: 404 when the class or the referenced student does not
            exist; 400 when the student is already actively enrolled, when
            the names needed to create a student are missing, or when the
            email is already used.
    """
    # The target class must exist before anything else is done.
    class_query = select(ClassGroup).where(ClassGroup.id == enroll_data.class_group_id)
    class_result = await session.execute(class_query)
    class_group = class_result.scalar_one_or_none()
    if not class_group:
        raise HTTPException(status_code=404, detail="Classe non trouvée")

    is_new_student = False

    if enroll_data.student_id:
        # Existing student.
        student_query = select(Student).where(Student.id == enroll_data.student_id)
        student_result = await session.execute(student_query)
        student = student_result.scalar_one_or_none()
        if not student:
            raise HTTPException(status_code=404, detail="Élève non trouvé")

        # This is a pure existence check, so take the first row: using
        # scalar_one_or_none() here would raise (HTTP 500) if the student
        # already had several open enrollments instead of returning the 400.
        active_query = select(StudentEnrollment).where(
            StudentEnrollment.student_id == enroll_data.student_id,
            StudentEnrollment.departure_date.is_(None)
        )
        active_result = await session.execute(active_query)
        if active_result.scalars().first() is not None:
            raise HTTPException(
                status_code=400,
                detail=f"L'élève {student.first_name} {student.last_name} est déjà inscrit dans une classe"
            )
    else:
        # New student: both names are mandatory.
        if not enroll_data.first_name or not enroll_data.last_name:
            raise HTTPException(
                status_code=400,
                detail="Le prénom et le nom sont obligatoires pour créer un nouvel élève"
            )

        # Reject duplicate emails (only checked when an email is provided).
        if enroll_data.email:
            existing_query = select(Student).where(Student.email == enroll_data.email)
            existing_result = await session.execute(existing_query)
            if existing_result.scalar_one_or_none():
                raise HTTPException(
                    status_code=400,
                    detail=f"Un élève avec l'email '{enroll_data.email}' existe déjà"
                )

        student = Student(
            first_name=enroll_data.first_name,
            last_name=enroll_data.last_name,
            email=enroll_data.email
        )
        session.add(student)
        await session.flush()  # Needed to obtain the generated student id.
        is_new_student = True

    enrollment = StudentEnrollment(
        student_id=student.id,
        class_group_id=enroll_data.class_group_id,
        # Same fallback as create_student: default to today if the payload
        # carries no date (no effect when the schema always provides one).
        enrollment_date=enroll_data.enrollment_date or date.today(),
        enrollment_reason=enroll_data.enrollment_reason
    )
    session.add(enrollment)
    await session.commit()
    await session.refresh(enrollment)

    return EnrollmentResponse(
        enrollment_id=enrollment.id,
        student_id=student.id,
        student_name=f"{student.first_name} {student.last_name}",
        class_name=class_group.name,
        message=f"Élève {student.first_name} {student.last_name} inscrit en {class_group.name}",
        is_new_student=is_new_student
    )
@router.post("/transfer", response_model=TransferResponse)
async def transfer_student(
    transfer_data: TransferRequest,
    session: AsyncSessionDep,
):
    """
    Move a student from their current class to another one.

    The active enrollment (no departure date) is closed at the transfer date
    and a new enrollment in the destination class is opened at that same
    date. When no reason is supplied, a default reason naming the other class
    is recorded on each side.

    Args:
        transfer_data: Payload with the student id, the destination class
            id, the transfer date and an optional reason.
        session: Database session provided by the API dependency.

    Returns:
        A ``TransferResponse`` with both enrollment ids, the student name,
        both class names and a confirmation message.

    Raises:
        HTTPException: 404 when the student or the destination class does
            not exist, 400 when the student has no active enrollment.
    """
    student = (
        await session.execute(
            select(Student).where(Student.id == transfer_data.student_id)
        )
    ).scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Élève non trouvé")

    new_class = (
        await session.execute(
            select(ClassGroup).where(ClassGroup.id == transfer_data.new_class_group_id)
        )
    ).scalar_one_or_none()
    if not new_class:
        raise HTTPException(status_code=404, detail="Nouvelle classe non trouvée")

    # The enrollment to close is the one that has no departure date yet.
    open_stmt = (
        select(StudentEnrollment)
        .where(
            StudentEnrollment.student_id == transfer_data.student_id,
            StudentEnrollment.departure_date.is_(None),
        )
        .options(selectinload(StudentEnrollment.class_group))
    )
    old_enrollment = (await session.execute(open_stmt)).scalar_one_or_none()
    if not old_enrollment:
        raise HTTPException(
            status_code=400,
            detail=f"Aucune inscription active trouvée pour l'élève {student.first_name} {student.last_name}"
        )

    # Capture the origin class name before the enrollment is modified.
    old_class_name = old_enrollment.class_group.name

    # Close the previous enrollment.
    old_enrollment.departure_date = transfer_data.transfer_date
    old_enrollment.departure_reason = transfer_data.transfer_reason or f"Transfert vers {new_class.name}"

    # Open the enrollment in the destination class.
    new_enrollment = StudentEnrollment(
        student_id=transfer_data.student_id,
        class_group_id=transfer_data.new_class_group_id,
        enrollment_date=transfer_data.transfer_date,
        enrollment_reason=transfer_data.transfer_reason or f"Transfert depuis {old_class_name}",
    )
    session.add(new_enrollment)

    await session.commit()
    await session.refresh(new_enrollment)

    return TransferResponse(
        old_enrollment_id=old_enrollment.id,
        new_enrollment_id=new_enrollment.id,
        student_name=f"{student.first_name} {student.last_name}",
        old_class_name=old_class_name,
        new_class_name=new_class.name,
        message=f"Élève {student.first_name} {student.last_name} transféré de {old_class_name} vers {new_class.name}",
    )
@router.post("/departure", response_model=DepartureResponse)
async def record_departure(
    departure_data: DepartureRequest,
    session: AsyncSessionDep,
):
    """
    Record that a student left their current class.

    The active enrollment (no departure date) receives the departure date and
    reason from the payload.

    Args:
        departure_data: Payload with the student id, the departure date and
            the departure reason.
        session: Database session provided by the API dependency.

    Returns:
        A ``DepartureResponse`` with the enrollment id, the student name, the
        class name and a confirmation message.

    Raises:
        HTTPException: 404 when the student does not exist, 400 when the
            student has no active enrollment.
    """
    student = (
        await session.execute(
            select(Student).where(Student.id == departure_data.student_id)
        )
    ).scalar_one_or_none()
    if not student:
        raise HTTPException(status_code=404, detail="Élève non trouvé")

    # The enrollment to close is the one that has no departure date yet.
    open_stmt = (
        select(StudentEnrollment)
        .where(
            StudentEnrollment.student_id == departure_data.student_id,
            StudentEnrollment.departure_date.is_(None),
        )
        .options(selectinload(StudentEnrollment.class_group))
    )
    enrollment = (await session.execute(open_stmt)).scalar_one_or_none()
    if not enrollment:
        raise HTTPException(
            status_code=400,
            detail=f"Aucune inscription active trouvée pour l'élève {student.first_name} {student.last_name}"
        )

    class_name = enrollment.class_group.name

    # Close the enrollment.
    enrollment.departure_date = departure_data.departure_date
    enrollment.departure_reason = departure_data.departure_reason
    await session.commit()

    return DepartureResponse(
        enrollment_id=enrollment.id,
        student_name=f"{student.first_name} {student.last_name}",
        class_name=class_name,
        message=f"Départ de {student.first_name} {student.last_name} de {class_name} enregistré",
    )