Feat: scheduler don't manage actions

This commit is contained in:
Bertrand Benjamin 2022-04-09 21:46:24 +02:00
parent 1f0547c1ac
commit 963348611a
2 changed files with 70 additions and 76 deletions

View File

@ -5,8 +5,9 @@ from bopytex.tasks import Task
class Scheduler: class Scheduler:
def __init__(self, actions: dict, done: list[str] = None): """Scheduler is responsible of getting tasks (the tasks) and yield those that can be done"""
self.actions = actions
def __init__(self, done: list[str] = None):
if done is None: if done is None:
self._done = [] self._done = []
@ -17,12 +18,12 @@ class Scheduler:
@property @property
def tasks(self) -> list[Task]: def tasks(self) -> list[Task]:
""" list all the tasks """ """List all the tasks todo"""
return self._tasks return self._tasks
@property @property
def doable_tasks(self) -> list[Task]: def doable_tasks(self) -> list[Task]:
""" list all doable tasks """ """List all doable tasks"""
return [ return [
task task
for task in self.tasks for task in self.tasks
@ -46,31 +47,20 @@ class Scheduler:
def append(self, tasks: list[Task]): def append(self, tasks: list[Task]):
self._tasks += tasks self._tasks += tasks
def dispatch(self, task): def is_finishable(self):
"""Do a task""" return self.all_deps.issubset(self.all_output)
ans = self.actions[task.action](task.deps, task.args, task.output)
return ans
def __iter__(self): def next_task(self):
return self
def __next__(self):
return self.next()
def next(self):
try: try:
task = self.doable_tasks[0] task = self.doable_tasks[0]
except IndexError: except IndexError:
raise StopIteration raise StopIteration
ans = self.dispatch(task)
self._done.append(task.output) self._done.append(task.output)
self._tasks.remove(task) self._tasks.remove(task)
return ans return task
def run(self): def backlog(self):
for _ in self: """ Yield tasks sorted according to dependencies """
pass while self.doable_tasks:
yield self.next_task()
def is_finishable(self):
return self.all_deps.issubset(self.all_output)

View File

@ -3,18 +3,11 @@ from bopytex.scheduler import Scheduler
import pytest import pytest
def action_done(deps, args, output):
return f"{deps} - {args} - {output} - done"
actions = {"DO": action_done}
def test_schedule_append(): def test_schedule_append():
scheduler = Scheduler(actions) scheduler = Scheduler()
tasks = [ tasks = [
Task(action="DO", args={}, deps=["dep1", "dep2"], output="end1"), Task(action="FOO", args={}, deps=["dep1", "dep2"], output="end1"),
Task(action="DO", args={}, deps=["dep1", "dep3"], output="end2"), Task(action="FOO", args={}, deps=["dep1", "dep3"], output="end2"),
] ]
scheduler.append(tasks) scheduler.append(tasks)
assert scheduler.tasks == tasks assert scheduler.tasks == tasks
@ -22,104 +15,115 @@ def test_schedule_append():
assert scheduler.all_output == {"end1", "end2"} assert scheduler.all_output == {"end1", "end2"}
def test_schedule_dispatch():
scheduler = Scheduler(actions)
task = Task(action="DO", args={}, deps=[], output="end")
result = scheduler.dispatch(task)
assert result == "[] - {} - end - done"
def test_schedule_one_task(): def test_schedule_one_task():
scheduler = Scheduler(actions) scheduler = Scheduler()
tasks = [Task(action="DO", args={}, deps=[], output="end")] tasks = [Task(action="FOO", args={}, deps=[], output="end")]
scheduler.append(tasks) scheduler.append(tasks)
assert scheduler.doable_tasks == tasks assert scheduler.doable_tasks == tasks
result = scheduler.next()
assert result == "[] - {} - end - done" result = scheduler.next_task()
assert result == tasks[0]
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["end"] assert scheduler.done == ["end"]
def test_schedule_one_task_with_args(): def test_schedule_one_task_with_args():
scheduler = Scheduler(actions) scheduler = Scheduler()
scheduler.append([Task(action="DO", args={"task": "one"}, deps=[], output="one")]) tasks = [Task(action="FOO", args={"task": "one"}, deps=[], output="one")]
result = scheduler.next() scheduler.append(tasks)
assert result == "[] - {'task': 'one'} - one - done"
result = scheduler.next_task()
assert result == tasks[0]
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["one"] assert scheduler.done == ["one"]
def test_schedule_multiple_tasks(): def test_schedule_multiple_tasks():
scheduler = Scheduler(actions) scheduler = Scheduler()
t1 = Task(action="DO", args={"task": "one"}, deps=[], output="one") t1 = Task(action="FOO", args={"task": "one"}, deps=[], output="one")
t2 = Task(action="DO", args={"task": "two"}, deps=[], output="two") t2 = Task(action="FOO", args={"task": "two"}, deps=[], output="two")
t3 = Task(action="DO", args={"task": "three"}, deps=[], output="three") t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3]) scheduler.append([t1, t2, t3])
assert scheduler.doable_tasks == [t1, t2, t3] assert scheduler.doable_tasks == [t1, t2, t3]
assert scheduler.is_finishable() assert scheduler.is_finishable()
result = scheduler.next() result = scheduler.next_task()
assert result == "[] - {'task': 'one'} - one - done" assert result == t1
assert scheduler.tasks == [t2, t3] assert scheduler.tasks == [t2, t3]
assert scheduler.done == ["one"] assert scheduler.done == ["one"]
result = scheduler.next() result = scheduler.next_task()
assert result == "[] - {'task': 'two'} - two - done" assert result == t2
assert scheduler.tasks == [t3] assert scheduler.tasks == [t3]
assert scheduler.done == ["one", "two"] assert scheduler.done == ["one", "two"]
result = scheduler.next() result = scheduler.next_task()
assert result == "[] - {'task': 'three'} - three - done" assert result == t3
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["one", "two", "three"] assert scheduler.done == ["one", "two", "three"]
def test_schedule_multiple_tasks_with_dependencies(): def test_schedule_multiple_tasks_with_dependencies():
scheduler = Scheduler(actions) scheduler = Scheduler()
t1 = Task(action="DO", args={"task": "one"}, deps=["three"], output="one") t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="DO", args={"task": "two"}, deps=["one"], output="two") t2 = Task(action="FOO", args={"task": "two"}, deps=["one"], output="two")
t3 = Task(action="DO", args={"task": "three"}, deps=[], output="three") t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3]) scheduler.append([t1, t2, t3])
assert scheduler.doable_tasks == [t3] assert scheduler.doable_tasks == [t3]
assert scheduler.is_finishable() assert scheduler.is_finishable()
result = scheduler.next() result = scheduler.next_task()
assert result == "[] - {'task': 'three'} - three - done" assert result == t3
assert scheduler.tasks == [t1, t2] assert scheduler.tasks == [t1, t2]
assert scheduler.done == ["three"] assert scheduler.done == ["three"]
assert scheduler.doable_tasks == [t1] assert scheduler.doable_tasks == [t1]
result = scheduler.next() result = scheduler.next_task()
assert result == "['three'] - {'task': 'one'} - one - done" assert result == t1
assert scheduler.tasks == [t2] assert scheduler.tasks == [t2]
assert scheduler.done == ["three", "one"] assert scheduler.done == ["three", "one"]
assert scheduler.doable_tasks == [t2] assert scheduler.doable_tasks == [t2]
result = scheduler.next() result = scheduler.next_task()
assert result == "['one'] - {'task': 'two'} - two - done" assert result == t2
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["three", "one", "two"] assert scheduler.done == ["three", "one", "two"]
def test_schedule_multiple_tasks_with_dependencies_loop():
scheduler = Scheduler()
t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="FOO", args={"task": "two"}, deps=["one"], output="two")
t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3])
ordered_tasks = []
for task in scheduler.backlog():
ordered_tasks.append(task)
assert ordered_tasks == [t3, t1, t2]
def test_schedule_empty_task(): def test_schedule_empty_task():
scheduler = Scheduler(actions) scheduler = Scheduler()
scheduler.append([]) scheduler.append([])
with pytest.raises(StopIteration): with pytest.raises(StopIteration):
scheduler.next() scheduler.next_task()
def test_schedule_multiple_tasks_with_undoable_dependencies(): def test_schedule_multiple_tasks_with_undoable_dependencies():
scheduler = Scheduler(actions) scheduler = Scheduler()
t1 = Task(action="DO", args={"task": "one"}, deps=["three"], output="one") t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="DO", args={"task": "two"}, deps=[], output="two") t2 = Task(action="FOO", args={"task": "two"}, deps=[], output="two")
scheduler.append([t1, t2]) scheduler.append([t1, t2])
assert scheduler.doable_tasks == [t2] assert scheduler.doable_tasks == [t2]
assert not scheduler.is_finishable() assert not scheduler.is_finishable()
scheduler.run() for _ in scheduler.backlog():
pass
assert scheduler.tasks == [t1] assert scheduler.tasks == [t1]
assert scheduler.done == ["two"] assert scheduler.done == ["two"]
assert scheduler.doable_tasks == [] assert scheduler.doable_tasks == []