""" Tests de performance spécialisés pour AssessmentProgressService (JOUR 4 - Étape 2.2) Ce module teste spécifiquement les améliorations de performance apportées par AssessmentProgressService en remplaçant les requêtes N+1 par des requêtes optimisées. Métriques mesurées : - Nombre de requêtes SQL exécutées - Temps d'exécution - Utilisation mémoire - Scalabilité avec le volume de données Ces tests permettent de quantifier l'amélioration avant/après migration. """ import pytest import time import statistics from contextlib import contextmanager from typing import List, Dict, Any from unittest.mock import patch from datetime import date from sqlalchemy import event from models import db, Assessment, ClassGroup, Student, Exercise, GradingElement, Grade from config.feature_flags import FeatureFlag class QueryCounter: """Utilitaire pour compter les requêtes SQL.""" def __init__(self): self.query_count = 0 self.queries = [] def count_query(self, conn, cursor, statement, parameters, context, executemany): """Callback pour compter les requêtes.""" self.query_count += 1 self.queries.append({ 'statement': statement, 'parameters': parameters, 'executemany': executemany }) @contextmanager def measure(self): """Context manager pour mesurer les requêtes.""" self.query_count = 0 self.queries = [] event.listen(db.engine, "before_cursor_execute", self.count_query) try: yield self finally: event.remove(db.engine, "before_cursor_execute", self.count_query) class PerformanceBenchmark: """Classe pour mesurer les performances.""" @staticmethod def measure_execution_time(func, *args, **kwargs) -> Dict[str, Any]: """Mesure le temps d'exécution d'une fonction.""" start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() return { 'result': result, 'execution_time': end_time - start_time, 'execution_time_ms': (end_time - start_time) * 1000 } @staticmethod def compare_implementations(assessment, iterations: int = 5) -> Dict[str, Any]: """ Compare les performances entre legacy et service. Args: assessment: L'assessment à tester iterations: Nombre d'itérations pour la moyenne Returns: Dict avec les statistiques de comparaison """ legacy_times = [] service_times = [] legacy_queries = [] service_queries = [] counter = QueryCounter() # Mesure des performances legacy for _ in range(iterations): with counter.measure(): benchmark_result = PerformanceBenchmark.measure_execution_time( assessment._grading_progress_legacy ) legacy_times.append(benchmark_result['execution_time_ms']) legacy_queries.append(counter.query_count) # Mesure des performances service for _ in range(iterations): with counter.measure(): benchmark_result = PerformanceBenchmark.measure_execution_time( assessment._grading_progress_with_service ) service_times.append(benchmark_result['execution_time_ms']) service_queries.append(counter.query_count) return { 'legacy': { 'avg_time_ms': statistics.mean(legacy_times), 'median_time_ms': statistics.median(legacy_times), 'min_time_ms': min(legacy_times), 'max_time_ms': max(legacy_times), 'std_dev_time_ms': statistics.stdev(legacy_times) if len(legacy_times) > 1 else 0, 'avg_queries': statistics.mean(legacy_queries), 'max_queries': max(legacy_queries), 'all_times': legacy_times, 'all_queries': legacy_queries }, 'service': { 'avg_time_ms': statistics.mean(service_times), 'median_time_ms': statistics.median(service_times), 'min_time_ms': min(service_times), 'max_time_ms': max(service_times), 'std_dev_time_ms': statistics.stdev(service_times) if len(service_times) > 1 else 0, 'avg_queries': statistics.mean(service_queries), 'max_queries': max(service_queries), 'all_times': service_times, 'all_queries': service_queries }, 'improvement': { 'time_ratio': statistics.mean(legacy_times) / statistics.mean(service_times) if statistics.mean(service_times) > 0 else float('inf'), 'queries_saved': statistics.mean(legacy_queries) - statistics.mean(service_queries), 'queries_ratio': statistics.mean(legacy_queries) / statistics.mean(service_queries) if statistics.mean(service_queries) > 0 else float('inf') } } class TestGradingProgressPerformance: """ Suite de tests de performance pour grading_progress. """ def test_small_dataset_performance(self, app): """ PERFORMANCE : Test sur un petit dataset (2 étudiants, 2 exercices, 4 éléments). """ assessment = self._create_assessment_with_data( students_count=2, exercises_count=2, elements_per_exercise=2 ) comparison = PerformanceBenchmark.compare_implementations(assessment) # ASSERTIONS print(f"\n=== SMALL DATASET PERFORMANCE ===") print(f"Legacy: {comparison['legacy']['avg_time_ms']:.2f}ms avg, {comparison['legacy']['avg_queries']:.1f} queries avg") print(f"Service: {comparison['service']['avg_time_ms']:.2f}ms avg, {comparison['service']['avg_queries']:.1f} queries avg") print(f"Improvement: {comparison['improvement']['time_ratio']:.2f}x faster, {comparison['improvement']['queries_saved']:.1f} queries saved") # Le service doit faire moins de requêtes assert comparison['service']['avg_queries'] < comparison['legacy']['avg_queries'], ( f"Service devrait faire moins de requêtes: {comparison['service']['avg_queries']} vs {comparison['legacy']['avg_queries']}" ) # Les résultats doivent être identiques legacy_result = assessment._grading_progress_legacy() service_result = assessment._grading_progress_with_service() assert legacy_result == service_result def test_medium_dataset_performance(self, app): """ PERFORMANCE : Test sur un dataset moyen (5 étudiants, 3 exercices, 6 éléments). """ assessment = self._create_assessment_with_data( students_count=5, exercises_count=3, elements_per_exercise=2 ) comparison = PerformanceBenchmark.compare_implementations(assessment) print(f"\n=== MEDIUM DATASET PERFORMANCE ===") print(f"Legacy: {comparison['legacy']['avg_time_ms']:.2f}ms avg, {comparison['legacy']['avg_queries']:.1f} queries avg") print(f"Service: {comparison['service']['avg_time_ms']:.2f}ms avg, {comparison['service']['avg_queries']:.1f} queries avg") print(f"Improvement: {comparison['improvement']['time_ratio']:.2f}x faster, {comparison['improvement']['queries_saved']:.1f} queries saved") # Le service doit faire significativement moins de requêtes avec plus de données queries_improvement = comparison['improvement']['queries_ratio'] assert queries_improvement > 1.5, ( f"Avec plus de données, l'amélioration devrait être plus significative: {queries_improvement:.2f}x" ) # Les résultats doivent être identiques legacy_result = assessment._grading_progress_legacy() service_result = assessment._grading_progress_with_service() assert legacy_result == service_result def test_large_dataset_performance(self, app): """ PERFORMANCE : Test sur un grand dataset (10 étudiants, 4 exercices, 12 éléments). """ assessment = self._create_assessment_with_data( students_count=10, exercises_count=4, elements_per_exercise=3 ) comparison = PerformanceBenchmark.compare_implementations(assessment) print(f"\n=== LARGE DATASET PERFORMANCE ===") print(f"Legacy: {comparison['legacy']['avg_time_ms']:.2f}ms avg, {comparison['legacy']['avg_queries']:.1f} queries avg") print(f"Service: {comparison['service']['avg_time_ms']:.2f}ms avg, {comparison['service']['avg_queries']:.1f} queries avg") print(f"Improvement: {comparison['improvement']['time_ratio']:.2f}x faster, {comparison['improvement']['queries_saved']:.1f} queries saved") # Avec beaucoup de données, l'amélioration doit être dramatique queries_improvement = comparison['improvement']['queries_ratio'] assert queries_improvement > 2.0, ( f"Avec beaucoup de données, l'amélioration devrait être dramatique: {queries_improvement:.2f}x" ) # Le service ne doit jamais dépasser un certain nombre de requêtes (peu importe la taille) max_service_queries = comparison['service']['max_queries'] assert max_service_queries <= 5, ( f"Le service optimisé ne devrait jamais dépasser 5 requêtes, trouvé: {max_service_queries}" ) # Les résultats doivent être identiques legacy_result = assessment._grading_progress_legacy() service_result = assessment._grading_progress_with_service() assert legacy_result == service_result def test_scalability_analysis(self, app): """ ANALYSE : Teste la scalabilité avec différentes tailles de datasets. """ dataset_configs = [ (2, 2, 1), # Petit : 2 étudiants, 2 exercices, 1 élément/ex (5, 3, 2), # Moyen : 5 étudiants, 3 exercices, 2 éléments/ex (8, 4, 2), # Grand : 8 étudiants, 4 exercices, 2 éléments/ex ] scalability_results = [] for students_count, exercises_count, elements_per_exercise in dataset_configs: assessment = self._create_assessment_with_data( students_count, exercises_count, elements_per_exercise ) comparison = PerformanceBenchmark.compare_implementations(assessment, iterations=3) total_elements = exercises_count * elements_per_exercise total_grades = students_count * total_elements scalability_results.append({ 'dataset_size': f"{students_count}s-{exercises_count}e-{total_elements}el", 'total_grades': total_grades, 'legacy_queries': comparison['legacy']['avg_queries'], 'service_queries': comparison['service']['avg_queries'], 'queries_ratio': comparison['improvement']['queries_ratio'], 'time_ratio': comparison['improvement']['time_ratio'] }) print(f"\n=== SCALABILITY ANALYSIS ===") for result in scalability_results: print(f"Dataset {result['dataset_size']}: " f"Legacy={result['legacy_queries']:.1f}q, " f"Service={result['service_queries']:.1f}q, " f"Improvement={result['queries_ratio']:.1f}x queries") # Le service doit avoir une complexité constante ou sous-linéaire service_queries = [r['service_queries'] for r in scalability_results] legacy_queries = [r['legacy_queries'] for r in scalability_results] # Les requêtes du service ne doivent pas croître linéairement service_growth = service_queries[-1] / service_queries[0] if service_queries[0] > 0 else 1 legacy_growth = legacy_queries[-1] / legacy_queries[0] if legacy_queries[0] > 0 else 1 print(f"Service queries growth: {service_growth:.2f}x") print(f"Legacy queries growth: {legacy_growth:.2f}x") assert service_growth < legacy_growth, ( f"Le service doit avoir une croissance plus lente que legacy: {service_growth:.2f} vs {legacy_growth:.2f}" ) def test_query_patterns_analysis(self, app): """ ANALYSE : Analyse des patterns de requêtes pour comprendre les optimisations. """ assessment = self._create_assessment_with_data( students_count=3, exercises_count=2, elements_per_exercise=2 ) counter = QueryCounter() # Analyse des requêtes legacy with counter.measure(): assessment._grading_progress_legacy() legacy_queries = counter.queries.copy() # Analyse des requêtes service with counter.measure(): assessment._grading_progress_with_service() service_queries = counter.queries.copy() print(f"\n=== QUERY PATTERNS ANALYSIS ===") print(f"Legacy executed {len(legacy_queries)} queries:") for i, query in enumerate(legacy_queries[:5]): # Montrer les 5 premières print(f" {i+1}: {query['statement'][:100]}...") print(f"\nService executed {len(service_queries)} queries:") for i, query in enumerate(service_queries): print(f" {i+1}: {query['statement'][:100]}...") # Le service ne doit pas avoir de requêtes dans des boucles # (heuristique : pas de requêtes identiques répétées) legacy_statements = [q['statement'] for q in legacy_queries] service_statements = [q['statement'] for q in service_queries] legacy_duplicates = len(legacy_statements) - len(set(legacy_statements)) service_duplicates = len(service_statements) - len(set(service_statements)) print(f"Legacy duplicate queries: {legacy_duplicates}") print(f"Service duplicate queries: {service_duplicates}") # Le service doit avoir moins de requêtes dupliquées (moins de boucles) assert service_duplicates < legacy_duplicates, ( f"Service devrait avoir moins de requêtes dupliquées: {service_duplicates} vs {legacy_duplicates}" ) def _create_assessment_with_data(self, students_count: int, exercises_count: int, elements_per_exercise: int) -> Assessment: """ Helper pour créer un assessment avec des données de test. Args: students_count: Nombre d'étudiants exercises_count: Nombre d'exercices elements_per_exercise: Nombre d'éléments de notation par exercice Returns: Assessment créé avec toutes les données associées """ # Créer la classe et les étudiants class_group = ClassGroup(name=f'Perf Test Class {students_count}', year='2025') students = [ Student( first_name=f'Student{i}', last_name=f'Test{i}', class_group=class_group ) for i in range(students_count) ] # Créer l'assessment assessment = Assessment( title=f'Performance Test {students_count}s-{exercises_count}e', date=date.today(), trimester=1, class_group=class_group ) db.session.add_all([class_group, assessment, *students]) db.session.commit() # Créer les exercices et éléments exercises = [] elements = [] grades = [] for ex_idx in range(exercises_count): exercise = Exercise( title=f'Exercise {ex_idx+1}', assessment=assessment, order=ex_idx+1 ) exercises.append(exercise) for elem_idx in range(elements_per_exercise): element = GradingElement( label=f'Question {ex_idx+1}.{elem_idx+1}', max_points=10, grading_type='notes', exercise=exercise ) elements.append(element) db.session.add_all(exercises + elements) db.session.commit() # Créer des notes partielles (environ 70% de completion) grade_probability = 0.7 for student in students: for element in elements: # Probabilité de 70% d'avoir une note import random if random.random() < grade_probability: grade = Grade( student=student, grading_element=element, value=str(random.randint(5, 10)) # Note entre 5 et 10 ) grades.append(grade) db.session.add_all(grades) db.session.commit() return assessment def test_memory_usage_comparison(self, app): """ MÉMOIRE : Comparer l'utilisation mémoire entre les deux implémentations. """ import tracemalloc assessment = self._create_assessment_with_data( students_count=8, exercises_count=4, elements_per_exercise=3 ) # Mesure mémoire legacy tracemalloc.start() legacy_result = assessment._grading_progress_legacy() _, legacy_peak = tracemalloc.get_traced_memory() tracemalloc.stop() # Mesure mémoire service tracemalloc.start() service_result = assessment._grading_progress_with_service() _, service_peak = tracemalloc.get_traced_memory() tracemalloc.stop() print(f"\n=== MEMORY USAGE COMPARISON ===") print(f"Legacy peak memory: {legacy_peak / 1024:.1f} KB") print(f"Service peak memory: {service_peak / 1024:.1f} KB") print(f"Memory improvement: {legacy_peak / service_peak:.2f}x") # Les résultats doivent être identiques assert legacy_result == service_result # Note: Il est difficile de garantir que le service utilise moins de mémoire # car la différence peut être minime et influencée par d'autres facteurs. # On vérifie juste que l'utilisation reste raisonnable. assert service_peak < 1024 * 1024, "L'utilisation mémoire ne devrait pas dépasser 1MB"