""" Scheduler for action to make """ from bopytex.tasks import Task class Scheduler: def __init__(self, actions: list, done: list[str] = None): self.actions = actions if done is None: self._done = [] else: self._done = done self._tasks = [] @property def tasks(self): return self._tasks @property def all_deps(self): return {d for task in self.tasks for d in task.deps} @property def all_output(self): return {task.output for task in self.tasks} @property def done(self): return self._done def append(self, tasks: list[Task]): self._tasks += tasks def dispatch(self, task): """Do a task""" ans = self.actions[task.action](task.deps, task.args, task.output) return ans def __iter__(self): return self def __next__(self): return self.next() def next(self): undoable = [] try: task = self._tasks.pop(0) except IndexError: raise StopIteration while not all([d in self.done for d in task.deps]): undoable.append(task) try: task = self._tasks.pop(0) except IndexError: self.append(undoable) raise StopIteration self.append(undoable) ans = self.dispatch(task) self._done.append(task.output) return ans def run(self): for _ in self: pass def is_finishable(self): return self.all_deps.issubset(self.all_output)