Merge branch 'decoupled' into dev
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -1,10 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
|
||||
|
||||
#from .bopytex import subject_metadatas, crazy_feed, pdfjoin
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Reglages pour 'vim'
|
||||
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||||
|
239
bopytex/bopytex.py
Executable file → Normal file
239
bopytex/bopytex.py
Executable file → Normal file
@@ -1,237 +1,2 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
|
||||
"""
|
||||
Producing then compiling templates
|
||||
"""
|
||||
|
||||
import csv
|
||||
import os
|
||||
import logging
|
||||
|
||||
from pathlib import Path
|
||||
import pytex
|
||||
from mapytex import render, Expression, random, stat
|
||||
import bopytex.filters as filters
|
||||
import random as rd
|
||||
|
||||
formatter = logging.Formatter("%(name)s :: %(levelname)s :: %(message)s")
|
||||
steam_handler = logging.StreamHandler()
|
||||
steam_handler.setLevel(logging.DEBUG)
|
||||
steam_handler.setFormatter(formatter)
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.addHandler(steam_handler)
|
||||
|
||||
|
||||
def setup():
|
||||
render.set_render("tex")
|
||||
#logger.debug(f"Render for Expression is {Expression.RENDER}")
|
||||
mapytex_tools = {
|
||||
"Expression": Expression,
|
||||
"rdm": random,
|
||||
"stat": stat,
|
||||
"random": rd,
|
||||
}
|
||||
pytex.update_export_dict(mapytex_tools)
|
||||
|
||||
pytex.add_filter("calculus", filters.do_calculus)
|
||||
|
||||
|
||||
def get_working_dir(options):
|
||||
""" Get the working directory """
|
||||
if options["working_dir"]:
|
||||
working_dir = Path(options["working_dir"])
|
||||
else:
|
||||
try:
|
||||
template = Path(options["template"])
|
||||
except TypeError:
|
||||
raise ValueError(
|
||||
"Need to set the working directory \
|
||||
or to give a template"
|
||||
)
|
||||
else:
|
||||
working_dir = template.parent
|
||||
logger.debug(f"The output directory will be {working_dir}")
|
||||
return working_dir
|
||||
|
||||
|
||||
def activate_printanswers(
|
||||
texfile, noans=r"solution/print = false", ans=r"solution/print = true"
|
||||
):
|
||||
""" Activate printanswers mod in texfile """
|
||||
output_fname = "corr_" + texfile
|
||||
with open(texfile, "r") as input_f:
|
||||
with open(output_fname, "w") as output_f:
|
||||
for line in input_f.readlines():
|
||||
output_f.write(line.replace(noans, ans))
|
||||
return output_fname
|
||||
|
||||
|
||||
def deactivate_printanswers(corr_fname):
|
||||
""" Activate printanswers mod in texfile """
|
||||
Path(corr_fname).remove()
|
||||
|
||||
|
||||
def pdfjoin(pdf_files, destname, working_dir=".", rm_pdfs=1):
|
||||
"""TODO: Docstring for pdfjoin.
|
||||
|
||||
:param pdf_files: list of pdf files to join
|
||||
:param destname: name for joined pdf
|
||||
:param working_dir: the working directory
|
||||
:param rm_pdfs: Remove pdf_files after joining them
|
||||
:returns: TODO
|
||||
|
||||
"""
|
||||
joined_pdfs = Path(working_dir) / Path(destname)
|
||||
pdf_files_str = " ".join(pdf_files)
|
||||
pdfjam = f"pdfjam {pdf_files_str} -o {joined_pdfs}"
|
||||
logger.debug(f"Run {pdfjam}")
|
||||
logger.info("Joining pdf files")
|
||||
os.system(pdfjam)
|
||||
if rm_pdfs:
|
||||
logger.info(f"Remove {pdf_files_str}")
|
||||
os.system(f"rm {pdf_files_str}")
|
||||
|
||||
|
||||
def extract_student_csv(csv_filename):
|
||||
""" Extract student list from csv_filename """
|
||||
with open(csv_filename, "r") as csvfile:
|
||||
reader = csv.DictReader(csvfile)
|
||||
return [r for r in reader]
|
||||
|
||||
|
||||
def subject_metadatas(options):
|
||||
""" Return metadata on subject to produce
|
||||
|
||||
if csv is given it will based on is
|
||||
otherwise it will be based on quantity
|
||||
|
||||
:example:
|
||||
>>> subject_metadata(10)
|
||||
"""
|
||||
if options["students_csv"]:
|
||||
metadatas = []
|
||||
for (i, s) in enumerate(extract_student_csv(options["students_csv"])):
|
||||
d = {"num": f"{i+1:02d}"}
|
||||
d.update(s)
|
||||
metadatas.append(d)
|
||||
elif options["number_subjects"] > 0:
|
||||
metadatas = [{"num": f"{i+1:02d}"} for i in range(options["number_subjects"])]
|
||||
else:
|
||||
raise ValueError("Need metacsv or quantity to build subject metadata")
|
||||
|
||||
for meta in metadatas:
|
||||
meta.update(
|
||||
{
|
||||
"template": str(Path(options["template"]).name),
|
||||
"texfile": str(Path(options["template"]).name).replace(
|
||||
"tpl", meta["num"]
|
||||
),
|
||||
"directory": str(Path(options["template"]).parent),
|
||||
}
|
||||
)
|
||||
|
||||
return metadatas
|
||||
|
||||
|
||||
def feed(*args, **kwrds):
|
||||
""" Nice and smooth pytex feed """
|
||||
pytex.feed(*args, **kwrds)
|
||||
|
||||
|
||||
def crazy_feed(*args, **kwrds):
|
||||
""" Crazy mod for pytex feed """
|
||||
while True:
|
||||
try:
|
||||
pytex.feed(*args, **kwrds)
|
||||
except:
|
||||
logger.debug(f"Crazy feed is working hard...! {args} {kwrds}")
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
def clean(directory):
|
||||
pytex.clean(directory)
|
||||
|
||||
|
||||
def texcompile(filename):
|
||||
logger.debug(f"Start compiling {filename}")
|
||||
pytex.pdflatex(Path(filename))
|
||||
logger.debug(f"End compiling")
|
||||
|
||||
|
||||
def produce_and_compile(options):
|
||||
""" Produce and compile subjects
|
||||
"""
|
||||
logger.debug(f"CI parser gets {options}")
|
||||
|
||||
template = Path(options["template"]).name
|
||||
directory = Path(options["template"]).parent
|
||||
metadatas = subject_metadatas(options)
|
||||
logger.debug(f"Metadata {metadatas}")
|
||||
|
||||
setup()
|
||||
|
||||
for meta in metadatas:
|
||||
logger.debug(f"Feeding template toward {meta['texfile']}")
|
||||
if options["crazy"]:
|
||||
crazy_feed(
|
||||
template=Path(meta["directory"]) / meta["template"],
|
||||
data=meta,
|
||||
output=meta["texfile"],
|
||||
force=1,
|
||||
)
|
||||
else:
|
||||
feed(
|
||||
template=Path(meta["directory"]) / meta["template"],
|
||||
data=meta,
|
||||
output=meta["texfile"],
|
||||
force=1,
|
||||
)
|
||||
assert(Path(meta["texfile"]).exists())
|
||||
logger.debug(f"{meta['texfile']} fed")
|
||||
|
||||
if options["corr"]:
|
||||
logger.debug(f"Building correction for {meta['texfile']}")
|
||||
meta.update({
|
||||
"corr_texfile": activate_printanswers(meta["texfile"]),
|
||||
})
|
||||
|
||||
if not options["no_compile"]:
|
||||
logger.debug("Compiling")
|
||||
for prefix in ["", "corr_"]:
|
||||
key = prefix + "texfile"
|
||||
try:
|
||||
meta[key]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
texcompile(meta[key])
|
||||
meta.update({
|
||||
prefix+'pdffile': meta[key].replace('tex', 'pdf')
|
||||
})
|
||||
|
||||
if not options["no_join"]:
|
||||
for prefix in ["", "corr_"]:
|
||||
key = prefix + "pdffile"
|
||||
try:
|
||||
pdfs = [m[key] for m in metadatas]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
pdfjoin(
|
||||
pdfs,
|
||||
template.replace("tpl", prefix+"all").replace(".tex", ".pdf"),
|
||||
directory,
|
||||
rm_pdfs=1,
|
||||
)
|
||||
|
||||
if not options["dirty"]:
|
||||
clean(directory)
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Reglages pour 'vim'
|
||||
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||||
# cursor: 16 del
|
||||
import bopytex.default_config as DEFAULT
|
||||
from bopytex.service import orcherstrator
|
||||
|
26
bopytex/default_config.py
Normal file
26
bopytex/default_config.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from bopytex.planner.generate_compile_join_planner import planner
|
||||
from bopytex.worker import Dispatcher
|
||||
from bopytex.worker.activate_corr import activate_corr
|
||||
from bopytex.worker.clean import clean
|
||||
from bopytex.worker.compile import pdflatex
|
||||
from bopytex.worker.generate import generate
|
||||
from bopytex.worker.join_pdf import pdfjam
|
||||
from bopytex.jinja2_env.texenv import texenv
|
||||
|
||||
jinja2 = {
|
||||
"environment": texenv
|
||||
}
|
||||
|
||||
dispatcher = Dispatcher({
|
||||
"GENERATE": generate,
|
||||
"COMPILE": pdflatex,
|
||||
"JOIN": pdfjam,
|
||||
"CLEAN": clean,
|
||||
"ACTIVATE_CORR": activate_corr,
|
||||
})
|
||||
|
||||
latex = {
|
||||
"solution": r"solution/print = true",
|
||||
"no_solution": r"solution/print = false",
|
||||
}
|
||||
|
@@ -1,33 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
|
||||
"""
|
||||
Custom filter for Bopytex
|
||||
"""
|
||||
|
||||
__all__ = ["do_calculus"]
|
||||
|
||||
def do_calculus(steps, name="A", sep="=", end="", joining=" \\\\ \n"):
|
||||
"""Display properly the calculus
|
||||
|
||||
Generate this form string:
|
||||
"name & sep & a_step end joining"
|
||||
|
||||
:param steps: list of steps
|
||||
:returns: latex string ready to be endbeded
|
||||
|
||||
|
||||
"""
|
||||
|
||||
ans = joining.join([
|
||||
name + " & "
|
||||
+ sep + " & "
|
||||
+ str(s) + end for s in steps
|
||||
])
|
||||
return ans
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Reglages pour 'vim'
|
||||
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||||
# cursor: 16 del
|
31
bopytex/jinja2_env/texenv.py
Normal file
31
bopytex/jinja2_env/texenv.py
Normal file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
|
||||
import jinja2
|
||||
|
||||
__all__ = [
|
||||
"texenv",
|
||||
]
|
||||
|
||||
# Definition of jinja syntax for latex
|
||||
texenv = jinja2.Environment(
|
||||
block_start_string=r"\Block{",
|
||||
block_end_string="}",
|
||||
variable_start_string=r"\Var{",
|
||||
variable_end_string="}",
|
||||
comment_start_string=r"\#{",
|
||||
comment_end_string="}",
|
||||
line_statement_prefix="%-",
|
||||
line_comment_prefix="%#",
|
||||
loader=jinja2.ChoiceLoader(
|
||||
[
|
||||
jinja2.FileSystemLoader(["./"]),
|
||||
]
|
||||
),
|
||||
extensions=["jinja2.ext.do"],
|
||||
)
|
||||
|
||||
# -----------------------------
|
||||
# Reglages pour 'vim'
|
||||
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||||
# cursor: 16 del
|
@@ -1,38 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
|
||||
from random import randint
|
||||
|
||||
def pythagore_triplet(v_min = 1, v_max = 10):
|
||||
"""Random pythagore triplet generator
|
||||
|
||||
:param v_min: minimum in randint
|
||||
:param v_max: max in randint
|
||||
:returns: (a,b,c) such that a^2 + b^2 = c^2
|
||||
|
||||
"""
|
||||
u = randint(v_min,v_max)
|
||||
v = randint(v_min,v_max)
|
||||
while v == u:
|
||||
v = randint(v_min,v_max)
|
||||
|
||||
u, v = max(u,v), min(u,v)
|
||||
|
||||
return (u**2-v**2 , 2*u*v, u**2 + v**2)
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(pythagore_triplet())
|
||||
|
||||
for j in range(1,10):
|
||||
for i in range(j,10):
|
||||
print((i**2-j**2 , 2*i*j, i**2 + j**2))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Reglages pour 'vim'
|
||||
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||||
# cursor: 16 del
|
@@ -1,33 +0,0 @@
|
||||
\Block{macro solveEquation(P)}
|
||||
|
||||
On commence par calculer le discriminant de $P(x) = \Var{P}$.
|
||||
\begin{eqnarray*}
|
||||
\Delta & = & b^2-4ac \\
|
||||
\Var{P.delta.explain()|calculus(name="\\Delta")}
|
||||
\end{eqnarray*}
|
||||
|
||||
\Block{if P.delta > 0}
|
||||
comme $\Delta = \Var{P.delta} > 0$ donc $P$ a deux racines
|
||||
|
||||
\begin{eqnarray*}
|
||||
x_1 & = & \frac{-b - \sqrt{\Delta}}{2a} = \frac{\Var{-P.b} - \sqrt{\Var{P.delta}}}{2 \times \Var{P.a}} = \Var{P.roots()[0] } \\
|
||||
x_2 & = & \frac{-b + \sqrt{\Delta}}{2a} = \frac{\Var{-P.b} + \sqrt{\Var{P.delta}}}{2 \times \Var{P.a}} = \Var{P.roots()[1] }
|
||||
\end{eqnarray*}
|
||||
|
||||
Les solutions de l'équation $\Var{P} = 0$ sont donc $\mathcal{S} = \left\{ \Var{P.roots()[0]}; \Var{P.roots()[1]} \right\}$
|
||||
|
||||
\Block{elif P.delta == 0}
|
||||
Comme $\Delta = 0$ donc $P$ a une racine
|
||||
|
||||
\begin{eqnarray*}
|
||||
x_1 = \frac{-b}{2a} = \frac{-\Var{P.b}}{2\times \Var{P.a}} = \Var{P.roots()[0]} \\
|
||||
\end{eqnarray*}
|
||||
|
||||
La solution de $\Var{P} = 0$ est donc $\mathcal{S} = \left\{ \Var{P.roots()[0]}\right\}$
|
||||
|
||||
\Block{else}
|
||||
Alors $\Delta = \Var{P.delta} < 0$ donc $P$ n'a pas de racine donc l'équation $\Var{P} = 0$ n'a pas de solution.
|
||||
|
||||
\Block{endif}
|
||||
|
||||
\Block{endmacro}
|
36
bopytex/message.py
Normal file
36
bopytex/message.py
Normal file
@@ -0,0 +1,36 @@
|
||||
class Message():
|
||||
def __init__(self, status, out, err):
|
||||
self._status = status
|
||||
self._out = out
|
||||
self._err = err
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self._status
|
||||
|
||||
@property
|
||||
def out(self):
|
||||
return self._out
|
||||
|
||||
@property
|
||||
def err(self):
|
||||
return self._err
|
||||
|
||||
def __repr__(self):
|
||||
return f"Message(status={self.status}, out={self.out}, err={self.err})"
|
||||
|
||||
class SubprocessMessage(Message):
|
||||
def __init__(self, process):
|
||||
self._process = process
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self._process.wait()
|
||||
|
||||
@property
|
||||
def out(self):
|
||||
return self._process.stdout
|
||||
|
||||
@property
|
||||
def err(self):
|
||||
return self._process.stderr
|
0
bopytex/planner/__init__.py
Normal file
0
bopytex/planner/__init__.py
Normal file
58
bopytex/planner/activate_corr_compile_join_planner.py
Normal file
58
bopytex/planner/activate_corr_compile_join_planner.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from bopytex.tasks import Task, activate_corr_on, compile_pdf, join_pdfs
|
||||
import bopytex.planner.naming as naming
|
||||
import os
|
||||
|
||||
|
||||
def list_files(dir=".", accept=lambda _: True, reject=lambda _: False):
|
||||
files = []
|
||||
for file in os.listdir(dir):
|
||||
if accept(file) and not reject(file):
|
||||
files.append(file)
|
||||
return files
|
||||
|
||||
|
||||
def planner(options: dict) -> list[Task]:
|
||||
sources = list_files(
|
||||
accept=lambda x: x.endswith(".tex"),
|
||||
reject=lambda x: x.startswith("tpl_"),
|
||||
)
|
||||
options["sources"] = sources
|
||||
return tasks_builder(options)
|
||||
|
||||
|
||||
def tasks_builder(
|
||||
options: dict,
|
||||
) -> list[Task]:
|
||||
opt = {
|
||||
"no_join": False,
|
||||
"no_pdf": False,
|
||||
}
|
||||
opt.update(options)
|
||||
|
||||
try:
|
||||
sources = opt["sources"]
|
||||
no_join = opt["no_join"]
|
||||
no_pdf = opt["no_pdf"]
|
||||
except KeyError:
|
||||
raise PlannerMissingOption("An option is missing")
|
||||
|
||||
tasks = []
|
||||
corr_pdfs = []
|
||||
|
||||
for source in sources:
|
||||
corr_source = naming.corr(source)
|
||||
tasks.append(activate_corr_on(source, opt, corr_source))
|
||||
|
||||
if not no_pdf:
|
||||
corr_pdf = naming.source2pdf(corr_source)
|
||||
tasks.append(compile_pdf(corr_source, corr_pdf))
|
||||
corr_pdfs.append(corr_pdf)
|
||||
|
||||
if not no_join:
|
||||
joined = "joined.pdf"
|
||||
|
||||
if corr_pdfs:
|
||||
corr_joined = naming.corr(joined)
|
||||
tasks.append(join_pdfs(corr_pdfs, corr_joined))
|
||||
|
||||
return tasks
|
2
bopytex/planner/exceptions.py
Normal file
2
bopytex/planner/exceptions.py
Normal file
@@ -0,0 +1,2 @@
|
||||
class PlannerMissingOption(Exception):
|
||||
pass
|
9
bopytex/planner/fake_planner.py
Normal file
9
bopytex/planner/fake_planner.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from bopytex.tasks import Task
|
||||
|
||||
|
||||
def simple(options: dict) -> list[Task]:
|
||||
"""Simple planner with options['quantity'] tasks and no dependencies"""
|
||||
return [
|
||||
Task("DO", args={"number": i}, deps=[], output=f"{i}")
|
||||
for i in range(options["quantity"])
|
||||
]
|
103
bopytex/planner/generate_compile_join_planner.py
Normal file
103
bopytex/planner/generate_compile_join_planner.py
Normal file
@@ -0,0 +1,103 @@
|
||||
from bopytex.tasks import Task, activate_corr_on, compile_pdf, generate, join_pdfs
|
||||
import bopytex.planner.naming as naming
|
||||
from bopytex.planner.exceptions import PlannerMissingOption
|
||||
import csv
|
||||
|
||||
|
||||
def build_subject_list_from_infos(infos: list[dict]) -> list[dict]:
|
||||
subjects = []
|
||||
digit = len(str(len(infos)))
|
||||
for i, infos in enumerate(infos):
|
||||
subjects.append({"number": str(i + 1).zfill(digit), **infos})
|
||||
return subjects
|
||||
|
||||
|
||||
def build_subject_list_from_qty(qty: int) -> list[dict]:
|
||||
subjects = []
|
||||
digit = len(str(qty))
|
||||
for i in range(qty):
|
||||
subjects.append({"number": str(i + 1).zfill(digit)})
|
||||
return subjects
|
||||
|
||||
|
||||
def planner(options: dict) -> list[Task]:
|
||||
try:
|
||||
students_csv = options["students_csv"]
|
||||
assert options["students_csv"] != ""
|
||||
|
||||
except (KeyError, AssertionError):
|
||||
try:
|
||||
quantity_subjects = options["quantity_subjects"]
|
||||
assert options["quantity_subjects"] != 0
|
||||
except (KeyError, AssertionError):
|
||||
raise PlannerMissingOption("students_csv or quantity_subjects is required")
|
||||
else:
|
||||
options["subjects"] = build_subject_list_from_qty(qty=quantity_subjects)
|
||||
|
||||
else:
|
||||
with open(students_csv, "r") as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
infos = [r for r in reader]
|
||||
options["subjects"] = build_subject_list_from_infos(infos)
|
||||
|
||||
return tasks_builder(options)
|
||||
|
||||
|
||||
def tasks_builder(
|
||||
options: dict,
|
||||
) -> list[Task]:
|
||||
|
||||
opt = {
|
||||
"corr": False,
|
||||
"no_join": False,
|
||||
"no_pdf": False,
|
||||
}
|
||||
opt.update(options)
|
||||
|
||||
try:
|
||||
template = opt["template"]
|
||||
subjects = opt["subjects"]
|
||||
corr = opt["corr"]
|
||||
no_join = opt["no_join"]
|
||||
no_pdf = opt["no_pdf"]
|
||||
except KeyError:
|
||||
raise PlannerMissingOption("An option is missing")
|
||||
|
||||
tasks = []
|
||||
|
||||
pdfs = []
|
||||
corr_pdfs = []
|
||||
|
||||
for subject in subjects:
|
||||
source = naming.template2source(template, subject)
|
||||
args = {
|
||||
"subject": subject,
|
||||
"options": options
|
||||
}
|
||||
|
||||
tasks.append(generate(template, args, source))
|
||||
|
||||
if not no_pdf:
|
||||
pdf = naming.source2pdf(source)
|
||||
tasks.append(compile_pdf(source, pdf))
|
||||
pdfs.append(pdf)
|
||||
|
||||
if corr:
|
||||
corr_source = naming.corr(source)
|
||||
tasks.append(activate_corr_on(source, opt, corr_source))
|
||||
|
||||
if not no_pdf:
|
||||
corr_pdf = naming.source2pdf(corr_source)
|
||||
tasks.append(compile_pdf(corr_source, corr_pdf))
|
||||
corr_pdfs.append(corr_pdf)
|
||||
|
||||
if not no_join:
|
||||
joined = naming.join(template)
|
||||
if pdfs:
|
||||
tasks.append(join_pdfs(pdfs, joined))
|
||||
|
||||
if corr_pdfs:
|
||||
corr_joined = naming.corr(joined)
|
||||
tasks.append(join_pdfs(corr_pdfs, corr_joined))
|
||||
|
||||
return tasks
|
15
bopytex/planner/naming.py
Normal file
15
bopytex/planner/naming.py
Normal file
@@ -0,0 +1,15 @@
|
||||
def template2source(template: str, metadatas: dict):
|
||||
return metadatas["number"] + template[3:]
|
||||
|
||||
|
||||
def corr(source):
|
||||
return "corr_" + source
|
||||
|
||||
|
||||
def source2pdf(source):
|
||||
return source[:-4] + ".pdf"
|
||||
|
||||
|
||||
def join(template):
|
||||
return source2pdf("joined" + template[3:])
|
||||
|
78
bopytex/scheduler.py
Normal file
78
bopytex/scheduler.py
Normal file
@@ -0,0 +1,78 @@
|
||||
""" Scheduler for action to make """
|
||||
|
||||
|
||||
from bopytex.tasks import Task
|
||||
from bopytex.worker import Dispatcher
|
||||
|
||||
|
||||
class Scheduler:
|
||||
"""Scheduler is responsible of getting tasks (the tasks) and yield those that can be done"""
|
||||
|
||||
def __init__(self, dispatcher:Dispatcher, output_done: list[str] = None):
|
||||
self._dispatcher = dispatcher
|
||||
|
||||
if output_done is None:
|
||||
self._output_done = []
|
||||
else:
|
||||
self._output_done = output_done
|
||||
|
||||
self._tasks = []
|
||||
self._failed_tasks = []
|
||||
|
||||
@property
|
||||
def tasks(self) -> list[Task]:
|
||||
"""List all the tasks todo"""
|
||||
return self._tasks
|
||||
|
||||
@property
|
||||
def doable_tasks(self) -> list[Task]:
|
||||
"""List all doable tasks"""
|
||||
return [
|
||||
task
|
||||
for task in self.tasks
|
||||
if not task.deps or all([d in self.output_done for d in task.deps])
|
||||
]
|
||||
|
||||
@property
|
||||
def all_deps(self) -> set[str]:
|
||||
"""List dependencies of all tasks"""
|
||||
return {d for task in self.tasks for d in task.deps}
|
||||
|
||||
@property
|
||||
def all_output(self) -> set[str]:
|
||||
"""List ouput of all tasks"""
|
||||
return {task.output for task in self.tasks}
|
||||
|
||||
@property
|
||||
def output_done(self) -> list[str]:
|
||||
return self._output_done
|
||||
|
||||
@property
|
||||
def failed_tasks(self) -> list[Task]:
|
||||
return self._failed_tasks
|
||||
|
||||
def append(self, tasks: list[Task]):
|
||||
self._tasks += tasks
|
||||
|
||||
def is_finishable(self):
|
||||
return self.all_deps.issubset(self.all_output)
|
||||
|
||||
def next_task(self):
|
||||
try:
|
||||
task = self.doable_tasks[0]
|
||||
except IndexError:
|
||||
raise StopIteration
|
||||
|
||||
self._tasks.remove(task)
|
||||
message = self._dispatcher(task)
|
||||
if message.status == 0:
|
||||
self._output_done.append(task.output)
|
||||
else:
|
||||
self._failed_tasks.append(task)
|
||||
|
||||
return message
|
||||
|
||||
def backlog(self):
|
||||
""" Yield tasks sorted according to dependencies """
|
||||
while self.doable_tasks:
|
||||
yield self.next_task()
|
@@ -3,20 +3,30 @@
|
||||
|
||||
|
||||
import click
|
||||
from .bopytex import produce_and_compile
|
||||
import logging
|
||||
|
||||
from bopytex.service import main
|
||||
|
||||
formatter = logging.Formatter("%(name)s :: %(levelname)s :: %(message)s")
|
||||
steam_handler = logging.StreamHandler()
|
||||
steam_handler.setLevel(logging.DEBUG)
|
||||
steam_handler.setFormatter(formatter)
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.addHandler(steam_handler)
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.argument(
|
||||
"template",
|
||||
type=click.Path(exists=True),
|
||||
nargs=1,
|
||||
# help="File with the template. The name should have the following form tpl_... .",
|
||||
)
|
||||
@click.option(
|
||||
"-w",
|
||||
"--working-dir",
|
||||
default=".",
|
||||
type=click.Path(exists=True),
|
||||
help="Where fed templates and compiled files will be placed",
|
||||
)
|
||||
@click.option(
|
||||
"-s",
|
||||
@@ -25,9 +35,6 @@ from .bopytex import produce_and_compile
|
||||
default="",
|
||||
help="CSV containing list of students names",
|
||||
)
|
||||
@click.option(
|
||||
"-d", "--dirty", is_flag=True, default=False, help="Do not clean after compilation",
|
||||
)
|
||||
@click.option(
|
||||
"-n",
|
||||
"--no-compile",
|
||||
@@ -36,11 +43,18 @@ from .bopytex import produce_and_compile
|
||||
help="Do not compile source code",
|
||||
)
|
||||
@click.option(
|
||||
"-N",
|
||||
"--number_subjects",
|
||||
"-d",
|
||||
"--dirty",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Do not clean after compilation",
|
||||
)
|
||||
@click.option(
|
||||
"-q",
|
||||
"--quantity_subjects",
|
||||
type=int,
|
||||
default=1,
|
||||
help="The number of subjects to make",
|
||||
help="The quantity of subjects to make",
|
||||
)
|
||||
@click.option(
|
||||
"-j",
|
||||
@@ -54,29 +68,31 @@ from .bopytex import produce_and_compile
|
||||
"--only-corr",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Create and compile only correction from existing subjects",
|
||||
help="Activate correction and compile only from existing subjects",
|
||||
)
|
||||
@click.option(
|
||||
"-c",
|
||||
"-C",
|
||||
"--corr",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Create and compile correction while making subjects",
|
||||
)
|
||||
@click.option(
|
||||
"-C",
|
||||
"--crazy",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Crazy mode. Tries and tries again until template feeding success!",
|
||||
"-c",
|
||||
"--configfile",
|
||||
type=str,
|
||||
default="bopyptex_config.py",
|
||||
help="Config file path",
|
||||
)
|
||||
def new(**options):
|
||||
""" Bopytex
|
||||
|
||||
Feed the template (tpl_...) and then compile it with latex.
|
||||
|
||||
"""
|
||||
produce_and_compile(options)
|
||||
for message in main(**options):
|
||||
try:
|
||||
assert message.status == 0
|
||||
except AssertionError:
|
||||
logger.warning(message)
|
||||
break
|
||||
else:
|
||||
logger.info(message.out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
99
bopytex/service.py
Executable file
99
bopytex/service.py
Executable file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
|
||||
"""
|
||||
Producing then compiling templates
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import os
|
||||
from pathlib import Path
|
||||
from bopytex.scheduler import Scheduler
|
||||
from bopytex import default_config
|
||||
|
||||
|
||||
def orcherstrator(
|
||||
options: dict,
|
||||
planner,
|
||||
dispatcher,
|
||||
):
|
||||
tasks = planner(options)
|
||||
|
||||
scheduler = Scheduler(dispatcher, [options["template"]])
|
||||
scheduler.append(tasks)
|
||||
|
||||
for message in scheduler.backlog():
|
||||
yield message
|
||||
|
||||
|
||||
def load_module(modulefile: str):
|
||||
spec = importlib.util.spec_from_file_location("module.name", modulefile)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def clean_vars_keys(
|
||||
vars: dict,
|
||||
keys: list[str] = [
|
||||
"__name__",
|
||||
"__doc__",
|
||||
"__package__",
|
||||
"__loader__",
|
||||
"__spec__",
|
||||
"__file__",
|
||||
"__cached__",
|
||||
"__builtins__",
|
||||
],
|
||||
) -> dict:
|
||||
new_dict = vars.copy()
|
||||
for k in keys:
|
||||
del new_dict[k]
|
||||
return new_dict
|
||||
|
||||
|
||||
def config_from_file(filename: str) -> dict:
|
||||
if Path(filename).exists():
|
||||
local_config = vars(load_module(filename))
|
||||
return clean_vars_keys(local_config)
|
||||
else:
|
||||
return {}
|
||||
|
||||
|
||||
def build_config(options: dict) -> dict:
|
||||
"""Look for options["configfile"] to load it with default_config and options"""
|
||||
config = clean_vars_keys(vars(default_config))
|
||||
|
||||
configfile = ""
|
||||
try:
|
||||
configfile = options["configfile"]
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
configfile = os.environ["BOPYTEXCONFIG"]
|
||||
except KeyError:
|
||||
pass
|
||||
if configfile:
|
||||
local_config = config_from_file(configfile)
|
||||
config.update(local_config)
|
||||
|
||||
config.update(options)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def main(**options):
|
||||
|
||||
config = build_config(options)
|
||||
|
||||
orcherstre = orcherstrator(
|
||||
config, planner=default_config.planner, dispatcher=default_config.dispatcher
|
||||
)
|
||||
for message in orcherstre:
|
||||
yield message
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Reglages pour 'vim'
|
||||
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||||
# cursor: 16 del
|
64
bopytex/tasks.py
Normal file
64
bopytex/tasks.py
Normal file
@@ -0,0 +1,64 @@
|
||||
""" Produce tasks to do
|
||||
|
||||
It essentially place things at the right place.
|
||||
|
||||
"""
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Task:
|
||||
action: str
|
||||
args: dict
|
||||
deps: list
|
||||
output: str
|
||||
|
||||
|
||||
def generate(template: str, meta: dict, output: str):
|
||||
"""Create a task to generate a subject"""
|
||||
return Task(
|
||||
action="GENERATE",
|
||||
args=meta,
|
||||
deps=[template],
|
||||
output=output,
|
||||
)
|
||||
|
||||
|
||||
def activate_corr_on(src: str, meta:dict, output: str):
|
||||
"""Create a task to activate correction for src"""
|
||||
return Task(
|
||||
action="ACTIVATE_CORR",
|
||||
args=meta,
|
||||
deps=[src],
|
||||
output=output,
|
||||
)
|
||||
|
||||
|
||||
def compile_pdf(src: str, output: str):
|
||||
"""Create a task to compile src"""
|
||||
return Task(
|
||||
action="COMPILE",
|
||||
args={},
|
||||
deps=[src],
|
||||
output=output,
|
||||
)
|
||||
|
||||
|
||||
def join_pdfs(pdfs: list, output: str):
|
||||
"""Create task to join pdf together"""
|
||||
return Task(
|
||||
action="JOIN",
|
||||
args={},
|
||||
deps=pdfs,
|
||||
output=output,
|
||||
)
|
||||
|
||||
|
||||
def clean(files: list):
|
||||
"""Create task to clean files"""
|
||||
return Task(
|
||||
action="CLEAN",
|
||||
args={},
|
||||
deps=files,
|
||||
output=None,
|
||||
)
|
18
bopytex/worker/__init__.py
Normal file
18
bopytex/worker/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
||||
class ActionNotFound(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Dispatcher:
|
||||
def __init__(self, actions: list):
|
||||
self._actions = actions
|
||||
|
||||
def __call__(self, task):
|
||||
try:
|
||||
choosen_action = self._actions[task.action]
|
||||
except KeyError:
|
||||
raise ActionNotFound(
|
||||
f"The action {task.action} is not in {self._actions.keys()}"
|
||||
)
|
||||
|
||||
return choosen_action(args=task.args, deps=task.deps, output=task.output)
|
||||
|
14
bopytex/worker/activate_corr.py
Normal file
14
bopytex/worker/activate_corr.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from bopytex.message import Message
|
||||
|
||||
|
||||
def activate_corr(args, deps, output):
|
||||
no_solution = args["latex"]["no_solution"]
|
||||
solution = args["latex"]["solution"]
|
||||
|
||||
with open(deps[0], "r") as input_f:
|
||||
with open(output, "w") as output_f:
|
||||
for line in input_f.readlines():
|
||||
output_f.write(line.replace(no_solution, solution))
|
||||
|
||||
return Message(0, [f"ACTIVATE CORR - {deps[0]} to {output}"], [])
|
||||
|
2
bopytex/worker/clean.py
Normal file
2
bopytex/worker/clean.py
Normal file
@@ -0,0 +1,2 @@
|
||||
def clean(args: dict, deps, output):
|
||||
pass
|
25
bopytex/worker/compile.py
Normal file
25
bopytex/worker/compile.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import subprocess
|
||||
|
||||
from bopytex.message import Message
|
||||
from ..message import SubprocessMessage
|
||||
|
||||
|
||||
def curstomtex(command: str, options: str):
|
||||
def func(args: dict, deps, output) -> Message:
|
||||
compile_process = subprocess.Popen(
|
||||
[command, options, deps[0]],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
)
|
||||
return Message(
|
||||
compile_process.wait(),
|
||||
list(compile_process.stdout),
|
||||
list(compile_process.stderr),
|
||||
)
|
||||
|
||||
return func
|
||||
|
||||
|
||||
latexmk = curstomtex("latexmk", "-f")
|
||||
pdflatex = curstomtex("pdflatex", "--interaction=nonstopmode")
|
32
bopytex/worker/generate.py
Normal file
32
bopytex/worker/generate.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from jinja2.environment import Template
|
||||
|
||||
from bopytex.message import Message
|
||||
|
||||
|
||||
def generate(args, deps, output):
|
||||
env = args["options"]["jinja2"]["environment"]
|
||||
template = env.get_template(deps[0])
|
||||
|
||||
variables = {
|
||||
"options":args["options"],
|
||||
"subject":args["subject"],
|
||||
}
|
||||
|
||||
try:
|
||||
args["options"]["direct_access"]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
for (k,v) in args["options"]["direct_access"].items():
|
||||
if k not in ["options", "subject"]:
|
||||
variables[k] = v
|
||||
|
||||
try:
|
||||
with open(output, "w") as out:
|
||||
fed = template.render(variables)
|
||||
out.write(fed)
|
||||
|
||||
return Message(0, [f"GENERATE - {deps[0]} to {output}"], [])
|
||||
|
||||
except Exception as e:
|
||||
return Message(1, [], [e])
|
33
bopytex/worker/join_pdf.py
Normal file
33
bopytex/worker/join_pdf.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import subprocess
|
||||
|
||||
from bopytex.message import Message, SubprocessMessage
|
||||
|
||||
|
||||
def pdfjam(args: dict, deps, output):
|
||||
joining_process = subprocess.Popen(
|
||||
["pdfjam"] + deps + ["-o", output],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
)
|
||||
return Message(
|
||||
joining_process.wait(),
|
||||
list(joining_process.stdout),
|
||||
list(joining_process.stderr),
|
||||
)
|
||||
|
||||
|
||||
def gs(args: dict, deps, output):
|
||||
""" Not working. The command works in terminal but not here """
|
||||
joining_process = subprocess.Popen(
|
||||
["gs", f"-dBATCH -dNOPAUSE -q -sDEVICE=pdfwrite -sOutputFile={output}"]
|
||||
+ deps,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
)
|
||||
return Message(
|
||||
joining_process.wait(),
|
||||
list(joining_process.stdout),
|
||||
list(joining_process.stderr),
|
||||
)
|
Reference in New Issue
Block a user