Feat: Integrate message in service, scheduler and dispatcher

This commit is contained in:
Bertrand Benjamin 2022-04-13 15:09:08 +02:00
parent 9c05ef1551
commit b455ef23c4
10 changed files with 100 additions and 52 deletions

View File

@ -16,6 +16,9 @@ class Message():
def err(self): def err(self):
return self._err return self._err
def __repr__(self):
return f"Message(status={self.status}, out={self.out}, err={self.err})"
class SubprocessMessage(Message): class SubprocessMessage(Message):
def __init__(self, process): def __init__(self, process):
self._process = process self._process = process

View File

@ -2,12 +2,14 @@
from bopytex.tasks import Task from bopytex.tasks import Task
from bopytex.worker import Dispatcher
class Scheduler: class Scheduler:
"""Scheduler is responsible of getting tasks (the tasks) and yield those that can be done""" """Scheduler is responsible of getting tasks (the tasks) and yield those that can be done"""
def __init__(self, done: list[str] = None): def __init__(self, dispatcher:Dispatcher, done: list[str] = None):
self._dispatcher = dispatcher
if done is None: if done is None:
self._done = [] self._done = []
@ -56,9 +58,13 @@ class Scheduler:
except IndexError: except IndexError:
raise StopIteration raise StopIteration
self._done.append(task.output)
self._tasks.remove(task) self._tasks.remove(task)
return task message = self._dispatcher(task)
if message.status == 0:
self._done.append(task.output)
return message
def backlog(self): def backlog(self):
""" Yield tasks sorted according to dependencies """ """ Yield tasks sorted according to dependencies """

View File

@ -15,11 +15,11 @@ def orcherstrator(
): ):
tasks = planner(options) tasks = planner(options)
scheduler = Scheduler([options["template"]]) scheduler = Scheduler(dispatcher, [options["template"]])
scheduler.append(tasks) scheduler.append(tasks)
for task in scheduler.backlog(): for message in scheduler.backlog():
yield from dispatcher(task) yield message
# ----------------------------- # -----------------------------

View File

@ -10,14 +10,9 @@ class Dispatcher:
try: try:
choosen_action = self._actions[task.action] choosen_action = self._actions[task.action]
except KeyError: except KeyError:
raise ActionNotFound(f"The action {task.action} is not in {self._actions.keys()}") raise ActionNotFound(
f"The action {task.action} is not in {self._actions.keys()}"
)
return choosen_action( return choosen_action(args=task.args, deps=task.deps, output=task.output)
args=task.args,
deps=task.deps,
output=task.output
)
def fake_worker(args, deps, output):
yield f"FAKE - {args} - {deps} - {output}"

0
test/fakes/__init__.py Normal file
View File

10
test/fakes/dispatcher.py Normal file
View File

@ -0,0 +1,10 @@
from bopytex.worker import Dispatcher
from .workers import fake_worker, success_worker, fail_worker
fake_dispatcher = Dispatcher(
{
"FAKE": fake_worker,
"SUCCESS": success_worker,
"FAILURE": fail_worker,
}
)

9
test/fakes/planner.py Normal file
View File

@ -0,0 +1,9 @@
from bopytex.tasks import Task
def simple(options: dict) -> list[Task]:
"""Simple planner with options['quantity'] tasks and no dependencies"""
return [
Task("FAKE", args={"number": i}, deps=[], output=f"{i}")
for i in range(options["quantity"])
]

13
test/fakes/workers.py Normal file
View File

@ -0,0 +1,13 @@
from bopytex.message import Message
def fake_worker(args, deps, output):
return Message(0, [f"FAKE - {args} - {deps} - {output}"], [])
def success_worker(args, deps, output):
return Message(0, [f"SUCCESS - {args} - {deps} - {output}"], [])
def fail_worker():
return Message(1, [f"FAILURE - {args} - {deps} - {output}"], [])

View File

