Bopytex/test/test_scheduler.py

39 lines
1.2 KiB
Python

from bopytex.planner import Task
from bopytex.scheduler import Scheduler
def test_schedule_append():
actions = {"DO": lambda deps, args: "done"}
scheduler = Scheduler(actions)
tasks = [Task(action="DO", args={}, deps=[])]
scheduler.append(tasks)
assert scheduler.tasks == tasks
def test_schedule_dispatch():
actions = {"DO": lambda deps, args: "done"}
scheduler = Scheduler(actions)
task = Task(action="DO", args={}, deps=[])
result = scheduler.dispatch(task)
assert result == "done"
def test_schedule_one_task():
actions = {"DO": lambda deps, args: "done"}
scheduler = Scheduler(actions)
scheduler.append([Task(action="DO", args={}, deps=[])])
result = scheduler.__next__()
assert result == "done"
assert scheduler.tasks == []
assert scheduler.done == ["done"]
def test_schedule_one_task_with_args():
actions = {"DO": lambda deps, args: f"{args['task']} done"}
scheduler = Scheduler(actions)
scheduler.append([Task(action="DO", args={"task": "one"}, deps=[])])
result = scheduler.__next__()
assert result == "one done"
assert scheduler.tasks == []
assert scheduler.done == ["one done"]