Feat: reactivate output in task

This commit is contained in:
2022-04-09 06:32:56 +02:00
parent 211cee2f4f
commit 4d76dc8992
4 changed files with 57 additions and 52 deletions

View File

@@ -3,7 +3,7 @@ from bopytex.planner import activate_corr_on, clean, compile_pdf, generate, join
def test_build_task_generate():
template = "tpl_source.tex"
task = generate(template, {"subject": "01"})
task = generate(template, {"subject": "01"}, output="source.tex")
assert task.action == "GENERATE"
assert task.args == {"subject": "01"}
assert task.deps == [template]
@@ -11,7 +11,7 @@ def test_build_task_generate():
def test_build_task_activate_corr_on():
src = "source.tex"
task = activate_corr_on(src)
task = activate_corr_on(src, output="corr_source.tex")
assert task.action == "ACTIVATE_CORR"
assert task.args == {}
assert task.deps == [src]
@@ -19,7 +19,7 @@ def test_build_task_activate_corr_on():
def test_build_task_compile():
src = "source.tex"
task = compile_pdf(src)
task = compile_pdf(src, output="source.pdf")
assert task.action == "COMPILE"
assert task.args == {}
assert task.deps == [src]
@@ -27,13 +27,13 @@ def test_build_task_compile():
def test_build_task_join():
pdfs = [f"{i}_source.pdf" for i in range(3)]
task = join_pdfs(pdfs)
task = join_pdfs(pdfs, output="joined.pdf")
assert task.action == "JOIN"
assert task.args == {}
assert task.deps == pdfs
def test_build_task_compile():
def test_build_task_clean():
files = ["source.aux", "source.log"]
task = clean(files)
assert task.action == "CLEAN"

View File

@@ -3,91 +3,91 @@ from bopytex.scheduler import Scheduler
import pytest
def action_done(deps, args, output):
return f"{deps} - {args} - {output} - done"
actions = {"DO": action_done}
def test_schedule_append():
actions = {"DO": lambda deps, args: "done"}
scheduler = Scheduler(actions)
tasks = [Task(action="DO", args={}, deps=[])]
tasks = [Task(action="DO", args={}, deps=[], output="end")]
scheduler.append(tasks)
assert scheduler.tasks == tasks
def test_schedule_dispatch():
actions = {"DO": lambda deps, args: "done"}
scheduler = Scheduler(actions)
task = Task(action="DO", args={}, deps=[])
task = Task(action="DO", args={}, deps=[], output="end")
result = scheduler.dispatch(task)
assert result == "done"
assert result == "[] - {} - end - done"
def test_schedule_one_task():
actions = {"DO": lambda deps, args: "done"}
scheduler = Scheduler(actions)
scheduler.append([Task(action="DO", args={}, deps=[])])
scheduler.append([Task(action="DO", args={}, deps=[], output="end")])
result = scheduler.next()
assert result == "done"
assert result == "[] - {} - end - done"
assert scheduler.tasks == []
assert scheduler.done == ["done"]
assert scheduler.done == ["end"]
def test_schedule_one_task_with_args():
actions = {"DO": lambda deps, args: f"{args['task']} done"}
scheduler = Scheduler(actions)
scheduler.append([Task(action="DO", args={"task": "one"}, deps=[])])
scheduler.append([Task(action="DO", args={"task": "one"}, deps=[], output="one")])
result = scheduler.next()
assert result == "one done"
assert result == "[] - {'task': 'one'} - one - done"
assert scheduler.tasks == []
assert scheduler.done == ["one done"]
assert scheduler.done == ["one"]
def test_schedule_multiple_tasks():
actions = {"DO": lambda deps, args: f"{args['task']} done"}
scheduler = Scheduler(actions)
t1 = Task(action="DO", args={"task": "one"}, deps=[])
t2 = Task(action="DO", args={"task": "two"}, deps=[])
t3 = Task(action="DO", args={"task": "three"}, deps=[])
t1 = Task(action="DO", args={"task": "one"}, deps=[], output="one")
t2 = Task(action="DO", args={"task": "two"}, deps=[], output="two")
t3 = Task(action="DO", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3])
result = scheduler.next()
assert result == "one done"
assert result == "[] - {'task': 'one'} - one - done"
assert scheduler.tasks == [t2, t3]
assert scheduler.done == ["one done"]
assert scheduler.done == ["one"]
result = scheduler.next()
assert result == "two done"
assert result == "[] - {'task': 'two'} - two - done"
assert scheduler.tasks == [t3]
assert scheduler.done == ["one done", "two done"]
assert scheduler.done == ["one", "two"]
result = scheduler.next()
assert result == "three done"
assert result == "[] - {'task': 'three'} - three - done"
assert scheduler.tasks == []
assert scheduler.done == ["one done", "two done", "three done"]
assert scheduler.done == ["one", "two", "three"]
def test_schedule_multiple_tasks_with_dependencies():
actions = {"DO": lambda deps, args: f"{args['task']} done"}
scheduler = Scheduler(actions)
t1 = Task(action="DO", args={"task": "one"}, deps=["three done"])
t2 = Task(action="DO", args={"task": "two"}, deps=["one done"])
t3 = Task(action="DO", args={"task": "three"}, deps=[])
t1 = Task(action="DO", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="DO", args={"task": "two"}, deps=["one"], output="two")
t3 = Task(action="DO", args={"task": "three"}, deps=[], output="three")
scheduler.append([t1, t2, t3])
result = scheduler.next()
assert result == "three done"
assert result == "[] - {'task': 'three'} - three - done"
assert scheduler.tasks == [t1, t2]
assert scheduler.done == ["three done"]
assert scheduler.done == ["three"]
result = scheduler.next()
assert result == "one done"
assert result == "['three'] - {'task': 'one'} - one - done"
assert scheduler.tasks == [t2]
assert scheduler.done == ["three done", "one done"]
assert scheduler.done == ["three", "one"]
result = scheduler.next()
assert result == "two done"
assert result == "['one'] - {'task': 'two'} - two - done"
assert scheduler.tasks == []
assert scheduler.done == ["three done", "one done", "two done"]
assert scheduler.done == ["three", "one", "two"]
def test_schedule_empty_task():
actions = {"DO": lambda deps, args: f"{args['task']} done"}
scheduler = Scheduler(actions)
scheduler.append([])
with pytest.raises(StopIteration):
@@ -95,12 +95,10 @@ def test_schedule_empty_task():
def test_schedule_multiple_tasks_with_undoable_dependencies():
actions = {"DO": lambda deps, args: f"{args['task']} done"}
scheduler = Scheduler(actions)
t1 = Task(action="DO", args={"task": "one"}, deps=["three done"])
t2 = Task(action="DO", args={"task": "two"}, deps=[])
t1 = Task(action="DO", args={"task": "one"}, deps=["three"], output="one")
t2 = Task(action="DO", args={"task": "two"}, deps=[], output="two")
scheduler.append([t1, t2])
scheduler.run()
assert scheduler.tasks == [t1]
assert scheduler.done == ["two done"]
assert scheduler.done == ["two"]