import pytest from bopytex.message import Message from bopytex.scheduler import Scheduler from bopytex.tasks import Task from .fakes.dispatcher import fake_dispatcher def test_schedule_append(): scheduler = Scheduler(dispatcher=fake_dispatcher) tasks = [ Task(action="FAKE", args={}, deps=["dep1", "dep2"], output="end1"), Task(action="FAKE", args={}, deps=["dep1", "dep3"], output="end2"), ] scheduler.append(tasks) assert scheduler.tasks == tasks assert scheduler.all_deps == {"dep1", "dep2", "dep3"} assert scheduler.all_output == {"end1", "end2"} def test_schedule_one_task(): scheduler = Scheduler(dispatcher=fake_dispatcher) tasks = [Task(action="FAKE", args={}, deps=[], output="end")] scheduler.append(tasks) assert scheduler.doable_tasks == tasks result = scheduler.next_task() assert result.status == 0 assert result.out == ["FAKE - {} - [] - end"] assert result.err == [] assert scheduler.tasks == [] assert scheduler.output_done == ["end"] def test_schedule_one_task_with_args(): scheduler = Scheduler(dispatcher=fake_dispatcher) tasks = [Task(action="FAKE", args={"task": "one"}, deps=[], output="one")] scheduler.append(tasks) result = scheduler.next_task() assert result.status == 0 assert result.out == ["FAKE - {'task': 'one'} - [] - one"] assert result.err == [] assert scheduler.tasks == [] assert scheduler.output_done == ["one"] def test_schedule_multiple_tasks(): scheduler = Scheduler(dispatcher=fake_dispatcher) t1 = Task(action="FAKE", args={"task": "one"}, deps=[], output="one") t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") scheduler.append([t1, t2, t3]) assert scheduler.doable_tasks == [t1, t2, t3] assert scheduler.is_finishable() result = scheduler.next_task() assert result.status == 0 assert scheduler.tasks == [t2, t3] assert scheduler.output_done == ["one"] result = scheduler.next_task() assert result.status == 0 assert scheduler.tasks == [t3] assert scheduler.output_done == ["one", "two"] result = scheduler.next_task() assert result.status == 0 assert scheduler.tasks == [] assert scheduler.output_done == ["one", "two", "three"] def test_schedule_multiple_tasks_with_dependencies(): scheduler = Scheduler(dispatcher=fake_dispatcher) t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one") t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") scheduler.append([t1, t2, t3]) assert scheduler.doable_tasks == [t3] assert scheduler.is_finishable() result = scheduler.next_task() assert result.status == 0 assert scheduler.tasks == [t1, t2] assert scheduler.output_done == ["three"] assert scheduler.doable_tasks == [t1] result = scheduler.next_task() assert result.status == 0 assert scheduler.tasks == [t2] assert scheduler.output_done == ["three", "one"] assert scheduler.doable_tasks == [t2] result = scheduler.next_task() assert result.status == 0 assert scheduler.tasks == [] assert scheduler.output_done == ["three", "one", "two"] def test_schedule_multiple_tasks_with_dependencies_loop(): scheduler = Scheduler(dispatcher=fake_dispatcher) t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one") t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") scheduler.append([t1, t2, t3]) for task in scheduler.backlog(): pass assert scheduler.output_done == ["three", "one", "two"] def test_schedule_empty_task(): scheduler = Scheduler(dispatcher=fake_dispatcher) scheduler.append([]) with pytest.raises(StopIteration): scheduler.next_task() def test_schedule_multiple_tasks_with_undoable_dependencies(): scheduler = Scheduler(dispatcher=fake_dispatcher) t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one") t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two") scheduler.append([t1, t2]) assert scheduler.doable_tasks == [t2] assert not scheduler.is_finishable() for _ in scheduler.backlog(): pass assert scheduler.tasks == [t1] assert scheduler.output_done == ["two"] assert scheduler.doable_tasks == [] def test_schedule_multiple_tasks_with_failling_tasks(): scheduler = Scheduler(dispatcher=fake_dispatcher) t1 = Task(action="FAILURE", args={"task": "one"}, deps=["three"], output="one") t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") t4 = Task(action="FAILURE", args={"task": "four"}, deps=[], output="four") scheduler.append([t1, t2, t3, t4]) assert scheduler.doable_tasks == [t3, t4] assert scheduler.is_finishable() status = [] for message in scheduler.backlog(): status.append(message.status) assert status == [0, 1, 1] assert scheduler.tasks == [t2] assert scheduler.failed_tasks == [t1, t4] assert scheduler.output_done == ["three"] assert scheduler.doable_tasks == []