Feat: manage failing tasks in scheduler

This commit is contained in:
Bertrand Benjamin 2022-04-13 15:20:34 +02:00
parent b455ef23c4
commit fe18dc4ef1
3 changed files with 48 additions and 20 deletions

View File

@ -8,15 +8,16 @@ from bopytex.worker import Dispatcher
class Scheduler: class Scheduler:
"""Scheduler is responsible of getting tasks (the tasks) and yield those that can be done""" """Scheduler is responsible of getting tasks (the tasks) and yield those that can be done"""
def __init__(self, dispatcher:Dispatcher, done: list[str] = None): def __init__(self, dispatcher:Dispatcher, output_done: list[str] = None):
self._dispatcher = dispatcher self._dispatcher = dispatcher
if done is None: if output_done is None:
self._done = [] self._output_done = []
else: else:
self._done = done self._output_done = output_done
self._tasks = [] self._tasks = []
self._failed_tasks = []
@property @property
def tasks(self) -> list[Task]: def tasks(self) -> list[Task]:
@ -29,7 +30,7 @@ class Scheduler:
return [ return [
task task
for task in self.tasks for task in self.tasks
if not task.deps or all([d in self.done for d in task.deps]) if not task.deps or all([d in self.output_done for d in task.deps])
] ]
@property @property
@ -43,8 +44,12 @@ class Scheduler:
return {task.output for task in self.tasks} return {task.output for task in self.tasks}
@property @property
def done(self) -> list[str]: def output_done(self) -> list[str]:
return self._done return self._output_done
@property
def failed_tasks(self) -> list[Task]:
return self._failed_tasks
def append(self, tasks: list[Task]): def append(self, tasks: list[Task]):
self._tasks += tasks self._tasks += tasks
@ -60,9 +65,10 @@ class Scheduler:
self._tasks.remove(task) self._tasks.remove(task)
message = self._dispatcher(task) message = self._dispatcher(task)
if message.status == 0: if message.status == 0:
self._done.append(task.output) self._output_done.append(task.output)
else:
self._failed_tasks.append(task)
return message return message

View File

@ -8,6 +8,6 @@ def success_worker(args, deps, output):
return Message(0, [f"SUCCESS - {args} - {deps} - {output}"], []) return Message(0, [f"SUCCESS - {args} - {deps} - {output}"], [])
def fail_worker(): def fail_worker(args, deps, output):
return Message(1, [f"FAILURE - {args} - {deps} - {output}"], []) return Message(1, [f"FAILURE - {args} - {deps} - {output}"], [])

View File

@ -30,7 +30,7 @@ def test_schedule_one_task():
assert result.err == [] assert result.err == []
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["end"] assert scheduler.output_done == ["end"]
def test_schedule_one_task_with_args(): def test_schedule_one_task_with_args():
@ -45,7 +45,7 @@ def test_schedule_one_task_with_args():
assert result.err == [] assert result.err == []
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["one"] assert scheduler.output_done == ["one"]
def test_schedule_multiple_tasks(): def test_schedule_multiple_tasks():
@ -61,17 +61,17 @@ def test_schedule_multiple_tasks():
result = scheduler.next_task() result = scheduler.next_task()
assert result.status == 0 assert result.status == 0
assert scheduler.tasks == [t2, t3] assert scheduler.tasks == [t2, t3]
assert scheduler.done == ["one"] assert scheduler.output_done == ["one"]
result = scheduler.next_task() result = scheduler.next_task()
assert result.status == 0 assert result.status == 0
assert scheduler.tasks == [t3] assert scheduler.tasks == [t3]
assert scheduler.done == ["one", "two"] assert scheduler.output_done == ["one", "two"]
result = scheduler.next_task() result = scheduler.next_task()
assert result.status == 0 assert result.status == 0
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["one", "two", "three"] assert scheduler.output_done == ["one", "two", "three"]
def test_schedule_multiple_tasks_with_dependencies(): def test_schedule_multiple_tasks_with_dependencies():
@ -87,19 +87,19 @@ def test_schedule_multiple_tasks_with_dependencies():
result = scheduler.next_task() result = scheduler.next_task()
assert result.status == 0 assert result.status == 0
assert scheduler.tasks == [t1, t2] assert scheduler.tasks == [t1, t2]
assert scheduler.done == ["three"] assert scheduler.output_done == ["three"]
assert scheduler.doable_tasks == [t1] assert scheduler.doable_tasks == [t1]
result = scheduler.next_task() result = scheduler.next_task()
assert result.status == 0 assert result.status == 0
assert scheduler.tasks == [t2] assert scheduler.tasks == [t2]
assert scheduler.done == ["three", "one"] assert scheduler.output_done == ["three", "one"]
assert scheduler.doable_tasks == [t2] assert scheduler.doable_tasks == [t2]
result = scheduler.next_task() result = scheduler.next_task()
assert result.status == 0 assert result.status == 0
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["three", "one", "two"] assert scheduler.output_done == ["three", "one", "two"]
def test_schedule_multiple_tasks_with_dependencies_loop(): def test_schedule_multiple_tasks_with_dependencies_loop():
@ -112,7 +112,7 @@ def test_schedule_multiple_tasks_with_dependencies_loop():
for task in scheduler.backlog(): for task in scheduler.backlog():
pass pass
assert scheduler.done == ["three", "one", "two"] assert scheduler.output_done == ["three", "one", "two"]
def test_schedule_empty_task(): def test_schedule_empty_task():
@ -135,5 +135,27 @@ def test_schedule_multiple_tasks_with_undoable_dependencies():
pass pass
assert scheduler.tasks == [t1] assert scheduler.tasks == [t1]
assert scheduler.done == ["two"] assert scheduler.output_done == ["two"]
assert scheduler.doable_tasks == []
def test_schedule_multiple_tasks_with_failling_tasks():
scheduler = Scheduler(dispatcher=fake_dispatcher)
t1 = Task(action="FAILURE", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two")
t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
t4 = Task(action="FAILURE", args={"task": "four"}, deps=[], output="four")
scheduler.append([t1, t2, t3, t4])
assert scheduler.doable_tasks == [t3, t4]
assert scheduler.is_finishable()
status = []
for message in scheduler.backlog():
status.append(message.status)
assert status == [0, 1, 1]
assert scheduler.tasks == [t2]
assert scheduler.failed_tasks == [t1, t4]
assert scheduler.output_done == ["three"]
assert scheduler.doable_tasks == [] assert scheduler.doable_tasks == []