From b455ef23c41631956f328513ab19e8bf360d813f Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Wed, 13 Apr 2022 15:09:08 +0200 Subject: [PATCH] Feat: Integrate message in service, scheduler and dispatcher --- bopytex/message.py | 3 ++ bopytex/scheduler.py | 12 ++++-- bopytex/service.py | 6 +-- bopytex/worker/__init__.py | 13 ++----- test/fakes/__init__.py | 0 test/fakes/dispatcher.py | 10 +++++ test/fakes/planner.py | 9 +++++ test/fakes/workers.py | 13 +++++++ test/test_scheduler.py | 78 +++++++++++++++++++++----------------- test/test_service.py | 8 ++-- 10 files changed, 100 insertions(+), 52 deletions(-) create mode 100644 test/fakes/__init__.py create mode 100644 test/fakes/dispatcher.py create mode 100644 test/fakes/planner.py create mode 100644 test/fakes/workers.py diff --git a/bopytex/message.py b/bopytex/message.py index bdfb69c..0c2a54f 100644 --- a/bopytex/message.py +++ b/bopytex/message.py @@ -16,6 +16,9 @@ class Message(): def err(self): return self._err + def __repr__(self): + return f"Message(status={self.status}, out={self.out}, err={self.err})" + class SubprocessMessage(Message): def __init__(self, process): self._process = process diff --git a/bopytex/scheduler.py b/bopytex/scheduler.py index 1004136..ab8b518 100644 --- a/bopytex/scheduler.py +++ b/bopytex/scheduler.py @@ -2,12 +2,14 @@ from bopytex.tasks import Task +from bopytex.worker import Dispatcher class Scheduler: """Scheduler is responsible of getting tasks (the tasks) and yield those that can be done""" - def __init__(self, done: list[str] = None): + def __init__(self, dispatcher:Dispatcher, done: list[str] = None): + self._dispatcher = dispatcher if done is None: self._done = [] @@ -56,9 +58,13 @@ class Scheduler: except IndexError: raise StopIteration - self._done.append(task.output) self._tasks.remove(task) - return task + message = self._dispatcher(task) + + if message.status == 0: + self._done.append(task.output) + + return message def backlog(self): """ Yield tasks sorted according to dependencies """ diff --git a/bopytex/service.py b/bopytex/service.py index 217ed67..4e1a429 100755 --- a/bopytex/service.py +++ b/bopytex/service.py @@ -15,11 +15,11 @@ def orcherstrator( ): tasks = planner(options) - scheduler = Scheduler([options["template"]]) + scheduler = Scheduler(dispatcher, [options["template"]]) scheduler.append(tasks) - for task in scheduler.backlog(): - yield from dispatcher(task) + for message in scheduler.backlog(): + yield message # ----------------------------- diff --git a/bopytex/worker/__init__.py b/bopytex/worker/__init__.py index fa48f63..cf084dd 100644 --- a/bopytex/worker/__init__.py +++ b/bopytex/worker/__init__.py @@ -10,14 +10,9 @@ class Dispatcher: try: choosen_action = self._actions[task.action] except KeyError: - raise ActionNotFound(f"The action {task.action} is not in {self._actions.keys()}") + raise ActionNotFound( + f"The action {task.action} is not in {self._actions.keys()}" + ) - return choosen_action( - args=task.args, - deps=task.deps, - output=task.output - ) - -def fake_worker(args, deps, output): - yield f"FAKE - {args} - {deps} - {output}" + return choosen_action(args=task.args, deps=task.deps, output=task.output) diff --git a/test/fakes/__init__.py b/test/fakes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/fakes/dispatcher.py b/test/fakes/dispatcher.py new file mode 100644 index 0000000..a0fa672 --- /dev/null +++ b/test/fakes/dispatcher.py @@ -0,0 +1,10 @@ +from bopytex.worker import Dispatcher +from .workers import fake_worker, success_worker, fail_worker + +fake_dispatcher = Dispatcher( + { + "FAKE": fake_worker, + "SUCCESS": success_worker, + "FAILURE": fail_worker, + } +) diff --git a/test/fakes/planner.py b/test/fakes/planner.py new file mode 100644 index 0000000..dd50afe --- /dev/null +++ b/test/fakes/planner.py @@ -0,0 +1,9 @@ +from bopytex.tasks import Task + + +def simple(options: dict) -> list[Task]: + """Simple planner with options['quantity'] tasks and no dependencies""" + return [ + Task("FAKE", args={"number": i}, deps=[], output=f"{i}") + for i in range(options["quantity"]) + ] diff --git a/test/fakes/workers.py b/test/fakes/workers.py new file mode 100644 index 0000000..fbae507 --- /dev/null +++ b/test/fakes/workers.py @@ -0,0 +1,13 @@ +from bopytex.message import Message + +def fake_worker(args, deps, output): + return Message(0, [f"FAKE - {args} - {deps} - {output}"], []) + + +def success_worker(args, deps, output): + return Message(0, [f"SUCCESS - {args} - {deps} - {output}"], []) + + +def fail_worker(): + return Message(1, [f"FAILURE - {args} - {deps} - {output}"], []) + diff --git a/test/test_scheduler.py b/test/test_scheduler.py index 4ac1475..f293cba 100644 --- a/test/test_scheduler.py +++ b/test/test_scheduler.py @@ -1,13 +1,15 @@ +from bopytex.message import Message from bopytex.tasks import Task from bopytex.scheduler import Scheduler +from .fakes.dispatcher import fake_dispatcher import pytest def test_schedule_append(): - scheduler = Scheduler() + scheduler = Scheduler(dispatcher=fake_dispatcher) tasks = [ - Task(action="FOO", args={}, deps=["dep1", "dep2"], output="end1"), - Task(action="FOO", args={}, deps=["dep1", "dep3"], output="end2"), + Task(action="FAKE", args={}, deps=["dep1", "dep2"], output="end1"), + Task(action="FAKE", args={}, deps=["dep1", "dep3"], output="end2"), ] scheduler.append(tasks) assert scheduler.tasks == tasks @@ -16,106 +18,114 @@ def test_schedule_append(): def test_schedule_one_task(): - scheduler = Scheduler() - tasks = [Task(action="FOO", args={}, deps=[], output="end")] + scheduler = Scheduler(dispatcher=fake_dispatcher) + tasks = [Task(action="FAKE", args={}, deps=[], output="end")] scheduler.append(tasks) assert scheduler.doable_tasks == tasks result = scheduler.next_task() - assert result == tasks[0] + assert result.status == 0 + assert result.out == ["FAKE - {} - [] - end"] + assert result.err == [] + assert scheduler.tasks == [] assert scheduler.done == ["end"] def test_schedule_one_task_with_args(): - scheduler = Scheduler() - tasks = [Task(action="FOO", args={"task": "one"}, deps=[], output="one")] + scheduler = Scheduler(dispatcher=fake_dispatcher) + tasks = [Task(action="FAKE", args={"task": "one"}, deps=[], output="one")] scheduler.append(tasks) result = scheduler.next_task() - assert result == tasks[0] + assert result.status == 0 + assert result.out == ["FAKE - {'task': 'one'} - [] - one"] + assert result.err == [] + assert scheduler.tasks == [] assert scheduler.done == ["one"] def test_schedule_multiple_tasks(): - scheduler = Scheduler() - t1 = Task(action="FOO", args={"task": "one"}, deps=[], output="one") - t2 = Task(action="FOO", args={"task": "two"}, deps=[], output="two") - t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three") + scheduler = Scheduler(dispatcher=fake_dispatcher) + t1 = Task(action="FAKE", args={"task": "one"}, deps=[], output="one") + t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two") + t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") scheduler.append([t1, t2, t3]) assert scheduler.doable_tasks == [t1, t2, t3] assert scheduler.is_finishable() result = scheduler.next_task() - assert result == t1 + assert result.status == 0 assert scheduler.tasks == [t2, t3] assert scheduler.done == ["one"] result = scheduler.next_task() - assert result == t2 + assert result.status == 0 assert scheduler.tasks == [t3] assert scheduler.done == ["one", "two"] result = scheduler.next_task() - assert result == t3 + assert result.status == 0 assert scheduler.tasks == [] assert scheduler.done == ["one", "two", "three"] def test_schedule_multiple_tasks_with_dependencies(): - scheduler = Scheduler() - t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one") - t2 = Task(action="FOO", args={"task": "two"}, deps=["one"], output="two") - t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three") + scheduler = Scheduler(dispatcher=fake_dispatcher) + t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one") + t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two") + t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") scheduler.append([t1, t2, t3]) assert scheduler.doable_tasks == [t3] assert scheduler.is_finishable() result = scheduler.next_task() - assert result == t3 + assert result.status == 0 assert scheduler.tasks == [t1, t2] assert scheduler.done == ["three"] assert scheduler.doable_tasks == [t1] result = scheduler.next_task() - assert result == t1 + assert result.status == 0 assert scheduler.tasks == [t2] assert scheduler.done == ["three", "one"] assert scheduler.doable_tasks == [t2] result = scheduler.next_task() - assert result == t2 + assert result.status == 0 assert scheduler.tasks == [] assert scheduler.done == ["three", "one", "two"] + def test_schedule_multiple_tasks_with_dependencies_loop(): - scheduler = Scheduler() - t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one") - t2 = Task(action="FOO", args={"task": "two"}, deps=["one"], output="two") - t3 = Task(action="FOO", args={"task": "three"}, deps=[], output="three") + scheduler = Scheduler(dispatcher=fake_dispatcher) + t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one") + t2 = Task(action="FAKE", args={"task": "two"}, deps=["one"], output="two") + t3 = Task(action="FAKE", args={"task": "three"}, deps=[], output="three") scheduler.append([t1, t2, t3]) - ordered_tasks = [] for task in scheduler.backlog(): - ordered_tasks.append(task) - assert ordered_tasks == [t3, t1, t2] + pass + + assert scheduler.done == ["three", "one", "two"] + def test_schedule_empty_task(): - scheduler = Scheduler() + scheduler = Scheduler(dispatcher=fake_dispatcher) scheduler.append([]) with pytest.raises(StopIteration): scheduler.next_task() def test_schedule_multiple_tasks_with_undoable_dependencies(): - scheduler = Scheduler() - t1 = Task(action="FOO", args={"task": "one"}, deps=["three"], output="one") - t2 = Task(action="FOO", args={"task": "two"}, deps=[], output="two") + scheduler = Scheduler(dispatcher=fake_dispatcher) + t1 = Task(action="FAKE", args={"task": "one"}, deps=["three"], output="one") + t2 = Task(action="FAKE", args={"task": "two"}, deps=[], output="two") scheduler.append([t1, t2]) assert scheduler.doable_tasks == [t2] diff --git a/test/test_service.py b/test/test_service.py index 2e6970a..80138f3 100644 --- a/test/test_service.py +++ b/test/test_service.py @@ -1,7 +1,8 @@ from bopytex.planner import fake_planner from bopytex.service import orcherstrator from bopytex.tasks import Task -from bopytex.worker import Dispatcher, fake_worker +from bopytex.worker import Dispatcher +from .fakes.workers import fake_worker def test_service(): @@ -10,5 +11,6 @@ def test_service(): service = orcherstrator(options, fake_planner.simple, dispatcher) - for i, task_output in enumerate(service): - assert task_output == f"FAKE - {{'number': {i}}} - [] - {i}" + for i, message in enumerate(service): + assert message.status == 0 + assert message.out == [f"FAKE - {{'number': {i}}} - [] - {i}"]