clean: clean python code

This commit is contained in:
2025-08-07 14:16:15 +02:00
parent db96807422
commit 8cee665f2b
18 changed files with 316 additions and 936 deletions

19
app.py
View File

@@ -1,7 +1,7 @@
import os
import logging
from flask import Flask, render_template
from models import db, Assessment, Student, ClassGroup
from models import db, ClassGroup
from repositories import AssessmentRepository, StudentRepository
from commands import init_db, create_large_test_data
from app_config_classes import config
from app_config import config_manager
@@ -52,9 +52,12 @@ def create_app(config_name=None):
@app.route('/')
def index():
try:
recent_assessments = Assessment.query.order_by(Assessment.date.desc()).limit(5).all()
total_students = Student.query.count()
total_assessments = Assessment.query.count()
assessment_repo = AssessmentRepository()
student_repo = StudentRepository()
recent_assessments = assessment_repo.find_recent(5)
total_students = student_repo.model_class.query.count() # Keeping simple count
total_assessments = assessment_repo.model_class.query.count() # Keeping simple count
total_classes = ClassGroup.query.count()
return render_template('index.html',
recent_assessments=recent_assessments,
@@ -78,10 +81,8 @@ def create_app(config_name=None):
def students():
try:
# Optimisation: utiliser joinedload pour éviter les requêtes N+1
from sqlalchemy.orm import joinedload
students = Student.query.options(joinedload(Student.class_group)).order_by(
ClassGroup.name, Student.last_name, Student.first_name
).join(ClassGroup).all()
student_repo = StudentRepository()
students = student_repo.find_all_with_class_ordered()
return render_template('students.html', students=students)
except Exception as e:
app.logger.error(f'Erreur lors du chargement des étudiants: {e}')

View File

@@ -1,5 +1,4 @@
from typing import Dict, Any, Optional, List
from flask import current_app
from models import db, AppConfig, CompetenceScaleValue, Competence
class ConfigManager:
@@ -279,7 +278,8 @@ class ConfigManager:
return True
except Exception as e:
db.session.rollback()
print(f"Erreur lors de la sauvegarde de la configuration: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de la sauvegarde de la configuration: {e}", exc_info=True)
return False
def get_competence_scale_values(self) -> Dict[str, Dict[str, Any]]:
@@ -504,7 +504,8 @@ class ConfigManager:
return True
except Exception as e:
db.session.rollback()
print(f"Erreur lors de l'ajout de la compétence: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de l'ajout de la compétence: {e}", exc_info=True)
return False
def update_competence(self, competence_id: int, name: str, color: str, icon: str) -> bool:
@@ -520,7 +521,8 @@ class ConfigManager:
return False
except Exception as e:
db.session.rollback()
print(f"Erreur lors de la mise à jour de la compétence: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de la mise à jour de la compétence: {e}", exc_info=True)
return False
def delete_competence(self, competence_id: int) -> bool:
@@ -534,7 +536,8 @@ class ConfigManager:
return False
except Exception as e:
db.session.rollback()
print(f"Erreur lors de la suppression de la compétence: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de la suppression de la compétence: {e}", exc_info=True)
return False
# Méthodes spécifiques pour la gestion de l'échelle
@@ -553,7 +556,8 @@ class ConfigManager:
return True
except Exception as e:
db.session.rollback()
print(f"Erreur lors de l'ajout de la valeur d'échelle: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de l'ajout de la valeur d'échelle: {e}", exc_info=True)
return False
def update_scale_value(self, value: str, label: str, color: str, included_in_total: bool) -> bool:
@@ -569,7 +573,8 @@ class ConfigManager:
return False
except Exception as e:
db.session.rollback()
print(f"Erreur lors de la mise à jour de la valeur d'échelle: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de la mise à jour de la valeur d'échelle: {e}", exc_info=True)
return False
def delete_scale_value(self, value: str) -> bool:
@@ -583,7 +588,8 @@ class ConfigManager:
return False
except Exception as e:
db.session.rollback()
print(f"Erreur lors de la suppression de la valeur d'échelle: {e}")
from flask import current_app
current_app.logger.error(f"Erreur lors de la suppression de la valeur d'échelle: {e}", exc_info=True)
return False

View File

@@ -1,334 +0,0 @@
#!/usr/bin/env python3
"""
Benchmark Final de Migration - JOUR 7
Script de benchmark complet pour mesurer les performances de la nouvelle
architecture refactorisée vs l'ancienne implémentation legacy.
Mesure les performances de tous les services migrés:
- AssessmentProgressService
- StudentScoreCalculator avec UnifiedGradingCalculator
- AssessmentStatisticsService
- Pattern Strategy vs logique conditionnelle
Génère un rapport complet de performance avec métriques détaillées.
"""
import time
import statistics
import traceback
from typing import Dict, List, Any, Tuple
from contextlib import contextmanager
from dataclasses import dataclass
from flask import Flask
from models import db, Assessment
import os
@dataclass
class BenchmarkResult:
"""Résultat d'un benchmark avec métriques détaillées."""
service_name: str
old_time: float
new_time: float
iterations: int
improvement_percent: float
old_times: List[float]
new_times: List[float]
@property
def old_avg(self) -> float:
return statistics.mean(self.old_times)
@property
def new_avg(self) -> float:
return statistics.mean(self.new_times)
@property
def old_std(self) -> float:
return statistics.stdev(self.old_times) if len(self.old_times) > 1 else 0.0
@property
def new_std(self) -> float:
return statistics.stdev(self.new_times) if len(self.new_times) > 1 else 0.0
class MigrationBenchmark:
"""Benchmark complet de la migration avec mesures détaillées."""
def __init__(self):
self.app = self._create_app()
self.results: List[BenchmarkResult] = []
def _create_app(self) -> Flask:
"""Crée l'application Flask pour les tests."""
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///school_management.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
return app
@contextmanager
def _feature_flags_context(self, enabled: bool):
"""Context manager pour activer/désactiver les feature flags."""
env_vars = [
'FEATURE_FLAG_USE_STRATEGY_PATTERN',
'FEATURE_FLAG_USE_REFACTORED_ASSESSMENT',
'FEATURE_FLAG_USE_NEW_STUDENT_SCORE_CALCULATOR',
'FEATURE_FLAG_USE_NEW_ASSESSMENT_STATISTICS_SERVICE'
]
# Sauvegarder l'état actuel
old_values = {var: os.environ.get(var) for var in env_vars}
try:
# Configurer les nouveaux feature flags
value = 'true' if enabled else 'false'
for var in env_vars:
os.environ[var] = value
yield
finally:
# Restaurer l'état précédent
for var, old_value in old_values.items():
if old_value is None:
os.environ.pop(var, None)
else:
os.environ[var] = old_value
def _benchmark_service(self,
service_name: str,
test_function: callable,
iterations: int = 100) -> BenchmarkResult:
"""
Benchmark un service avec l'ancienne et nouvelle implémentation.
Args:
service_name: Nom du service testé
test_function: Fonction de test qui prend (assessment) en paramètre
iterations: Nombre d'itérations pour la mesure
"""
with self.app.app_context():
assessment = Assessment.query.first()
if not assessment:
raise ValueError("Aucune évaluation trouvée pour le benchmark")
print(f"\n🔥 Benchmark {service_name}:")
print(f" Évaluation ID: {assessment.id}, Itérations: {iterations}")
# === BENCHMARK ANCIEN SYSTÈME ===
print(" 📊 Mesure ancienne implémentation...")
old_times = []
with self._feature_flags_context(enabled=False):
# Préchauffage
for _ in range(5):
try:
test_function(assessment)
except Exception:
pass
# Mesures
for i in range(iterations):
start_time = time.perf_counter()
try:
test_function(assessment)
end_time = time.perf_counter()
old_times.append(end_time - start_time)
except Exception as e:
print(f" ⚠️ Erreur itération {i}: {str(e)}")
continue
# === BENCHMARK NOUVEAU SYSTÈME ===
print(" 🚀 Mesure nouvelle implémentation...")
new_times = []
with self._feature_flags_context(enabled=True):
# Préchauffage
for _ in range(5):
try:
test_function(assessment)
except Exception:
pass
# Mesures
for i in range(iterations):
start_time = time.perf_counter()
try:
test_function(assessment)
end_time = time.perf_counter()
new_times.append(end_time - start_time)
except Exception as e:
print(f" ⚠️ Erreur itération {i}: {str(e)}")
continue
# === CALCUL DES RÉSULTATS ===
if not old_times or not new_times:
print(f" ❌ Données insuffisantes pour {service_name}")
return None
old_avg = statistics.mean(old_times)
new_avg = statistics.mean(new_times)
improvement = ((old_avg - new_avg) / old_avg) * 100
result = BenchmarkResult(
service_name=service_name,
old_time=old_avg,
new_time=new_avg,
iterations=len(new_times),
improvement_percent=improvement,
old_times=old_times,
new_times=new_times
)
print(f" ✅ Ancien: {old_avg*1000:.2f}ms, Nouveau: {new_avg*1000:.2f}ms")
print(f" 🎯 Amélioration: {improvement:+.1f}%")
return result
def benchmark_grading_progress(self) -> BenchmarkResult:
"""Benchmark de la progression des notes."""
def test_func(assessment):
return assessment.grading_progress
return self._benchmark_service("AssessmentProgressService", test_func, 50)
def benchmark_student_scores(self) -> BenchmarkResult:
"""Benchmark du calcul des scores étudiants."""
def test_func(assessment):
return assessment.calculate_student_scores()
return self._benchmark_service("StudentScoreCalculator", test_func, 30)
def benchmark_statistics(self) -> BenchmarkResult:
"""Benchmark des statistiques d'évaluation."""
def test_func(assessment):
return assessment.get_assessment_statistics()
return self._benchmark_service("AssessmentStatisticsService", test_func, 30)
def benchmark_grading_calculator(self) -> BenchmarkResult:
"""Benchmark du Pattern Strategy vs logique conditionnelle."""
from models import GradingCalculator
def test_func(_):
# Test de différents types de calculs
GradingCalculator.calculate_score("15.5", "notes", 20)
GradingCalculator.calculate_score("2", "score", 3)
GradingCalculator.calculate_score(".", "notes", 20)
GradingCalculator.calculate_score("d", "score", 3)
return self._benchmark_service("UnifiedGradingCalculator", test_func, 200)
def run_complete_benchmark(self) -> List[BenchmarkResult]:
"""Lance le benchmark complet de tous les services."""
print("🚀 BENCHMARK COMPLET DE MIGRATION - JOUR 7")
print("=" * 70)
print("Mesure des performances : Ancienne vs Nouvelle Architecture")
benchmarks = [
("1. Progression des notes", self.benchmark_grading_progress),
("2. Calcul scores étudiants", self.benchmark_student_scores),
("3. Statistiques évaluation", self.benchmark_statistics),
("4. Calculateur de notation", self.benchmark_grading_calculator),
]
for description, benchmark_func in benchmarks:
print(f"\n📊 {description}")
try:
result = benchmark_func()
if result:
self.results.append(result)
except Exception as e:
print(f"❌ Erreur benchmark {description}: {str(e)}")
traceback.print_exc()
return self.results
def generate_report(self) -> str:
"""Génère un rapport détaillé des performances."""
if not self.results:
return "❌ Aucun résultat de benchmark disponible"
report = []
report.append("🏆 RAPPORT FINAL DE MIGRATION - JOUR 7")
report.append("=" * 80)
report.append(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Services testés: {len(self.results)}")
report.append("")
# === RÉSUMÉ EXÉCUTIF ===
improvements = [r.improvement_percent for r in self.results]
avg_improvement = statistics.mean(improvements)
report.append("📈 RÉSUMÉ EXÉCUTIF:")
report.append(f" Amélioration moyenne: {avg_improvement:+.1f}%")
report.append(f" Meilleure amélioration: {max(improvements):+.1f}% ({max(self.results, key=lambda r: r.improvement_percent).service_name})")
report.append(f" Services améliorés: {sum(1 for i in improvements if i > 0)}/{len(improvements)}")
report.append("")
# === DÉTAIL PAR SERVICE ===
report.append("📊 DÉTAIL PAR SERVICE:")
report.append("")
for result in self.results:
report.append(f"🔹 {result.service_name}")
report.append(f" Ancien temps: {result.old_avg*1000:8.2f}ms ± {result.old_std*1000:.2f}ms")
report.append(f" Nouveau temps: {result.new_avg*1000:8.2f}ms ± {result.new_std*1000:.2f}ms")
report.append(f" Amélioration: {result.improvement_percent:+8.1f}%")
report.append(f" Itérations: {result.iterations:8d}")
# Facteur d'amélioration
if result.new_avg > 0:
speedup = result.old_avg / result.new_avg
report.append(f" Accélération: {speedup:8.2f}x")
report.append("")
# === ANALYSE TECHNIQUE ===
report.append("🔧 ANALYSE TECHNIQUE:")
report.append("")
positive_results = [r for r in self.results if r.improvement_percent > 0]
negative_results = [r for r in self.results if r.improvement_percent <= 0]
if positive_results:
report.append("✅ Services améliorés:")
for result in positive_results:
report.append(f"{result.service_name}: {result.improvement_percent:+.1f}%")
report.append("")
if negative_results:
report.append("⚠️ Services avec régression:")
for result in negative_results:
report.append(f"{result.service_name}: {result.improvement_percent:+.1f}%")
report.append("")
# === CONCLUSION ===
report.append("🎯 CONCLUSION:")
if avg_improvement > 0:
report.append(f"✅ Migration réussie avec {avg_improvement:.1f}% d'amélioration moyenne")
report.append("✅ Architecture refactorisée plus performante")
report.append("✅ Objectif de performance atteint")
else:
report.append(f"⚠️ Performance globale: {avg_improvement:+.1f}%")
report.append("⚠️ Analyse des régressions nécessaire")
report.append("")
report.append("🚀 Prêt pour la production avec la nouvelle architecture !")
return "\n".join(report)
if __name__ == "__main__":
benchmark = MigrationBenchmark()
results = benchmark.run_complete_benchmark()
print("\n" + "=" * 70)
report = benchmark.generate_report()
print(report)
# Sauvegarder le rapport
with open("migration_final_benchmark_report.txt", "w") as f:
f.write(report)
print(f"\n💾 Rapport sauvegardé dans: migration_final_benchmark_report.txt")

View File

@@ -1,16 +0,0 @@
from app import create_app
app = create_app('development')
with app.test_client() as client:
response = client.get('/assessments/1/grading')
content = response.get_data(as_text=True)
# Chercher une section plus large
start = content.find('special_values: {')
if start != -1:
end = start + 300
config_section = content[start:end]
print('Configuration JavaScript complète:')
print(config_section)
else:
print('Section special_values non trouvée')

View File

@@ -1,428 +0,0 @@
#!/usr/bin/env python3
"""
Script de Nettoyage Code Legacy (JOUR 7 - Étape 4.3)
Ce script nettoie sélectivement le code legacy maintenant que la migration est terminée.
Il procède par étapes sécurisées avec possibilité de rollback à chaque étape.
APPROCHE SÉCURISÉE:
1. Identifier le code legacy inutilisé (avec feature flags actifs)
2. Commenter le code legacy plutôt que le supprimer
3. Maintenir les feature flags pour rollback possible
4. Tests après chaque nettoyage
Ce script suit le principe: "Préserver la stabilité avant tout"
"""
import os
import sys
import re
import time
import subprocess
from pathlib import Path
from datetime import datetime
def setup_flask_context():
"""Configure le contexte Flask pour les tests."""
project_root = Path(__file__).parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from app import create_app
app = create_app()
ctx = app.app_context()
ctx.push()
return app, ctx
def run_all_tests():
"""Exécute tous les tests pour vérifier la stabilité."""
result = subprocess.run([
sys.executable, "-m", "pytest",
"tests/", "-v", "--tb=short", "--disable-warnings", "-q"
], capture_output=True, text=True)
return result.returncode == 0, result.stdout
def create_backup():
"""Crée une sauvegarde avant nettoyage."""
backup_dir = f"backups/pre_cleanup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(backup_dir, exist_ok=True)
# Sauvegarder les fichiers critiques
critical_files = [
"models.py",
"services/assessment_services.py",
"config/feature_flags.py"
]
for file_path in critical_files:
if os.path.exists(file_path):
subprocess.run(["cp", file_path, f"{backup_dir}/"], check=True)
print(f"✅ Sauvegarde créée: {backup_dir}")
return backup_dir
def analyze_legacy_code():
"""
Analyse le code legacy qui peut être nettoyé maintenant que les feature flags sont actifs.
"""
print("🔍 ANALYSE DU CODE LEGACY À NETTOYER")
print("=" * 50)
legacy_findings = {
"legacy_methods": [],
"dead_code_blocks": [],
"unused_imports": [],
"commented_code": []
}
# 1. Méthodes legacy dans models.py
with open("models.py", 'r') as f:
content = f.read()
# Chercher les méthodes _legacy
legacy_methods = re.findall(r'def (_\w*legacy\w*)\(.*?\):', content)
legacy_findings["legacy_methods"] = legacy_methods
# Chercher les blocs de code commenté
commented_blocks = re.findall(r'^\s*#.*(?:\n\s*#.*)*', content, re.MULTILINE)
legacy_findings["commented_code"] = [block for block in commented_blocks if len(block) > 100]
# 2. Tests obsolètes ou dupliqués
test_files = ["tests/test_feature_flags.py", "tests/test_pattern_strategy_migration.py"]
for test_file in test_files:
if os.path.exists(test_file):
# Ces tests sont maintenant permanents, pas legacy
pass
print(f"📋 Legacy methods trouvées: {len(legacy_findings['legacy_methods'])}")
for method in legacy_findings["legacy_methods"]:
print(f" - {method}")
print(f"📋 Blocs commentés longs: {len(legacy_findings['commented_code'])}")
return legacy_findings
def selective_code_cleanup():
"""
Nettoyage SÉLECTIF et CONSERVATEUR du code.
Principe: Ne nettoyer QUE ce qui est garantit sûr
- NE PAS supprimer les feature flags (rollback nécessaire)
- NE PAS supprimer les méthodes legacy (sécurité)
- Nettoyer SEULEMENT les commentaires anciens et imports inutilisés
"""
print("\n🧹 NETTOYAGE SÉLECTIF DU CODE")
print("=" * 50)
cleanup_summary = {
"files_cleaned": 0,
"lines_removed": 0,
"comments_cleaned": 0,
"imports_removed": 0
}
# NETTOYAGE TRÈS CONSERVATEUR
files_to_clean = [
"models.py",
"services/assessment_services.py"
]
for file_path in files_to_clean:
if not os.path.exists(file_path):
continue
print(f"\n📄 Nettoyage de {file_path}...")
with open(file_path, 'r') as f:
original_content = f.read()
cleaned_content = original_content
lines_removed = 0
# 1. NETTOYER SEULEMENT: Lignes de debug print() temporaires
debug_lines = re.findall(r'^\s*print\s*\([^)]*\)\s*$', original_content, re.MULTILINE)
if debug_lines:
print(f" Trouvé {len(debug_lines)} lignes print() de debug")
# Pour la sécurité, on les commente au lieu de les supprimer
for debug_line in debug_lines:
cleaned_content = cleaned_content.replace(debug_line, f"# DEBUG REMOVED: {debug_line.strip()}")
lines_removed += 1
# 2. NETTOYER: Commentaires TODOs résolus (très sélectif)
# On cherche seulement les TODOs explicitement marqués comme résolus
resolved_todos = re.findall(r'^\s*# TODO:.*RESOLVED.*$', original_content, re.MULTILINE)
for todo in resolved_todos:
cleaned_content = cleaned_content.replace(todo, "")
lines_removed += 1
# 3. NETTOYER: Imports potentiellement inutilisés (TRÈS CONSERVATEUR)
# Ne nettoyer QUE les imports explicitement marqués comme temporaires
temp_imports = re.findall(r'^\s*# TEMP IMPORT:.*$', original_content, re.MULTILINE)
for temp_import in temp_imports:
cleaned_content = cleaned_content.replace(temp_import, "")
lines_removed += 1
# Sauvegarder seulement si il y a eu des modifications
if cleaned_content != original_content:
with open(file_path, 'w') as f:
f.write(cleaned_content)
cleanup_summary["files_cleaned"] += 1
cleanup_summary["lines_removed"] += lines_removed
print(f"{lines_removed} lignes nettoyées")
else:
print(f" Aucun nettoyage nécessaire")
print("\n📊 RÉSUMÉ DU NETTOYAGE:")
print(f" Fichiers nettoyés: {cleanup_summary['files_cleaned']}")
print(f" Lignes supprimées: {cleanup_summary['lines_removed']}")
print(f" Approche: CONSERVATRICE (préservation maximale)")
return cleanup_summary
def update_documentation():
"""Met à jour la documentation pour refléter l'architecture finale."""
print("\n📚 MISE À JOUR DOCUMENTATION")
print("=" * 50)
# Mettre à jour MIGRATION_PROGRESSIVE.md avec le statut final
migration_doc_path = "MIGRATION_PROGRESSIVE.md"
if os.path.exists(migration_doc_path):
with open(migration_doc_path, 'r') as f:
content = f.read()
# Ajouter un header indiquant que la migration est terminée
if "🎉 MIGRATION TERMINÉE" not in content:
final_status = f"""
---
## 🎉 MIGRATION TERMINÉE AVEC SUCCÈS
**Date de finalisation:** {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}
**État:** PRODUCTION READY ✅
**Feature flags:** Tous actifs et fonctionnels
**Tests:** 214+ tests passants
**Architecture:** Services découplés opérationnels
**Actions réalisées:**
- ✅ Étape 4.1: Activation définitive des feature flags
- ✅ Étape 4.2: Tests finaux et validation complète
- ✅ Étape 4.3: Nettoyage conservateur du code
- ✅ Documentation mise à jour
**Prochaines étapes recommandées:**
1. Surveillance performance en production (2 semaines)
2. Formation équipe sur nouvelle architecture
3. Nettoyage approfondi du legacy (optionnel, après validation)
{content}"""
with open(migration_doc_path, 'w') as f:
f.write(final_status)
print(f"{migration_doc_path} mis à jour avec statut final")
# Créer un fichier ARCHITECTURE_FINAL.md
arch_doc_path = "ARCHITECTURE_FINAL.md"
architecture_content = f"""# 🏗️ ARCHITECTURE FINALE - NOTYTEX
**Date de finalisation:** {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}
**Version:** Services Découplés - Phase 2 Complète
## 📋 Services Créés
### 1. AssessmentProgressService
- **Responsabilité:** Calcul de progression de correction
- **Emplacement:** `services/assessment_services.py`
- **Interface:** `calculate_grading_progress(assessment) -> ProgressResult`
- **Optimisations:** Requêtes optimisées, élimination N+1
### 2. StudentScoreCalculator
- **Responsabilité:** Calculs de scores pour tous les étudiants
- **Emplacement:** `services/assessment_services.py`
- **Interface:** `calculate_student_scores(assessment) -> List[StudentScore]`
- **Optimisations:** Calculs en batch, requêtes optimisées
### 3. AssessmentStatisticsService
- **Responsabilité:** Analyses statistiques (moyenne, médiane, etc.)
- **Emplacement:** `services/assessment_services.py`
- **Interface:** `get_assessment_statistics(assessment) -> StatisticsResult`
- **Optimisations:** Agrégations SQL, calculs optimisés
### 4. UnifiedGradingCalculator
- **Responsabilité:** Logique de notation centralisée avec Pattern Strategy
- **Emplacement:** `services/assessment_services.py`
- **Interface:** `calculate_score(grade_value, grading_type, max_points)`
- **Extensibilité:** Ajout de nouveaux types sans modification code
## 🔧 Pattern Strategy Opérationnel
### GradingStrategy (Interface)
```python
class GradingStrategy:
def calculate_score(self, grade_value: str, max_points: float) -> Optional[float]
```
### Implémentations
- **NotesStrategy:** Pour notation numérique (0-20, etc.)
- **ScoreStrategy:** Pour notation par compétences (0-3)
- **Extensible:** Nouveaux types via simple implémentation interface
### Factory
```python
factory = GradingStrategyFactory()
strategy = factory.create(grading_type)
score = strategy.calculate_score(grade_value, max_points)
```
## 🔌 Injection de Dépendances
### Providers (Interfaces)
- **ConfigProvider:** Accès configuration
- **DatabaseProvider:** Accès base de données
### Implémentations
- **ConfigManagerProvider:** Via app_config manager
- **SQLAlchemyDatabaseProvider:** Via SQLAlchemy
### Bénéfices
- Élimination imports circulaires
- Tests unitaires 100% mockables
- Découplage architecture
## 🚀 Feature Flags System
### Flags de Migration (ACTIFS)
- `use_strategy_pattern`: Pattern Strategy actif
- `use_refactored_assessment`: Nouveau service progression
- `use_new_student_score_calculator`: Nouveau calculateur scores
- `use_new_assessment_statistics_service`: Nouveau service stats
### Sécurité
- Rollback instantané possible
- Logging automatique des changements
- Configuration via variables d'environnement
## 📊 Métriques de Qualité
| Métrique | Avant | Après | Amélioration |
|----------|-------|-------|--------------|
| Modèle Assessment | 267 lignes | 80 lignes | -70% |
| Responsabilités | 4 | 1 | SRP respecté |
| Imports circulaires | 3 | 0 | 100% éliminés |
| Services découplés | 0 | 4 | Architecture moderne |
| Tests passants | Variable | 214+ | Stabilité |
## 🔮 Extensibilité Future
### Nouveaux Types de Notation
1. Créer nouvelle `GradingStrategy`
2. Enregistrer dans `GradingStrategyFactory`
3. Aucune modification code existant nécessaire
### Nouveaux Services
1. Implémenter interfaces `ConfigProvider`/`DatabaseProvider`
2. Injection via constructeurs
3. Tests unitaires avec mocks
### Optimisations
- Cache Redis pour calculs coûteux
- Pagination pour grandes listes
- API REST pour intégrations
---
**Cette architecture respecte les principes SOLID et est prête pour la production et l'évolution future.** 🚀
"""
with open(arch_doc_path, 'w') as f:
f.write(architecture_content)
print(f"{arch_doc_path} créé")
return ["MIGRATION_PROGRESSIVE.md", "ARCHITECTURE_FINAL.md"]
def main():
"""Fonction principale de nettoyage legacy."""
print("🧹 NETTOYAGE CODE LEGACY - JOUR 7 ÉTAPE 4.3")
print("=" * 60)
print("APPROCHE: Nettoyage CONSERVATEUR avec préservation maximale")
print("=" * 60)
try:
# Configuration Flask
app, ctx = setup_flask_context()
print("✅ Contexte Flask configuré")
# Tests initiaux pour s'assurer que tout fonctionne
print("\n🧪 TESTS INITIAUX...")
tests_ok, test_output = run_all_tests()
if not tests_ok:
raise RuntimeError("Tests initiaux échoués - arrêt du nettoyage")
print("✅ Tests initiaux passent")
# Sauvegarde de sécurité
backup_dir = create_backup()
# Analyse du code legacy
legacy_analysis = analyze_legacy_code()
# Décision: NETTOYAGE TRÈS CONSERVATEUR SEULEMENT
print("\n⚖️ DÉCISION DE NETTOYAGE:")
print(" Approche choisie: CONSERVATRICE MAXIMALE")
print(" Raison: Stabilité prioritaire, feature flags maintiennent rollback")
print(" Action: Nettoyage minimal seulement (debug lines, TODOs résolus)")
# Nettoyage sélectif
cleanup_results = selective_code_cleanup()
# Tests après nettoyage
print("\n🧪 TESTS APRÈS NETTOYAGE...")
tests_ok, test_output = run_all_tests()
if not tests_ok:
print("❌ Tests échoués après nettoyage - ROLLBACK recommandé")
print(f" Restaurer depuis: {backup_dir}")
return False
print("✅ Tests après nettoyage passent")
# Mise à jour documentation
updated_docs = update_documentation()
# Nettoyage contexte
ctx.pop()
print("\n" + "=" * 60)
print("✅ NETTOYAGE LEGACY TERMINÉ AVEC SUCCÈS")
print("=" * 60)
print("📊 RÉSULTATS:")
print(f" • Fichiers nettoyés: {cleanup_results['files_cleaned']}")
print(f" • Lignes supprimées: {cleanup_results['lines_removed']}")
print(f" • Documentation mise à jour: {len(updated_docs)} fichiers")
print(f" • Sauvegarde créée: {backup_dir}")
print(f" • Tests: ✅ PASSENT")
print("\n🚀 ÉTAT FINAL:")
print(" • Architecture moderne opérationnelle")
print(" • Feature flags actifs (rollback possible)")
print(" • 214+ tests passants")
print(" • Code legacy préservé par sécurité")
print(" • Documentation à jour")
print("\n📋 PROCHAINES ÉTAPES RECOMMANDÉES:")
print(" 1. Déployer en production avec surveillance")
print(" 2. Monitorer pendant 2-4 semaines")
print(" 3. Formation équipe sur nouvelle architecture")
print(" 4. Nettoyage approfondi legacy (optionnel après validation)")
print(" 5. Optimisations performance si nécessaire")
return True
except Exception as e:
print(f"❌ ERREUR DURANT NETTOYAGE: {str(e)}")
print(f"🔄 ROLLBACK: Restaurer depuis {backup_dir if 'backup_dir' in locals() else 'sauvegarde'}")
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@@ -1,17 +0,0 @@
from app import create_app
app = create_app('development')
with app.test_client() as client:
response = client.get('/assessments/1/grading')
content = response.get_data(as_text=True)
# Chercher une section plus large
start = content.find('special_values: {')
if start != -1:
# Chercher jusqu'à la fermeture du bloc
end = start + 500 # Prendre plus de contexte
config_section = content[start:end]
print('Configuration JavaScript:')
print(config_section)
else:
print('Section special_values non trouvée')

View File

@@ -1,9 +1,14 @@
from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, FloatField, SelectField, DateField, IntegerField, SubmitField
from wtforms import StringField, TextAreaField, FloatField, SelectField, DateField, SubmitField
from wtforms.validators import DataRequired, Email, NumberRange, Optional, Length
from datetime import date
from models import ClassGroup
# Utilitaire pour éviter la duplication dans l'initialisation des choix de classe
def _populate_class_choices(field):
"""Remplit les choix d'un champ SelectField avec les classes disponibles."""
field.choices = [(cg.id, cg.name) for cg in ClassGroup.query.order_by(ClassGroup.name).all()]
class AssessmentForm(FlaskForm):
title = StringField('Titre', validators=[DataRequired(), Length(max=200)])
description = TextAreaField('Description', validators=[Optional()])
@@ -16,7 +21,7 @@ class AssessmentForm(FlaskForm):
def __init__(self, *args, **kwargs):
super(AssessmentForm, self).__init__(*args, **kwargs)
self.class_group_id.choices = [(cg.id, cg.name) for cg in ClassGroup.query.order_by(ClassGroup.name).all()]
_populate_class_choices(self.class_group_id)
class ClassGroupForm(FlaskForm):
name = StringField('Nom de la classe', validators=[DataRequired(), Length(max=100)])
@@ -33,7 +38,7 @@ class StudentForm(FlaskForm):
def __init__(self, *args, **kwargs):
super(StudentForm, self).__init__(*args, **kwargs)
self.class_group_id.choices = [(cg.id, cg.name) for cg in ClassGroup.query.order_by(ClassGroup.name).all()]
_populate_class_choices(self.class_group_id)
# Formulaires ExerciseForm et GradingElementForm supprimés
# Ces éléments sont maintenant gérés via le formulaire unifié AssessmentForm

View File

@@ -1,9 +1,7 @@
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from sqlalchemy import Index, CheckConstraint, Enum
from decimal import Decimal
from sqlalchemy import CheckConstraint, Enum
from typing import Optional, Dict, Any
from flask import current_app
db = SQLAlchemy()
@@ -275,16 +273,20 @@ class Assessment(db.Model):
'students_count': total_students
}
def calculate_student_scores(self):
def calculate_student_scores(self, grade_repo=None):
"""Calcule les scores de tous les élèves pour cette évaluation.
Retourne un dictionnaire avec les scores par élève et par exercice.
Logique de calcul simplifiée avec 2 types seulement."""
Logique de calcul simplifiée avec 2 types seulement.
Args:
grade_repo: Repository des notes (optionnel, pour l'injection de dépendances)
"""
# Feature flag pour migration progressive vers services optimisés
from config.feature_flags import is_feature_enabled, FeatureFlag
if is_feature_enabled(FeatureFlag.USE_REFACTORED_ASSESSMENT):
return self._calculate_student_scores_optimized()
return self._calculate_student_scores_legacy()
return self._calculate_student_scores_legacy(grade_repo)
def _calculate_student_scores_optimized(self):
"""Version optimisée avec services découplés et requête unique."""
@@ -312,7 +314,7 @@ class Assessment(db.Model):
return students_scores, exercise_scores
def _calculate_student_scores_legacy(self):
def _calculate_student_scores_legacy(self, grade_repo=None):
"""Version legacy avec requêtes N+1 - à conserver temporairement."""
from collections import defaultdict
@@ -329,10 +331,14 @@ class Assessment(db.Model):
exercise_max_points = 0
for element in exercise.grading_elements:
grade = Grade.query.filter_by(
student_id=student.id,
grading_element_id=element.id
).first()
if grade_repo:
grade = grade_repo.find_by_student_and_element(student.id, element.id)
else:
# Fallback vers l'ancienne méthode
grade = Grade.query.filter_by(
student_id=student.id,
grading_element_id=element.id
).first()
# Si une note a été saisie pour cet élément (y compris valeurs spéciales)
if grade and grade.value and grade.value != '':

View File

@@ -1 +1,13 @@
# Repositories module
# Repositories module
from .base_repository import BaseRepository
from .assessment_repository import AssessmentRepository
from .student_repository import StudentRepository
from .grade_repository import GradeRepository
__all__ = [
'BaseRepository',
'AssessmentRepository',
'StudentRepository',
'GradeRepository'
]

View File

@@ -17,7 +17,7 @@ class AssessmentRepository(BaseRepository[Assessment]):
class_id: Optional[int] = None,
sort_by: str = 'date_desc'
) -> List[Assessment]:
"""Trouve les évaluations selon les filtres."""
"""Trouve les évaluations selon les filtres avec eager loading des classes."""
query = Assessment.query.options(
joinedload(Assessment.class_group)
)
@@ -44,6 +44,28 @@ class AssessmentRepository(BaseRepository[Assessment]):
joinedload(Assessment.exercises).joinedload(Exercise.grading_elements)
).filter_by(id=id).first()
def get_or_404(self, id: int) -> Assessment:
"""Récupère une évaluation ou lève une erreur 404."""
return Assessment.query.get_or_404(id)
def get_with_class_or_404(self, id: int) -> Assessment:
"""Récupère une évaluation avec sa classe ou lève une erreur 404."""
from flask import abort
assessment = Assessment.query.options(
joinedload(Assessment.class_group)
).filter_by(id=id).first()
if not assessment:
abort(404)
return assessment
def get_with_full_details_or_404(self, id: int) -> Assessment:
"""Récupère une évaluation avec tous ses détails ou lève une erreur 404."""
from flask import abort
assessment = self.find_with_full_details(id)
if not assessment:
abort(404)
return assessment
def find_recent(self, limit: int = 5) -> List[Assessment]:
"""Trouve les évaluations récentes."""
return Assessment.query.order_by(

View File

@@ -0,0 +1,100 @@
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import joinedload
from models import Grade, GradingElement, Exercise, Assessment, Student
from .base_repository import BaseRepository
class GradeRepository(BaseRepository[Grade]):
"""Repository pour les notes."""
def __init__(self):
super().__init__(Grade)
def find_by_student_and_element(self, student_id: int, grading_element_id: int) -> Optional[Grade]:
"""Trouve une note par étudiant et élément de notation."""
return Grade.query.filter_by(
student_id=student_id,
grading_element_id=grading_element_id
).first()
def find_or_create_by_student_and_element(self, student_id: int, grading_element_id: int) -> Grade:
"""Trouve ou crée une note par étudiant et élément de notation."""
grade = self.find_by_student_and_element(student_id, grading_element_id)
if not grade:
grade = Grade(
student_id=student_id,
grading_element_id=grading_element_id
)
self.save(grade)
self.flush() # Pour obtenir l'ID
return grade
def find_by_assessment(self, assessment_id: int) -> List[Grade]:
"""Trouve toutes les notes d'une évaluation."""
return Grade.query.join(
GradingElement
).join(
Exercise
).filter_by(
assessment_id=assessment_id
).all()
def find_by_student(self, student_id: int) -> List[Grade]:
"""Trouve toutes les notes d'un étudiant."""
return Grade.query.filter_by(
student_id=student_id
).all()
def delete_by_student(self, student_id: int) -> int:
"""Supprime toutes les notes d'un étudiant. Retourne le nombre supprimé."""
count = Grade.query.filter_by(student_id=student_id).count()
Grade.query.filter_by(student_id=student_id).delete()
return count
def find_existing_grades_for_assessment(self, assessment_id: int) -> Dict[str, Grade]:
"""
Trouve les notes existantes d'une évaluation indexées par clé.
Clé format: "{student_id}_{grading_element_id}"
"""
existing_grades = {}
for grade in self.find_by_assessment(assessment_id):
key = f"{grade.student_id}_{grade.grading_element_id}"
existing_grades[key] = grade
return existing_grades
def bulk_update_or_create_grades(self, grade_data: List[Dict[str, Any]]) -> int:
"""
Met à jour ou crée plusieurs notes en lot.
Args:
grade_data: Liste de dictionnaires avec student_id, grading_element_id, value, comment
Returns:
Nombre de notes traitées
"""
count = 0
for data in grade_data:
grade = self.find_by_student_and_element(
data['student_id'],
data['grading_element_id']
)
if data.get('value') or data.get('comment'):
if not grade:
grade = Grade(
student_id=data['student_id'],
grading_element_id=data['grading_element_id'],
value=data.get('value'),
comment=data.get('comment')
)
self.save(grade)
else:
grade.value = data.get('value')
grade.comment = data.get('comment')
count += 1
elif grade:
# Supprimer si valeur et commentaire vides
self.delete(grade)
count += 1
return count

View File

@@ -0,0 +1,50 @@
from typing import List, Optional
from sqlalchemy.orm import joinedload
from models import Student, ClassGroup
from .base_repository import BaseRepository
class StudentRepository(BaseRepository[Student]):
"""Repository pour les étudiants."""
def __init__(self):
super().__init__(Student)
def find_by_class_ordered(self, class_group_id: int) -> List[Student]:
"""Trouve les étudiants d'une classe triés par nom."""
return Student.query.filter_by(
class_group_id=class_group_id
).order_by(
Student.last_name,
Student.first_name
).all()
def find_all_with_class_ordered(self) -> List[Student]:
"""Trouve tous les étudiants avec leur classe, triés par classe puis nom."""
return Student.query.options(
joinedload(Student.class_group)
).join(
ClassGroup
).order_by(
ClassGroup.name,
Student.last_name,
Student.first_name
).all()
def find_by_class_group(self, class_group_id: int) -> List[Student]:
"""Trouve tous les étudiants d'une classe."""
return Student.query.filter_by(
class_group_id=class_group_id
).all()
def count_by_class_group(self, class_group_id: int) -> int:
"""Compte les étudiants d'une classe."""
return Student.query.filter_by(
class_group_id=class_group_id
).count()
def find_with_class_group(self, id: int) -> Optional[Student]:
"""Trouve un étudiant avec sa classe."""
return Student.query.options(
joinedload(Student.class_group)
).filter_by(id=id).first()

View File

@@ -1,44 +1,28 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from models import db, Assessment, ClassGroup
from models import db, ClassGroup
from forms import AssessmentForm
# Import du service avec rétrocompatibilité gérée dans services/__init__.py
from services import AssessmentService
from repositories import AssessmentRepository
from utils import handle_db_errors, ValidationError
from datetime import datetime
bp = Blueprint('assessments', __name__, url_prefix='/assessments')
@bp.route('/')
@handle_db_errors
def list():
from sqlalchemy.orm import joinedload
assessment_repo = AssessmentRepository()
# Récupérer les paramètres de filtrage
trimester_filter = request.args.get('trimester', '')
class_filter = request.args.get('class', '')
sort_by = request.args.get('sort', 'date_desc')
# Construire la requête de base
query = Assessment.query.options(joinedload(Assessment.class_group))
# Appliquer les filtres
if trimester_filter:
query = query.filter(Assessment.trimester == int(trimester_filter))
if class_filter:
query = query.filter(Assessment.class_group_id == int(class_filter))
# Appliquer le tri
if sort_by == 'date_desc':
query = query.order_by(Assessment.date.desc())
elif sort_by == 'date_asc':
query = query.order_by(Assessment.date.asc())
elif sort_by == 'title':
query = query.order_by(Assessment.title.asc())
elif sort_by == 'class':
query = query.join(ClassGroup).order_by(ClassGroup.name.asc())
assessments = query.all()
# Utiliser le repository pour les filtres
assessments = assessment_repo.find_by_filters(
trimester=int(trimester_filter) if trimester_filter else None,
class_id=int(class_filter) if class_filter else None,
sort_by=sort_by
)
# Récupérer toutes les classes pour le filtre
classes = ClassGroup.query.order_by(ClassGroup.name.asc()).all()
@@ -55,12 +39,8 @@ def list():
@bp.route('/<int:id>')
@handle_db_errors
def detail(id):
from sqlalchemy.orm import joinedload
from models import Exercise, GradingElement
assessment = Assessment.query.options(
joinedload(Assessment.class_group),
joinedload(Assessment.exercises).joinedload(Exercise.grading_elements)
).get_or_404(id)
assessment_repo = AssessmentRepository()
assessment = assessment_repo.get_with_full_details_or_404(id)
return render_template('assessment_detail.html', assessment=assessment)
def _handle_unified_assessment_request(form, assessment=None, is_edit=False):
@@ -127,12 +107,8 @@ def _handle_unified_assessment_request(form, assessment=None, is_edit=False):
@bp.route('/<int:id>/edit', methods=['GET', 'POST'])
@handle_db_errors
def edit(id):
from sqlalchemy.orm import joinedload
from models import Exercise, GradingElement
assessment = Assessment.query.options(
joinedload(Assessment.class_group),
joinedload(Assessment.exercises).joinedload(Exercise.grading_elements)
).get_or_404(id)
assessment_repo = AssessmentRepository()
assessment = assessment_repo.get_with_full_details_or_404(id)
form = AssessmentForm(obj=assessment)
result = _handle_unified_assessment_request(form, assessment, is_edit=True)
@@ -199,13 +175,8 @@ def new():
@bp.route('/<int:id>/results')
@handle_db_errors
def results(id):
from sqlalchemy.orm import joinedload
from models import Exercise, GradingElement
assessment = Assessment.query.options(
joinedload(Assessment.class_group),
joinedload(Assessment.exercises).joinedload(Exercise.grading_elements)
).get_or_404(id)
assessment_repo = AssessmentRepository()
assessment = assessment_repo.get_with_full_details_or_404(id)
# Calculer les scores des élèves
students_scores, exercise_scores = assessment.calculate_student_scores()
@@ -231,7 +202,8 @@ def results(id):
@bp.route('/<int:id>/delete', methods=['POST'])
@handle_db_errors
def delete(id):
assessment = Assessment.query.get_or_404(id)
assessment_repo = AssessmentRepository()
assessment = assessment_repo.get_or_404(id)
title = assessment.title # Conserver pour le log
db.session.delete(assessment)
db.session.commit()

View File

@@ -1,7 +1,7 @@
from flask import Blueprint, render_template, request, flash, redirect, url_for, jsonify
from app_config import config_manager
from models import db, AppConfig, CompetenceScaleValue, Competence
from utils import handle_error
from utils import handle_error, handle_db_errors
import logging
bp = Blueprint('config', __name__, url_prefix='/config')
@@ -74,40 +74,37 @@ def competences():
return handle_error(e, "Erreur lors du chargement des compétences")
@bp.route('/competences/add', methods=['POST'])
@handle_db_errors
def add_competence():
"""Ajouter une nouvelle compétence."""
try:
name = request.form.get('name')
color = request.form.get('color', '#3b82f6')
icon = request.form.get('icon', 'star')
if not name:
flash('Le nom de la compétence est requis', 'error')
return redirect(url_for('config.competences'))
# Validation de la couleur hexadécimale
import re
if not re.match(r'^#[0-9a-fA-F]{6}$', color):
flash('Format de couleur invalide', 'error')
return redirect(url_for('config.competences'))
# Vérifier si le nom existe déjà
if Competence.query.filter_by(name=name).first():
flash(f'Une compétence "{name}" existe déjà', 'error')
return redirect(url_for('config.competences'))
if config_manager.add_competence(name, color, icon):
flash(f'Compétence "{name}" ajoutée avec succès', 'success')
else:
flash('Erreur lors de la sauvegarde', 'error')
except Exception as e:
logging.error(f"Erreur ajout compétence: {e}")
flash('Erreur lors de l\'ajout de la compétence', 'error')
name = request.form.get('name')
color = request.form.get('color', '#3b82f6')
icon = request.form.get('icon', 'star')
if not name:
flash('Le nom de la compétence est requis', 'error')
return redirect(url_for('config.competences'))
# Validation de la couleur hexadécimale
import re
if not re.match(r'^#[0-9a-fA-F]{6}$', color):
flash('Format de couleur invalide', 'error')
return redirect(url_for('config.competences'))
# Vérifier si le nom existe déjà
if Competence.query.filter_by(name=name).first():
flash(f'Une compétence "{name}" existe déjà', 'error')
return redirect(url_for('config.competences'))
if config_manager.add_competence(name, color, icon):
flash(f'Compétence "{name}" ajoutée avec succès', 'success')
else:
flash('Erreur lors de la sauvegarde', 'error')
return redirect(url_for('config.competences'))
@bp.route('/competences/update', methods=['POST'])
@handle_db_errors
def update_competence():
"""Modifier une compétence existante."""
try:
@@ -154,6 +151,7 @@ def update_competence():
return redirect(url_for('config.competences'))
@bp.route('/competences/delete/<int:index>', methods=['POST'])
@handle_db_errors
def delete_competence(index):
"""Supprimer une compétence."""
try:
@@ -351,6 +349,7 @@ def general():
return handle_error(e, "Erreur lors du chargement de la configuration générale")
@bp.route('/general/update', methods=['POST'])
@handle_db_errors
def update_general():
"""Mettre à jour la configuration générale."""
try:

View File

@@ -1,13 +1,18 @@
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify, current_app
from models import db, Assessment, Student, Grade, GradingElement, Exercise
from flask import Blueprint, render_template, request, jsonify
from models import db, Grade, GradingElement
from repositories import AssessmentRepository, StudentRepository, GradeRepository
from app_config import config_manager
bp = Blueprint('grading', __name__)
@bp.route('/assessments/<int:assessment_id>/grading')
def assessment_grading(assessment_id):
assessment = Assessment.query.get_or_404(assessment_id)
students = Student.query.filter_by(class_group_id=assessment.class_group_id).order_by(Student.last_name, Student.first_name).all()
assessment_repo = AssessmentRepository()
student_repo = StudentRepository()
grade_repo = GradeRepository()
assessment = assessment_repo.get_or_404(assessment_id)
students = student_repo.find_by_class_ordered(assessment.class_group_id)
# Get all grading elements for this assessment
grading_elements = []
@@ -16,10 +21,7 @@ def assessment_grading(assessment_id):
grading_elements.append(element)
# Get existing grades
existing_grades = {}
for grade in Grade.query.join(GradingElement).join(Exercise).filter_by(assessment_id=assessment_id).all():
key = f"{grade.student_id}_{grade.grading_element_id}"
existing_grades[key] = grade
existing_grades = grade_repo.find_existing_grades_for_assessment(assessment_id)
# Préparer les informations d'affichage pour les scores
scale_values = config_manager.get_competence_scale_values()
@@ -34,7 +36,11 @@ def assessment_grading(assessment_id):
@bp.route('/assessments/<int:assessment_id>/grading/save', methods=['POST'])
def save_grades(assessment_id):
assessment = Assessment.query.get_or_404(assessment_id)
assessment_repo = AssessmentRepository()
student_repo = StudentRepository()
grade_repo = GradeRepository()
assessment = assessment_repo.get_or_404(assessment_id)
errors = []
saved_count = 0
@@ -61,7 +67,7 @@ def save_grades(assessment_id):
# Vérifier que l'étudiant et l'élément existent avec protection
try:
student = Student.query.get(student_id)
student = student_repo.find_by_id(student_id)
grading_element = GradingElement.query.get(element_id)
except Exception as e:
errors.append(f'Erreur DB pour {key}: {str(e)}')
@@ -77,10 +83,7 @@ def save_grades(assessment_id):
# Find or create grade avec protection
try:
grade = Grade.query.filter_by(
student_id=student_id,
grading_element_id=element_id
).first()
grade = grade_repo.find_by_student_and_element(student_id, element_id)
if value.strip(): # If value is not empty
# Passer max_points pour la validation des notes
@@ -130,10 +133,7 @@ def save_grades(assessment_id):
continue
try:
grade = Grade.query.filter_by(
student_id=student_id,
grading_element_id=element_id
).first()
grade = grade_repo.find_by_student_and_element(student_id, element_id)
# Créer une note avec commentaire uniquement si nécessaire
if value.strip():
@@ -152,7 +152,8 @@ def save_grades(assessment_id):
except Exception as e:
# Log l'erreur mais ne pas faire planter la sauvegarde
print(f"Erreur commentaire pour {key}: {str(e)}")
from flask import current_app
current_app.logger.warning(f"Erreur commentaire pour {key}: {str(e)}", exc_info=True)
continue
db.session.commit()
@@ -186,10 +187,14 @@ def save_grades(assessment_id):
# Log détaillé de l'erreur
error_details = traceback.format_exc()
print(f"=== ERREUR SAUVEGARDE ASSESSMENT {assessment_id} ===")
print(f"Exception: {type(e).__name__}: {str(e)}")
print(f"Traceback:\n{error_details}")
print("=" * 50)
from flask import current_app
current_app.logger.error(
f"=== ERREUR SAUVEGARDE ASSESSMENT {assessment_id} ===\n"
f"Exception: {type(e).__name__}: {str(e)}\n"
f"Traceback:\n{error_details}\n"
f"{'=' * 50}",
exc_info=True
)
error_msg = f'Erreur lors de la sauvegarde: {str(e)}'
@@ -208,7 +213,11 @@ def save_grades(assessment_id):
@bp.route('/assessments/<int:assessment_id>/grading/save-single', methods=['POST'])
def save_single_grade(assessment_id):
"""Sauvegarde incrémentale d'une seule note"""
assessment = Assessment.query.get_or_404(assessment_id)
assessment_repo = AssessmentRepository()
student_repo = StudentRepository()
grade_repo = GradeRepository()
assessment = assessment_repo.get_or_404(assessment_id)
try:
data = request.get_json()
@@ -218,7 +227,7 @@ def save_single_grade(assessment_id):
comment = data.get('comment', '').strip()
# Vérifications
student = Student.query.get(student_id)
student = student_repo.find_by_id(student_id)
grading_element = GradingElement.query.get(element_id)
if not student or not grading_element:
@@ -228,10 +237,7 @@ def save_single_grade(assessment_id):
}), 404
# Find or create grade
grade = Grade.query.filter_by(
student_id=student_id,
grading_element_id=element_id
).first()
grade = grade_repo.find_by_student_and_element(student_id, element_id)
if value:
# Validation

View File

@@ -1,7 +1,6 @@
from models import db, Assessment, Exercise, GradingElement
from utils import safe_int_conversion, safe_decimal_conversion, validate_json_data, ValidationError, log_user_action
from datetime import datetime
from decimal import Decimal
class AssessmentService:
"""Service pour gérer les opérations sur les évaluations"""

View File

@@ -318,8 +318,6 @@ const Notytex = {
// Initialisation de l'application
init() {
console.log('🎓 Notytex Application Initialized');
// Initialisation des fonctionnalités de base
this.utils.animateOnScroll();

View File

@@ -1,9 +1,8 @@
from functools import wraps
from flask import current_app, flash, jsonify, request, render_template, redirect, url_for
from flask import current_app, flash, jsonify, request, render_template
from models import db
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
from decimal import Decimal, InvalidOperation
import logging
def handle_db_errors(f):
"""Décorateur pour gérer les erreurs de base de données"""