162 lines
5.4 KiB
Python
162 lines
5.4 KiB
Python
from bopytex.message import Message
|
|
from bopytex.tasks import Task
|
|
from bopytex.scheduler import Scheduler
|
|
from .fakes.dispatcher import fake_dispatcher
|
|
import pytest
|
|
|
|
|
|
def test_schedule_append():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
tasks = [
|
|
Task(action="FAKE", args={}, deps=["dep1", "dep2"], output="end1"),
|
|
Task(action="FAKE", args={}, deps=["dep1", "dep3"], output="end2"),
|
|
]
|
|
scheduler.append(tasks)
|
|
assert scheduler.tasks == tasks
|
|
assert scheduler.all_deps == {"dep1", "dep2", "dep3"}
|
|
assert scheduler.all_output == {"end1", "end2"}
|
|
|
|
|
|
def test_schedule_one_task():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
tasks = [Task(action="FAKE", args={}, deps=[], output="end")]
|
|
scheduler.append(tasks)
|
|
|
|
assert scheduler.doable_tasks == tasks
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert result.out == ["FAKE - {} - [] - end"]
|
|
assert result.err == []
|
|
|
|
assert scheduler.tasks == []
|
|
assert scheduler.output_done == ["end"]
|
|
|
|
|
|
def test_schedule_one_task_with_args():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
tasks = [Task(action="FAKE", args={"task": "one"}, deps=[], output="one")]
|
|
scheduler.append(tasks)
|
|
|
|
result = scheduler.next_task()
|
|
|
|
assert result.status == 0
|
|
assert result.out == ["FAKE - {'task': 'one'} - [] - one"]
|
|
assert result.err == []
|
|
|
|
assert scheduler.tasks == []
|
|
assert scheduler.output_done == ["one"]
|
|
|
|
|
|
def test_schedule_multiple_tasks():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
t1 = Task(action="FAKE", args={"task": "one"}, deps=[], output="one")
|
|
t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two")
|
|
t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
|
|
scheduler.append([t1, t2, t3])
|
|
|
|
assert scheduler.doable_tasks == [t1, t2, t3]
|
|
assert scheduler.is_finishable()
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert scheduler.tasks == [t2, t3]
|
|
assert scheduler.output_done == ["one"]
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert scheduler.tasks == [t3]
|
|
assert scheduler.output_done == ["one", "two"]
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert scheduler.tasks == []
|
|
assert scheduler.output_done == ["one", "two", "three"]
|
|
|
|
|
|
def test_schedule_multiple_tasks_with_dependencies():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one")
|
|
t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two")
|
|
t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
|
|
scheduler.append([t1, t2, t3])
|
|
|
|
assert scheduler.doable_tasks == [t3]
|
|
assert scheduler.is_finishable()
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert scheduler.tasks == [t1, t2]
|
|
assert scheduler.output_done == ["three"]
|
|
assert scheduler.doable_tasks == [t1]
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert scheduler.tasks == [t2]
|
|
assert scheduler.output_done == ["three", "one"]
|
|
assert scheduler.doable_tasks == [t2]
|
|
|
|
result = scheduler.next_task()
|
|
assert result.status == 0
|
|
assert scheduler.tasks == []
|
|
assert scheduler.output_done == ["three", "one", "two"]
|
|
|
|
|
|
def test_schedule_multiple_tasks_with_dependencies_loop():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one")
|
|
t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two")
|
|
t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
|
|
scheduler.append([t1, t2, t3])
|
|
|
|
for task in scheduler.backlog():
|
|
pass
|
|
|
|
assert scheduler.output_done == ["three", "one", "two"]
|
|
|
|
|
|
def test_schedule_empty_task():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
scheduler.append([])
|
|
with pytest.raises(StopIteration):
|
|
scheduler.next_task()
|
|
|
|
|
|
def test_schedule_multiple_tasks_with_undoable_dependencies():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one")
|
|
t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two")
|
|
scheduler.append([t1, t2])
|
|
|
|
assert scheduler.doable_tasks == [t2]
|
|
assert not scheduler.is_finishable()
|
|
|
|
for _ in scheduler.backlog():
|
|
pass
|
|
|
|
assert scheduler.tasks == [t1]
|
|
assert scheduler.output_done == ["two"]
|
|
assert scheduler.doable_tasks == []
|
|
|
|
|
|
def test_schedule_multiple_tasks_with_failling_tasks():
|
|
scheduler = Scheduler(dispatcher=fake_dispatcher)
|
|
t1 = Task(action="FAILURE", args={"task": "one"}, deps=["three"], output="one")
|
|
t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two")
|
|
t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
|
|
t4 = Task(action="FAILURE", args={"task": "four"}, deps=[], output="four")
|
|
scheduler.append([t1, t2, t3, t4])
|
|
|
|
assert scheduler.doable_tasks == [t3, t4]
|
|
assert scheduler.is_finishable()
|
|
|
|
status = []
|
|
for message in scheduler.backlog():
|
|
status.append(message.status)
|
|
|
|
assert status == [0, 1, 1]
|
|
assert scheduler.tasks == [t2]
|
|
assert scheduler.failed_tasks == [t1, t4]
|
|
assert scheduler.output_done == ["three"]
|
|
assert scheduler.doable_tasks == []
|