28 lines
808 B
Python
28 lines
808 B
Python
from bopytex.planner import Task
|
|
from bopytex.scheduler import Scheduler
|
|
|
|
|
|
def test_schedule_append():
    """Check that appended tasks are exposed through ``scheduler.tasks``.

    A scheduler is built with a single "DO" action, a one-element task list
    is appended, and ``scheduler.tasks`` must compare equal to that list.
    """
    scheduler = Scheduler({"DO": lambda _: "done"})
    queued = [Task(action="DO", args={}, deps=[])]

    scheduler.append(queued)

    assert scheduler.tasks == queued
|
|
|
|
def test_schedule_dispatch():
    """Check that dispatching a "DO" task yields the "DO" action's result.

    The only registered action ignores its argument and returns "done";
    ``scheduler.dispatch`` on a task naming that action must return "done".
    """

    def always_done(_):
        # Stand-in action: ignores its single argument.
        return "done"

    scheduler = Scheduler({"DO": always_done})
    single_task = Task(action="DO", args={}, deps=[])

    assert scheduler.dispatch(single_task) == "done"
|
|
|
|
|
|
def test_schedule_one_task():
    """Check one iteration step of a scheduler holding a single task.

    After a single "DO" task is appended, advancing the scheduler once must:
    return the action's result ("done"), leave ``scheduler.tasks`` empty, and
    record the result in ``scheduler.done``.
    """
    actions = {"DO": lambda _: "done"}
    scheduler = Scheduler(actions)
    scheduler.append([Task(action="DO", args={}, deps=[])])

    # Drive the iterator protocol through the built-in instead of calling the
    # dunder method directly.
    result = next(scheduler)

    assert result == "done"
    assert scheduler.tasks == []
    assert scheduler.done == ["done"]
|