@ -1,13 +1,15 @@
from bopytex.message import Message
from bopytex.tasks import Task from bopytex.tasks import Task
from bopytex.scheduler import Scheduler from bopytex.scheduler import Scheduler
from .fakes.dispatcher import fake_dispatcher
import pytest import pytest
def test_schedule_append(): def test_schedule_append():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
tasks = [ tasks = [
Task(action="FOO", args={}, deps=["dep1", "dep2"], output="end1"), Task(action="FAKE", args={}, deps=["dep1", "dep2"], output="end1"),
Task(action="FOO", args={}, deps=["dep1", "dep3"], output="end2"), Task(action="FAKE", args={}, deps=["dep1", "dep3"], output="end2"),
] ]
scheduler.append(tasks) scheduler.append(tasks)
assert scheduler.tasks == tasks assert scheduler.tasks == tasks
@ -16,106 +18,114 @@ def test_schedule_append():
def test_schedule_one_task(): def test_schedule_one_task():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
tasks = [Task(action="FOO", args={}, deps=[], output="end")] tasks = [Task(action="FAKE", args={}, deps=[], output="end")]
scheduler.append(tasks) scheduler.append(tasks)
assert scheduler.doable_tasks == tasks assert scheduler.doable_tasks == tasks
result = scheduler.next_task() result = scheduler.next_task()
assert result == tasks[0] assert result.status == 0
assert result.out == ["FAKE - {} - [] - end"]
assert result.err == []
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["end"] assert scheduler.done == ["end"]
def test_schedule_one_task_with_args(): def test_schedule_one_task_with_args():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
tasks = [Task(action="FOO", args={"task": "one"}, deps=[], output="one")] tasks = [Task(action="FAKE", args={"task": "one"}, deps=[], output="one")]
scheduler.append(tasks) scheduler.append(tasks)
result = scheduler.next_task() result = scheduler.next_task()
assert result == tasks[0] assert result.status == 0
assert result.out == ["FAKE - {'task': 'one'} - [] - one"]
assert result.err == []
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["one"] assert scheduler.done == ["one"]
def test_schedule_multiple_tasks(): def test_schedule_multiple_tasks():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
t1 = Task(action="FOO", args={"task": "one"}, deps=[], output="one") t1 = Task(action="FAKE", args={"task": "one"}, deps=[], output="one")
t2 = Task(action="FOO", args={"task": "two"}, deps=[], output="two") t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two")
t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3]) scheduler.append([t1, t2, t3])
assert scheduler.doable_tasks == [t1, t2, t3] assert scheduler.doable_tasks == [t1, t2, t3]
assert scheduler.is_finishable() assert scheduler.is_finishable()
result = scheduler.next_task() result = scheduler.next_task()
assert result == t1 assert result.status == 0
assert scheduler.tasks == [t2, t3] assert scheduler.tasks == [t2, t3]
assert scheduler.done == ["one"] assert scheduler.done == ["one"]
result = scheduler.next_task() result = scheduler.next_task()
assert result == t2 assert result.status == 0
assert scheduler.tasks == [t3] assert scheduler.tasks == [t3]
assert scheduler.done == ["one", "two"] assert scheduler.done == ["one", "two"]
result = scheduler.next_task() result = scheduler.next_task()
assert result == t3 assert result.status == 0
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["one", "two", "three"] assert scheduler.done == ["one", "two", "three"]
def test_schedule_multiple_tasks_with_dependencies(): def test_schedule_multiple_tasks_with_dependencies():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one") t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="FOO", args={"task": "two"}, deps=["one"], output="two") t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two")
t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3]) scheduler.append([t1, t2, t3])
assert scheduler.doable_tasks == [t3] assert scheduler.doable_tasks == [t3]
assert scheduler.is_finishable() assert scheduler.is_finishable()
result = scheduler.next_task() result = scheduler.next_task()
assert result == t3 assert result.status == 0
assert scheduler.tasks == [t1, t2] assert scheduler.tasks == [t1, t2]
assert scheduler.done == ["three"] assert scheduler.done == ["three"]
assert scheduler.doable_tasks == [t1] assert scheduler.doable_tasks == [t1]
result = scheduler.next_task() result = scheduler.next_task()
assert result == t1 assert result.status == 0
assert scheduler.tasks == [t2] assert scheduler.tasks == [t2]
assert scheduler.done == ["three", "one"] assert scheduler.done == ["three", "one"]
assert scheduler.doable_tasks == [t2] assert scheduler.doable_tasks == [t2]
result = scheduler.next_task() result = scheduler.next_task()
assert result == t2 assert result.status == 0
assert scheduler.tasks == [] assert scheduler.tasks == []
assert scheduler.done == ["three", "one", "two"] assert scheduler.done == ["three", "one", "two"]
def test_schedule_multiple_tasks_with_dependencies_loop(): def test_schedule_multiple_tasks_with_dependencies_loop():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one") t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="FOO", args={"task": "two"}, deps=["one"], output="two") t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two")
t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three") t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3]) scheduler.append([t1, t2, t3])
ordered_tasks = []
for task in scheduler.backlog(): for task in scheduler.backlog():
ordered_tasks.append(task) pass
assert ordered_tasks == [t3, t1, t2]
assert scheduler.done == ["three", "one", "two"]
def test_schedule_empty_task(): def test_schedule_empty_task():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
scheduler.append([]) scheduler.append([])
with pytest.raises(StopIteration): with pytest.raises(StopIteration):
scheduler.next_task() scheduler.next_task()
def test_schedule_multiple_tasks_with_undoable_dependencies(): def test_schedule_multiple_tasks_with_undoable_dependencies():
scheduler = Scheduler() scheduler = Scheduler(dispatcher=fake_dispatcher)
t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one") t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="FOO", args={"task": "two"}, deps=[], output="two") t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two")
scheduler.append([t1, t2]) scheduler.append([t1, t2])
assert scheduler.doable_tasks == [t2] assert scheduler.doable_tasks == [t2]

View File

@ -1,7 +1,8 @@
from bopytex.planner import fake_planner from bopytex.planner import fake_planner
from bopytex.service import orcherstrator from bopytex.service import orcherstrator
from bopytex.tasks import Task from bopytex.tasks import Task
from bopytex.worker import Dispatcher, fake_worker from bopytex.worker import Dispatcher
from .fakes.workers import fake_worker
def test_service(): def test_service():
@ -10,5 +11,6 @@ def test_service():
service = orcherstrator(options, fake_planner.simple, dispatcher) service = orcherstrator(options, fake_planner.simple, dispatcher)
for i, task_output in enumerate(service): for i, message in enumerate(service):
assert task_output == f"FAKE - {{'number': {i}}} - [] - {i}" assert message.status == 0
assert message.out == [f"FAKE - {{'number': {i}}} - [] - {i}"]