Compare commits: 2a387a1bc8...dags

6 commits:

- ec19534094
- d4428187d1
- 9118feb4c6
- d7716a4b8e
- 478a8c2403
- 8882317a47
plesna/dataplatform.py
@@ -32,24 +32,28 @@ class DataPlateform:
     def repository(self, id: str) -> DataRepository:
         return self._repositories[id]
 
-    def add_flux(self, name: str, flux: Flux) -> str:
-        if name in self._fluxes:
-            raise DataPlateformError("The flux {name} already exists")
+    def is_valid_flux(self, flux: Flux) -> bool:
+        return True
 
-        self._fluxes[name] = flux
-        return name
+    def add_flux(self, flux: Flux) -> str:
+        if flux.id in self._fluxes:
+            raise DataPlateformError("The flux {flux} already exists")
+
+        assert self.is_valid_flux(flux)
+        self._fluxes[flux.id] = flux
+        return flux.id
 
     @property
     def fluxes(self) -> list[str]:
         return list(self._fluxes)
 
-    def flux(self, name: str) -> Flux:
-        return self._fluxes[name]
+    def flux(self, flux_id: str) -> Flux:
+        return self._fluxes[flux_id]
 
-    def execute_flux(self, name: str) -> FluxMetaData:
-        if name not in self._fluxes:
-            raise DataPlateformError("The flux {name} is not registered")
-        return consume_flux(self._fluxes[name])
+    def execute_flux(self, flux_id: str) -> FluxMetaData:
+        if flux_id not in self._fluxes:
+            raise DataPlateformError("The flux {flux_id} is not registered")
+        return consume_flux(self._fluxes[flux_id])
 
     def graphset(
         self,
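Taken together, this hunk switches `DataPlateform` from caller-supplied names to id-based registration: a `Flux` now carries its own `id`, and `add_flux`, `flux` and `execute_flux` all key off it. A minimal usage sketch of the new surface (assuming `copy_flux` is a `Flux` whose `id` is `"copy_flux"`, as in the tests further down):

```python
# Sketch of the reworked API; `copy_flux` is assumed to be a Flux with id "copy_flux".
dp = DataPlateform()
dp.add_flux(copy_flux)               # checked via is_valid_flux, returns "copy_flux"
assert dp.fluxes == ["copy_flux"]    # ids, in registration order
assert dp.flux("copy_flux") == copy_flux
meta = dp.execute_flux("copy_flux")  # FluxMetaData; unknown ids raise DataPlateformError
```

Note that the `DataPlateformError` messages are plain strings rather than f-strings, so `{flux}` and `{flux_id}` are rendered literally in the exception text.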
plesna/storage/metadata_repository/fs_metadata_repository.py
@@ -2,7 +2,9 @@ from pathlib import Path
 from datetime import datetime
 import csv
 import json
+from typing import Iterable
 
+from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
 from plesna.storage.metadata_repository.metadata_repository import (
     ExecutionLog,
     MetaDataRepository,
@@ -21,9 +23,9 @@ class FSMetaDataRepository(MetaDataRepository):
 
     """
 
-    FILEMODEL = {
-        "execution": "{flux_id}_execution.csv",
-        "modification": "{table_id}_modification.csv",
+    OBJECTS = {
+        "flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
+        "table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
     }
 
     def __init__(self, basepath: str):
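`OBJECTS` is a small registry mapping each kind of object to its file-name pattern and its log model, which lets the flux and table code paths share one implementation below. As committed, both kinds resolve to the `{id}_execution.csv` pattern (the former `{table_id}_modification.csv` suffix is gone), so a flux and a table with the same id would write to the same file. A sketch of a lookup through the registry:

```python
# Sketch: resolving a metadata file name and log model through OBJECTS.
spec = FSMetaDataRepository.OBJECTS["table"]
filename = spec["filemodel"].format(id="my_table")  # "my_table_execution.csv"
log_model = spec["logmodel"]                        # ModificationLog
```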
@@ -32,53 +34,99 @@ class FSMetaDataRepository(MetaDataRepository):
         self._basepath = Path(basepath)
         assert self._basepath.exists()
 
+    def get_things(self, what: str) -> list[str]:
+        """List all ids for 'what'"""
+        whats = []
+        for filepath in self._basepath.iterdir():
+            try:
+                founded = extract_values_from_pattern(
+                    self.OBJECTS[what]["filemodel"], filepath.name
+                )
+            except StringToolsError:
+                pass
+            else:
+                whats.append(founded["id"])
+        return whats
+
     def fluxes(self) -> list[str]:
         """List fluxes's ids"""
-        raise NotImplementedError
+        return self.get_things(what="flux")
+
+    def tables(
+        self,
+    ) -> list[str]:
+        """List all table's ids"""
+        return self.get_things(what="table")
+
+    def _add_thing(self, what: str, id: str) -> str:
+        """Add the new things 'what'"""
+        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
+        filepath.touch()
+        with open(filepath, "a") as csvfile:
+            writer = csv.DictWriter(
+                csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
+            )
+            writer.writeheader()
+        return id
 
     def add_flux(self, flux_id: str) -> str:
         """Get the flux metadata"""
-        filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
-        filepath.touch()
-        with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
-            writer.writeheader()
-        return flux_id
+        return self._add_thing(what="flux", id=flux_id)
 
-    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
-        """Get the flux metadata"""
-        filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
-        metadata_ = ExecutionLog(datetime=dt, output={"data": output})
+    def add_table(self, table_id: str) -> str:
+        """Get the table metadata"""
+        return self._add_thing(what="table", id=table_id)
+
+    def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
+        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
         if not filepath.exists:
-            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
+            raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
 
+        metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
+
         with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
+            writer = csv.DictWriter(
+                csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
+            )
             writer.writerow(metadata_.to_flat_dict())
 
         return metadata_
 
+    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
+        """Get the flux metadata"""
+        return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
+
+    def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str:
+        """Get the table metadata"""
+        return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
+
+    def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
+        """Generate log dict from history"""
+        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
+        if not filepath.exists:
+            raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
+        with open(filepath, "r") as csvfile:
+            reader = csv.DictReader(csvfile)
+            for row in reader:
+                yield row
+
+    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
+        """Get all flux logs"""
+        logs = []
+        for logline in self._get_all_log("flux", flux_id):
+            logline["output"] = json.loads(logline["output"])
+            logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
+
+        return logs
+
     def flux(self, flux_id: str) -> ExecutionLog:
         """Get the last flux log"""
         return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
 
-    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
-        """Get all flux logs"""
-        filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
-        if not filepath.exists:
-            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
-        with open(filepath, "r") as csvfile:
-            logs = []
-            reader = csv.DictReader(csvfile)
-            for row in reader:
-                row["output"] = json.loads(row["output"])
-                logs.append(ExecutionLog(**row))
-        return logs
-
-    def tables(self) -> list[str]:
-        """List table's name in schema (the id)"""
-        raise NotImplementedError
+    def table_logs(self, table_id: str) -> list[ModificationLog]:
+        """Get all table's modification metadatas"""
+        return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
 
     def table(self, table_id: str) -> ModificationLog:
-        """Get table's metadatas"""
-        raise NotImplementedError
+        """Get the last table's modification metadatas"""
+        return max(self.table_logs(table_id), key=lambda l: l.datetime)
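`get_things` and `_get_all_log` lean on `extract_values_from_pattern` from `plesna.libs.string_tools`, which this diff does not show. Judging from its call sites, it inverts a `str.format`-style pattern and raises `StringToolsError` on non-matching names; a minimal sketch of that assumed behaviour:

```python
# Assumed behaviour only -- the real implementation lives in
# plesna/libs/string_tools.py and is not part of this diff.
import re


class StringToolsError(ValueError):
    """Raised when the string does not match the pattern."""


def extract_values_from_pattern(pattern: str, string: str) -> dict[str, str]:
    # Turn "{id}_execution.csv" into r"(?P<id>.+)_execution\.csv" and match fully.
    regex = re.sub(r"\\\{(\w+)\\\}", r"(?P<\1>.+)", re.escape(pattern))
    match = re.fullmatch(regex, string)
    if match is None:
        raise StringToolsError(f"'{string}' does not match '{pattern}'")
    return match.groupdict()


assert extract_values_from_pattern("{id}_execution.csv", "my_flux_execution.csv") == {"id": "my_flux"}
```

Separately, `_register_things_event` and `_get_all_log` guard with `if not filepath.exists:` without calling the method; a bound method is always truthy, so the `FSMetaDataRepositoryError` branch can never fire as written (`filepath.exists()` is presumably intended).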
plesna/storage/metadata_repository/metadata_repository.py
@@ -10,6 +10,9 @@ class ModificationLog(BaseModel):
     datetime: datetime
     flux_id: str
 
+    def to_flat_dict(self):
+        return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
+
 
 class ExecutionLog(BaseModel):
     datetime: datetime
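`to_flat_dict` is what `_register_things_event` hands to `csv.DictWriter`, so each log model must flatten to one string per field. A sketch of the round trip under that assumption; pydantic coerces the ISO-8601 string back into a `datetime` when the row is re-read:

```python
from datetime import datetime

log = ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id="my_flux")
row = log.to_flat_dict()
# {"datetime": "2023-03-15T14:30:00", "flux_id": "my_flux"} -- the exact CSV row
# asserted in test_register_table_modification below.
assert ModificationLog(**row) == log  # pydantic parses the ISO string back
```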
pyproject.toml (new file, 16 lines)
@@ -0,0 +1,16 @@
+[project]
+name = "plesna"
+version = "0.1.0"
+description = "Add your description here"
+readme = "README.md"
+requires-python = ">=3.13"
+dependencies = [
+    "ruff>=0.8.5",
+]
+
+[tool.ruff]
+line-length = 100
+indent-width = 4
+[tool.ruff.lint]
+select = ["E", "F"]
+ignore = ["F401"]
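The new `pyproject.toml` pins ruff as the only declared dependency and narrows linting to pycodestyle errors (`E`) and pyflakes (`F`), with unused imports (`F401`) ignored. A quick sketch (not part of the commit) that reads the committed values back with the stdlib `tomllib`, available since Python 3.11 and therefore covered by `requires-python = ">=3.13"`:

```python
import tomllib

with open("pyproject.toml", "rb") as f:
    cfg = tomllib.load(f)

assert cfg["project"]["requires-python"] == ">=3.13"
assert cfg["tool"]["ruff"]["line-length"] == 100
assert cfg["tool"]["ruff"]["lint"]["ignore"] == ["F401"]
```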
|
@@ -84,17 +84,17 @@ def foo_flux(repository: FSDataRepository) -> Flux:
|
||||
return flux
|
||||
|
||||
|
||||
def test_add_flux(repository: FSDataRepository, copy_flux: Flux):
|
||||
def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
|
||||
dataplatform = DataPlateform()
|
||||
dataplatform.add_repository(repository)
|
||||
|
||||
dataplatform.add_flux(name="copy_flux", flux=copy_flux)
|
||||
dataplatform.add_flux(flux=copy_flux)
|
||||
assert dataplatform.fluxes == ["copy_flux"]
|
||||
dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
|
||||
assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]
|
||||
dataplatform.add_flux(flux=foo_flux)
|
||||
assert dataplatform.fluxes == ["copy_flux", "foo_flux"]
|
||||
|
||||
assert dataplatform.flux("copy_flux") == copy_flux
|
||||
assert dataplatform.flux("copy_flux_bis") == copy_flux
|
||||
assert dataplatform.flux("foo_flux") == foo_flux
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -107,8 +107,8 @@ def dataplatform(
|
||||
|
||||
dp.add_repository(repository)
|
||||
|
||||
dp.add_flux("foo", foo_flux)
|
||||
dp.add_flux("raw_brz_copy_username", copy_flux)
|
||||
dp.add_flux(foo_flux)
|
||||
dp.add_flux(copy_flux)
|
||||
return dp
|
||||
|
||||
|
||||
@@ -269,12 +269,12 @@ def test_content_from_graph_arguments(dataplatform: DataPlateform):
|
||||
|
||||
|
||||
def test_execute_flux(dataplatform: DataPlateform):
|
||||
meta = dataplatform.execute_flux("foo")
|
||||
meta = dataplatform.execute_flux("foo_flux")
|
||||
assert meta.data == {"who": "foo"}
|
||||
|
||||
assert dataplatform.repository("test").schema("test-bronze").tables == []
|
||||
|
||||
meta = dataplatform.execute_flux("raw_brz_copy_username")
|
||||
meta = dataplatform.execute_flux("copy_flux")
|
||||
assert meta.data == {"src_size": 283, "tgt_size": 283}
|
||||
|
||||
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]
|
||||
|
(deleted file: FSDataCatalogue test fixtures)
@@ -1,39 +0,0 @@
-from pathlib import Path
-
-import pytest
-
-from plesna.dataplatform import DataPlateform
-from plesna.datastore.fs_datacatalogue import FSDataCatalogue
-
-FIXTURE_DIR = Path(__file__).parent / Path("raw_data")
-
-
-@pytest.fixture
-def raw_catalogue(tmp_path):
-    raw_path = Path(tmp_path) / "raw"
-    return FSDataCatalogue(raw_path)
-
-
-@pytest.fixture
-def bronze_catalogue(tmp_path):
-    bronze_path = Path(tmp_path) / "bronze"
-    return FSDataCatalogue(bronze_path)
-
-
-@pytest.fixture
-def silver_catalogue(tmp_path):
-    silver_path = Path(tmp_path) / "silver"
-    return FSDataCatalogue(silver_path)
-
-
-@pytest.fixture
-def dataplateform(
-    raw_catalogue: FSDataCatalogue,
-    bronze_catalogue: FSDataCatalogue,
-    silver_catalogue: FSDataCatalogue,
-):
-    dp = DataPlateform()
-    dp.add_datacatalague("raw", raw_catalogue)
-    dp.add_datacatalague("bronze", bronze_catalogue)
-    dp.add_datacatalague("silver", silver_catalogue)
-    pass
FSMetaDataRepository tests
@@ -6,7 +6,7 @@ import pytest
 
 from plesna.models.flux import FluxMetaData
 from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
-from plesna.storage.metadata_repository.metadata_repository import ExecutionLog
+from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
 
 
 @pytest.fixture
@@ -30,8 +30,8 @@ def test_add_flux(location, metadata_repository):
     flux_id = "my_flux"
     metadata_repository.add_flux(flux_id)
 
-    metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
-        flux_id=flux_id
+    metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
+        id=flux_id
     )
     assert metadata_filepath.exists()
 
@@ -40,6 +40,13 @@ def test_add_flux(location, metadata_repository):
     assert content == "datetime,output\n"
 
 
+def test_add_and_list_fluxes(metadata_repository):
+    flux_ids = ["my_flux", "flux2", "blahblah"]
+    for f in flux_ids:
+        metadata_repository.add_flux(f)
+    assert metadata_repository.fluxes() == flux_ids
+
+
 def test_register_flux_execution(location, metadata_repository):
     flux_id = "my_flux"
     metadata_repository.add_flux(flux_id)
@@ -52,8 +59,8 @@ def test_register_flux_execution(location, metadata_repository):
         },
     )
 
-    metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
-        flux_id=flux_id
+    metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
+        id=flux_id
    )
     with open(metadata_filepath, "r") as csvfile:
         content = csvfile.read()
@@ -62,7 +69,7 @@ def test_register_flux_execution(location, metadata_repository):
     )
 
 
-def test_register_and_get_logs(metadata_repository):
+def test_register_and_get_exec_logs(metadata_repository):
     flux_id = "my_flux"
     metadata_repository.add_flux(flux_id)
 
@@ -92,7 +99,7 @@ def test_register_and_get_exec_logs(metadata_repository):
     ]
 
 
-def test_register_and_get_last_log(metadata_repository):
+def test_register_and_get_last_exec_log(metadata_repository):
     flux_id = "my_flux"
     metadata_repository.add_flux(flux_id)
 
@@ -114,3 +121,62 @@ def test_register_and_get_last_exec_log(metadata_repository):
         datetime=datetime(2024, 3, 15, 14, 30),
         output=FluxMetaData(data={"truc": "chose"}),
     )
+
+
+def test_add_and_list_tables(metadata_repository):
+    table_ids = ["my_table", "table2", "blahblah"]
+    for f in table_ids:
+        metadata_repository.add_table(f)
+    assert metadata_repository.tables() == table_ids
+
+
+def test_register_table_modification(location, metadata_repository):
+    table_id = "my_table"
+    flux_id = "my_flux"
+    metadata_repository.add_table(table_id)
+
+    metadata_repository.register_table_modification(
+        table_id, datetime(2023, 3, 15, 14, 30), flux_id
+    )
+
+    metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
+        id=table_id
+    )
+    with open(metadata_filepath, "r") as csvfile:
+        content = csvfile.read()
+    assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
+
+
+def test_register_and_get_mod_logs(metadata_repository):
+    table_id = "my_table"
+    flux_id = "my_flux"
+    metadata_repository.add_table(table_id)
+
+    metadata_repository.register_table_modification(
+        table_id, datetime(2023, 3, 15, 14, 30), flux_id
+    )
+    metadata_repository.register_table_modification(
+        table_id, datetime(2024, 3, 15, 14, 30), flux_id
+    )
+
+    logs = metadata_repository.table_logs(table_id)
+    assert logs == [
+        ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
+        ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
+    ]
+
+
+def test_register_and_get_last_log(metadata_repository):
+    table_id = "my_table"
+    flux_id = "my_flux"
+    metadata_repository.add_table(table_id)
+
+    metadata_repository.register_table_modification(
+        table_id, datetime(2023, 3, 15, 14, 30), flux_id
+    )
+    metadata_repository.register_table_modification(
+        table_id, datetime(2024, 3, 15, 14, 30), flux_id
+    )
+
+    logs = metadata_repository.table(table_id)
+    assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)