Compare commits
15 Commits
5ebde14be9
...
dags
Author | SHA1 | Date | |
---|---|---|---|
ec19534094 | |||
d4428187d1 | |||
9118feb4c6 | |||
d7716a4b8e | |||
478a8c2403 | |||
8882317a47 | |||
2a387a1bc8 | |||
eec3a13dbb | |||
8623cd5960 | |||
543b3fe98e | |||
1a49158afa | |||
bb691acc14 | |||
90472ac868 | |||
0ae6439217 | |||
2f170d91b6 |
@@ -1,9 +1,11 @@
|
|||||||
|
from collections.abc import Callable
|
||||||
from plesna.compute.consume_flux import consume_flux
|
from plesna.compute.consume_flux import consume_flux
|
||||||
from plesna.graph.graph import Graph, Node
|
from plesna.graph.graph import Graph
|
||||||
from plesna.graph.graph_set import EdgeOnSet, GraphSet
|
from plesna.graph.graph_set import GraphSet
|
||||||
from plesna.models.flux import Flux, FluxMetaData
|
from plesna.models.flux import Flux, FluxMetaData
|
||||||
|
from plesna.models.graphs import Node
|
||||||
from plesna.models.libs.flux_graph import flux_to_edgeonset
|
from plesna.models.libs.flux_graph import flux_to_edgeonset
|
||||||
from plesna.storage.repository.repository import Repository
|
from plesna.storage.data_repository.data_repository import DataRepository
|
||||||
|
|
||||||
|
|
||||||
class DataPlateformError(Exception):
|
class DataPlateformError(Exception):
|
||||||
@@ -16,7 +18,7 @@ class DataPlateform:
|
|||||||
self._fluxes = {}
|
self._fluxes = {}
|
||||||
self._repositories = {}
|
self._repositories = {}
|
||||||
|
|
||||||
def add_repository(self, repository: Repository) -> str:
|
def add_repository(self, repository: DataRepository) -> str:
|
||||||
if repository.id in self._repositories:
|
if repository.id in self._repositories:
|
||||||
raise DataPlateformError("The repository {repository.id} already exists")
|
raise DataPlateformError("The repository {repository.id} already exists")
|
||||||
|
|
||||||
@@ -27,42 +29,65 @@ class DataPlateform:
|
|||||||
def repositories(self) -> list[str]:
|
def repositories(self) -> list[str]:
|
||||||
return list(self._repositories)
|
return list(self._repositories)
|
||||||
|
|
||||||
def repository(self, id: str) -> Repository:
|
def repository(self, id: str) -> DataRepository:
|
||||||
return self._repositories[id]
|
return self._repositories[id]
|
||||||
|
|
||||||
def add_flux(self, name: str, flux: Flux) -> str:
|
def is_valid_flux(self, flux: Flux) -> bool:
|
||||||
if name in self._fluxes:
|
return True
|
||||||
raise DataPlateformError("The flux {name} already exists")
|
|
||||||
|
|
||||||
self._fluxes[name] = flux
|
def add_flux(self, flux: Flux) -> str:
|
||||||
return name
|
if flux.id in self._fluxes:
|
||||||
|
raise DataPlateformError("The flux {flux} already exists")
|
||||||
|
|
||||||
|
assert self.is_valid_flux(flux)
|
||||||
|
self._fluxes[flux.id] = flux
|
||||||
|
return flux.id
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def fluxes(self) -> list[str]:
|
def fluxes(self) -> list[str]:
|
||||||
return list(self._fluxes)
|
return list(self._fluxes)
|
||||||
|
|
||||||
def flux(self, name: str) -> Flux:
|
def flux(self, flux_id: str) -> Flux:
|
||||||
return self._fluxes[name]
|
return self._fluxes[flux_id]
|
||||||
|
|
||||||
def execute_flux(self, name: str) -> FluxMetaData:
|
def execute_flux(self, flux_id: str) -> FluxMetaData:
|
||||||
if name not in self._fluxes:
|
if flux_id not in self._fluxes:
|
||||||
raise DataPlateformError("The flux {name} is not registered")
|
raise DataPlateformError("The flux {flux_id} is not registered")
|
||||||
return consume_flux(self._fluxes[name])
|
return consume_flux(self._fluxes[flux_id])
|
||||||
|
|
||||||
@property
|
def graphset(
|
||||||
def graphset(self) -> GraphSet:
|
self,
|
||||||
|
name_flux: Callable = lambda flux: flux.id,
|
||||||
|
meta_flux: Callable = lambda _: {},
|
||||||
|
name_table: Callable = lambda table: table.id,
|
||||||
|
meta_table: Callable = lambda _: {},
|
||||||
|
) -> GraphSet:
|
||||||
graphset = GraphSet()
|
graphset = GraphSet()
|
||||||
for flux in self._fluxes.values():
|
for flux in self._fluxes.values():
|
||||||
edge = flux_to_edgeonset(flux)
|
edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
|
||||||
graphset.append(edge)
|
graphset.append(edge)
|
||||||
|
|
||||||
return graphset
|
return graphset
|
||||||
|
|
||||||
@property
|
def graph(
|
||||||
def graph(self) -> Graph:
|
self,
|
||||||
graph = Graph()
|
name_flux: Callable = lambda flux: flux.id,
|
||||||
|
meta_flux: Callable = lambda _: {},
|
||||||
|
name_table: Callable = lambda table: table.id,
|
||||||
|
meta_table: Callable = lambda _: {},
|
||||||
|
) -> Graph:
|
||||||
|
"""Get the graph of fluxes and tables
|
||||||
|
|
||||||
|
:param name_flux: function on flux to name the edge
|
||||||
|
:param meta_flux: function on flux to attribute metadata to edge
|
||||||
|
:param name_table: function on table to name nodes
|
||||||
|
:param meta_table: function on flux to attribute metadata to nodes
|
||||||
|
|
||||||
|
"""
|
||||||
|
graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
|
||||||
for repo in self._repositories.values():
|
for repo in self._repositories.values():
|
||||||
for schema in repo.schemas():
|
for schema in repo.schemas():
|
||||||
for table in repo.tables(schema):
|
for table in repo.tables(schema):
|
||||||
graph.add_node(Node(name=table))
|
t = repo.table(table)
|
||||||
|
graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
|
||||||
return graph
|
return graph
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
from typing import Set
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from functools import reduce
|
from functools import reduce
|
||||||
from plesna.models.graphs import Node, Edge
|
from plesna.models.graphs import Node, Edge
|
||||||
@@ -5,8 +6,8 @@ from plesna.models.graphs import Node, Edge
|
|||||||
|
|
||||||
class Graph:
|
class Graph:
|
||||||
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
|
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
|
||||||
self._edges = []
|
self._edges: list[Edge] = []
|
||||||
self._nodes = set()
|
self._nodes: Set[Node] = set()
|
||||||
self.add_edges(edges)
|
self.add_edges(edges)
|
||||||
self.add_nodes(nodes)
|
self.add_nodes(nodes)
|
||||||
|
|
||||||
|
@@ -14,6 +14,10 @@ class GraphSet:
|
|||||||
self._node_sets.add(frozenset(edge.sources))
|
self._node_sets.add(frozenset(edge.sources))
|
||||||
self._node_sets.add(frozenset(edge.targets))
|
self._node_sets.add(frozenset(edge.targets))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def edges(self) -> Set[EdgeOnSet]:
|
||||||
|
return self._edges
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def node_sets(self) -> Set[frozenset]:
|
def node_sets(self) -> Set[frozenset]:
|
||||||
return self._node_sets
|
return self._node_sets
|
||||||
@@ -24,7 +28,7 @@ class GraphSet:
|
|||||||
graph.add_nodes(node_set)
|
graph.add_nodes(node_set)
|
||||||
for edge in self._edges:
|
for edge in self._edges:
|
||||||
flatten_edge = [
|
flatten_edge = [
|
||||||
Edge(arrow=edge.arrow, source=s, target=t, edge_kwrds=edge.edge_kwrds)
|
Edge(arrow=edge.arrow, source=s, target=t, metadata=edge.metadata)
|
||||||
for (s, t) in product(edge.sources, edge.targets)
|
for (s, t) in product(edge.sources, edge.targets)
|
||||||
]
|
]
|
||||||
graph.add_edges(flatten_edge)
|
graph.add_edges(flatten_edge)
|
||||||
|
@@ -3,6 +3,7 @@ from pydantic import BaseModel
|
|||||||
|
|
||||||
class Node(BaseModel):
|
class Node(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
|
metadata: dict = {}
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.name)
|
return hash(self.name)
|
||||||
@@ -12,11 +13,11 @@ class Edge(BaseModel):
|
|||||||
arrow: str
|
arrow: str
|
||||||
source: Node
|
source: Node
|
||||||
target: Node
|
target: Node
|
||||||
edge_kwrds: dict = {}
|
metadata: dict = {}
|
||||||
|
|
||||||
|
|
||||||
class EdgeOnSet(BaseModel):
|
class EdgeOnSet(BaseModel):
|
||||||
arrow: str
|
arrow: str
|
||||||
sources: list[Node]
|
sources: list[Node]
|
||||||
targets: list[Node]
|
targets: list[Node]
|
||||||
edge_kwrds: dict = {}
|
metadata: dict = {}
|
||||||
|
29
plesna/models/libs/flux_graph.py
Normal file
29
plesna/models/libs/flux_graph.py
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
from collections.abc import Callable
|
||||||
|
from plesna.models.flux import Flux
|
||||||
|
from plesna.models.graphs import EdgeOnSet, Node
|
||||||
|
|
||||||
|
|
||||||
|
def flux_to_edgeonset(
|
||||||
|
flux: Flux,
|
||||||
|
name_flux: Callable = lambda flux: flux.id,
|
||||||
|
meta_flux: Callable = lambda _: {},
|
||||||
|
name_table: Callable = lambda table: table.id,
|
||||||
|
meta_table: Callable = lambda _: {},
|
||||||
|
) -> EdgeOnSet:
|
||||||
|
"""Convert a flux to an EdgeOnSet
|
||||||
|
|
||||||
|
:param flux: the flux
|
||||||
|
:name_flux: function on flux which returns the name of the arrow from flux
|
||||||
|
:meta_flux: function on flux which returns a dict to store in metadata field
|
||||||
|
:name_table: function on table which returns the name of node
|
||||||
|
:meta_table: function on table which returns metadata of node
|
||||||
|
|
||||||
|
"""
|
||||||
|
sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
|
||||||
|
targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets]
|
||||||
|
return EdgeOnSet(
|
||||||
|
arrow=name_flux(flux),
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
metadata=meta_flux(flux),
|
||||||
|
)
|
@@ -36,8 +36,9 @@ class Table(BaseModel):
|
|||||||
schema_id: str
|
schema_id: str
|
||||||
name: str
|
name: str
|
||||||
value: str
|
value: str
|
||||||
partitions: list[str] = []
|
|
||||||
datas: list[str]
|
datas: list[str]
|
||||||
|
partitions: list[str] = []
|
||||||
|
metadata: dict = {}
|
||||||
|
|
||||||
|
|
||||||
class Partition(BaseModel):
|
class Partition(BaseModel):
|
||||||
|
@@ -3,7 +3,7 @@ import abc
|
|||||||
from plesna.models.storage import Partition, Schema, Table
|
from plesna.models.storage import Partition, Schema, Table
|
||||||
|
|
||||||
|
|
||||||
class Repository:
|
class DataRepository:
|
||||||
def __init__(self, id: str, name: str):
|
def __init__(self, id: str, name: str):
|
||||||
self._id = id
|
self._id = id
|
||||||
self._name = name
|
self._name = name
|
@@ -3,8 +3,8 @@ from pathlib import Path
|
|||||||
from pydantic import BaseModel, computed_field
|
from pydantic import BaseModel, computed_field
|
||||||
|
|
||||||
from plesna.libs.string_tools import extract_values_from_pattern
|
from plesna.libs.string_tools import extract_values_from_pattern
|
||||||
from plesna.models.storage import Partition, Schema, Table
|
from plesna.models.storage import Schema, Table
|
||||||
from plesna.storage.repository.repository import Repository
|
from plesna.storage.data_repository.data_repository import DataRepository
|
||||||
|
|
||||||
|
|
||||||
class FSTable(BaseModel):
|
class FSTable(BaseModel):
|
||||||
@@ -58,8 +58,8 @@ class FSRepositoryError(ValueError):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class FSRepository(Repository):
|
class FSDataRepository(DataRepository):
|
||||||
"""Repository based on files tree structure
|
"""Data Repository based on files tree structure
|
||||||
|
|
||||||
- first level: schemas
|
- first level: schemas
|
||||||
- second level: tables
|
- second level: tables
|
0
plesna/storage/metadata_repository/__init__.py
Normal file
0
plesna/storage/metadata_repository/__init__.py
Normal file
132
plesna/storage/metadata_repository/fs_metadata_repository.py
Normal file
132
plesna/storage/metadata_repository/fs_metadata_repository.py
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
import csv
|
||||||
|
import json
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
|
||||||
|
from plesna.storage.metadata_repository.metadata_repository import (
|
||||||
|
ExecutionLog,
|
||||||
|
MetaDataRepository,
|
||||||
|
ModificationLog,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class FSMetaDataRepositoryError(ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FSMetaDataRepository(MetaDataRepository):
|
||||||
|
"""MetaData Repository based on csv files
|
||||||
|
|
||||||
|
Files organisations: executions and modifications are stored in csv file according to ***_FILEMODEL
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
OBJECTS = {
|
||||||
|
"flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
|
||||||
|
"table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, basepath: str):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
self._basepath = Path(basepath)
|
||||||
|
assert self._basepath.exists()
|
||||||
|
|
||||||
|
def get_things(self, what: str) -> list[str]:
|
||||||
|
"""List all ids for 'what'"""
|
||||||
|
whats = []
|
||||||
|
for filepath in self._basepath.iterdir():
|
||||||
|
try:
|
||||||
|
founded = extract_values_from_pattern(
|
||||||
|
self.OBJECTS[what]["filemodel"], filepath.name
|
||||||
|
)
|
||||||
|
except StringToolsError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
whats.append(founded["id"])
|
||||||
|
return whats
|
||||||
|
|
||||||
|
def fluxes(self) -> list[str]:
|
||||||
|
"""List fluxes's ids"""
|
||||||
|
return self.get_things(what="flux")
|
||||||
|
|
||||||
|
def tables(
|
||||||
|
self,
|
||||||
|
) -> list[str]:
|
||||||
|
"""List all table's ids"""
|
||||||
|
return self.get_things(what="table")
|
||||||
|
|
||||||
|
def _add_thing(self, what: str, id: str) -> str:
|
||||||
|
"""Add the new things 'what'"""
|
||||||
|
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
|
||||||
|
filepath.touch()
|
||||||
|
with open(filepath, "a") as csvfile:
|
||||||
|
writer = csv.DictWriter(
|
||||||
|
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
|
||||||
|
)
|
||||||
|
writer.writeheader()
|
||||||
|
return id
|
||||||
|
|
||||||
|
def add_flux(self, flux_id: str) -> str:
|
||||||
|
"""Get the flux metadata"""
|
||||||
|
return self._add_thing(what="flux", id=flux_id)
|
||||||
|
|
||||||
|
def add_table(self, table_id: str) -> str:
|
||||||
|
"""Get the table metadata"""
|
||||||
|
return self._add_thing(what="table", id=table_id)
|
||||||
|
|
||||||
|
def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
|
||||||
|
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
|
||||||
|
if not filepath.exists:
|
||||||
|
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
|
||||||
|
|
||||||
|
metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
|
||||||
|
|
||||||
|
with open(filepath, "a") as csvfile:
|
||||||
|
writer = csv.DictWriter(
|
||||||
|
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
|
||||||
|
)
|
||||||
|
writer.writerow(metadata_.to_flat_dict())
|
||||||
|
|
||||||
|
return metadata_
|
||||||
|
|
||||||
|
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
|
||||||
|
"""Get the flux metadata"""
|
||||||
|
return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
|
||||||
|
|
||||||
|
def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str:
|
||||||
|
"""Get the table metadata"""
|
||||||
|
return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
|
||||||
|
|
||||||
|
def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
|
||||||
|
"""Generate log dict from history"""
|
||||||
|
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
|
||||||
|
if not filepath.exists:
|
||||||
|
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
|
||||||
|
with open(filepath, "r") as csvfile:
|
||||||
|
reader = csv.DictReader(csvfile)
|
||||||
|
for row in reader:
|
||||||
|
yield row
|
||||||
|
|
||||||
|
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
|
||||||
|
"""Get all flux logs"""
|
||||||
|
logs = []
|
||||||
|
for logline in self._get_all_log("flux", flux_id):
|
||||||
|
logline["output"] = json.loads(logline["output"])
|
||||||
|
logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
|
||||||
|
|
||||||
|
return logs
|
||||||
|
|
||||||
|
def flux(self, flux_id: str) -> ExecutionLog:
|
||||||
|
"""Get the last flux log"""
|
||||||
|
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
|
||||||
|
|
||||||
|
def table_logs(self, table_id: str) -> list[ModificationLog]:
|
||||||
|
"""Get all table's modification metadatas"""
|
||||||
|
return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
|
||||||
|
|
||||||
|
def table(self, table_id: str) -> ModificationLog:
|
||||||
|
"""Get the last table's modification metadatas"""
|
||||||
|
return max(self.table_logs(table_id), key=lambda l: l.datetime)
|
81
plesna/storage/metadata_repository/metadata_repository.py
Normal file
81
plesna/storage/metadata_repository/metadata_repository.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
import abc
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from plesna.models.flux import FluxMetaData
|
||||||
|
|
||||||
|
|
||||||
|
class ModificationLog(BaseModel):
|
||||||
|
datetime: datetime
|
||||||
|
flux_id: str
|
||||||
|
|
||||||
|
def to_flat_dict(self):
|
||||||
|
return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
|
||||||
|
|
||||||
|
|
||||||
|
class ExecutionLog(BaseModel):
|
||||||
|
datetime: datetime
|
||||||
|
output: FluxMetaData
|
||||||
|
|
||||||
|
def to_flat_dict(self):
|
||||||
|
return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}
|
||||||
|
|
||||||
|
|
||||||
|
class MetaDataRepository:
|
||||||
|
"""Object that stores metadata about flux, schema, tables"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def fluxes(self) -> list[str]:
|
||||||
|
"""List fluxes's ids"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def add_flux(self, flux_id: str) -> str:
|
||||||
|
"""Get the flux metadata"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def register_flux_execution(self, flux_id: str, dt: datetime, metadata: dict) -> str:
|
||||||
|
"""Get the flux metadata"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def flux(self, schema_id: str) -> ExecutionLog:
|
||||||
|
"""Get the flux last execution metadata"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def flux_logs(self, schema_id: str) -> list[ExecutionLog]:
|
||||||
|
"""Get all the flux execution metadata"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def tables(
|
||||||
|
self,
|
||||||
|
) -> list[str]:
|
||||||
|
"""List all table's ids"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def add_table(self, table_id: str) -> str:
|
||||||
|
"""Get the table metadata"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def register_table_modification(self, table_id: str, dt: datetime, metadata: dict) -> str:
|
||||||
|
"""Get the table metadata"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def table(self, table_id: str) -> ModificationLog:
|
||||||
|
"""Get the last table's modification metadatas"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def table_logs(self, table_id: str) -> list[ModificationLog]:
|
||||||
|
"""Get all table's modification metadatas"""
|
||||||
|
raise NotImplementedError
|
16
pyproject.toml
Normal file
16
pyproject.toml
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
[project]
|
||||||
|
name = "plesna"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Add your description here"
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.13"
|
||||||
|
dependencies = [
|
||||||
|
"ruff>=0.8.5",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
line-length = 100
|
||||||
|
indent-width = 4
|
||||||
|
[tool.ruff.lint]
|
||||||
|
select = ["E", "F"]
|
||||||
|
ignore = ["F401"]
|
@@ -4,15 +4,15 @@ from pathlib import Path
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from plesna.dataplatform import DataPlateform
|
from plesna.dataplatform import DataPlateform
|
||||||
from plesna.models.graphs import Node
|
from plesna.models.graphs import Edge, EdgeOnSet, Node
|
||||||
from plesna.models.flux import Flux, Transformation
|
from plesna.models.flux import Flux, Transformation
|
||||||
from plesna.storage.repository.fs_repository import FSRepository
|
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
|
||||||
|
|
||||||
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
|
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def repository(tmp_path) -> FSRepository:
|
def repository(tmp_path) -> FSDataRepository:
|
||||||
example_src = FIXTURE_DIR
|
example_src = FIXTURE_DIR
|
||||||
assert example_src.exists()
|
assert example_src.exists()
|
||||||
|
|
||||||
@@ -24,11 +24,11 @@ def repository(tmp_path) -> FSRepository:
|
|||||||
silver_path = Path(tmp_path) / "silver"
|
silver_path = Path(tmp_path) / "silver"
|
||||||
silver_path.mkdir()
|
silver_path.mkdir()
|
||||||
|
|
||||||
return FSRepository("test", "test", tmp_path)
|
return FSDataRepository("test", "test", tmp_path)
|
||||||
|
|
||||||
|
|
||||||
def test_add_repository(
|
def test_add_repository(
|
||||||
repository: FSRepository,
|
repository: FSDataRepository,
|
||||||
):
|
):
|
||||||
dp = DataPlateform()
|
dp = DataPlateform()
|
||||||
dp.add_repository(repository)
|
dp.add_repository(repository)
|
||||||
@@ -39,7 +39,7 @@ def test_add_repository(
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def copy_flux(repository: FSRepository) -> Flux:
|
def copy_flux(repository: FSDataRepository) -> Flux:
|
||||||
raw_username = [repository.table("test-raw-username")]
|
raw_username = [repository.table("test-raw-username")]
|
||||||
bronze_username = [repository.table("test-bronze-username")]
|
bronze_username = [repository.table("test-bronze-username")]
|
||||||
|
|
||||||
@@ -62,7 +62,7 @@ def copy_flux(repository: FSRepository) -> Flux:
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def foo_flux(repository: FSRepository) -> Flux:
|
def foo_flux(repository: FSDataRepository) -> Flux:
|
||||||
src = [
|
src = [
|
||||||
repository.table("test-raw-username"),
|
repository.table("test-raw-username"),
|
||||||
repository.table("test-raw-recovery"),
|
repository.table("test-raw-recovery"),
|
||||||
@@ -84,22 +84,22 @@ def foo_flux(repository: FSRepository) -> Flux:
|
|||||||
return flux
|
return flux
|
||||||
|
|
||||||
|
|
||||||
def test_add_flux(repository: FSRepository, copy_flux: Flux):
|
def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
|
||||||
dataplatform = DataPlateform()
|
dataplatform = DataPlateform()
|
||||||
dataplatform.add_repository(repository)
|
dataplatform.add_repository(repository)
|
||||||
|
|
||||||
dataplatform.add_flux(name="copy_flux", flux=copy_flux)
|
dataplatform.add_flux(flux=copy_flux)
|
||||||
assert dataplatform.fluxes == ["copy_flux"]
|
assert dataplatform.fluxes == ["copy_flux"]
|
||||||
dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
|
dataplatform.add_flux(flux=foo_flux)
|
||||||
assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]
|
assert dataplatform.fluxes == ["copy_flux", "foo_flux"]
|
||||||
|
|
||||||
assert dataplatform.flux("copy_flux") == copy_flux
|
assert dataplatform.flux("copy_flux") == copy_flux
|
||||||
assert dataplatform.flux("copy_flux_bis") == copy_flux
|
assert dataplatform.flux("foo_flux") == foo_flux
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def dataplatform(
|
def dataplatform(
|
||||||
repository: FSRepository,
|
repository: FSDataRepository,
|
||||||
foo_flux: Flux,
|
foo_flux: Flux,
|
||||||
copy_flux: Flux,
|
copy_flux: Flux,
|
||||||
) -> DataPlateform:
|
) -> DataPlateform:
|
||||||
@@ -107,8 +107,8 @@ def dataplatform(
|
|||||||
|
|
||||||
dp.add_repository(repository)
|
dp.add_repository(repository)
|
||||||
|
|
||||||
dp.add_flux("foo", foo_flux)
|
dp.add_flux(foo_flux)
|
||||||
dp.add_flux("raw_brz_copy_username", copy_flux)
|
dp.add_flux(copy_flux)
|
||||||
return dp
|
return dp
|
||||||
|
|
||||||
|
|
||||||
@@ -127,14 +127,8 @@ def test_listing_content(dataplatform: DataPlateform):
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_content_from_graph(dataplatform: DataPlateform):
|
def test_content_from_graphset(dataplatform: DataPlateform):
|
||||||
assert dataplatform.graph.nodes == {
|
assert dataplatform.graphset().node_sets == {
|
||||||
Node(name="test-raw-recovery", infos={}),
|
|
||||||
Node(name="test-raw-salary", infos={}),
|
|
||||||
Node(name="test-raw-username", infos={}),
|
|
||||||
}
|
|
||||||
|
|
||||||
assert dataplatform.graphset.node_sets == {
|
|
||||||
frozenset(
|
frozenset(
|
||||||
{
|
{
|
||||||
Node(name="test-bronze-username"),
|
Node(name="test-bronze-username"),
|
||||||
@@ -157,15 +151,130 @@ def test_content_from_graph(dataplatform: DataPlateform):
|
|||||||
}
|
}
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
assert dataplatform.graphset().edges == [
|
||||||
|
EdgeOnSet(
|
||||||
|
arrow="foo_flux",
|
||||||
|
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
|
||||||
|
targets=[Node(name="test-bronze-foo")],
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
EdgeOnSet(
|
||||||
|
arrow="copy_flux",
|
||||||
|
sources=[Node(name="test-raw-username")],
|
||||||
|
targets=[Node(name="test-bronze-username")],
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_content_from_graph(dataplatform: DataPlateform):
|
||||||
|
assert dataplatform.graph().nodes == {
|
||||||
|
Node(name="test-raw-recovery", metadata={}),
|
||||||
|
Node(name="test-raw-salary", metadata={}),
|
||||||
|
Node(name="test-raw-username", metadata={}),
|
||||||
|
Node(name="test-bronze-username", metadata={}),
|
||||||
|
Node(name="test-bronze-foo", metadata={}),
|
||||||
|
Node(name="test-raw-username", metadata={}),
|
||||||
|
}
|
||||||
|
assert dataplatform.graph().edges == [
|
||||||
|
Edge(
|
||||||
|
arrow="foo_flux",
|
||||||
|
source=Node(name="test-raw-username"),
|
||||||
|
target=Node(name="test-bronze-foo"),
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="foo_flux",
|
||||||
|
source=Node(name="test-raw-recovery"),
|
||||||
|
target=Node(name="test-bronze-foo"),
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="copy_flux",
|
||||||
|
source=Node(name="test-raw-username"),
|
||||||
|
target=Node(name="test-bronze-username"),
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_content_from_graph_arguments(dataplatform: DataPlateform):
|
||||||
|
name_flux = lambda flux: f"flux-{flux.id}"
|
||||||
|
meta_flux = lambda flux: {"name": flux.name}
|
||||||
|
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
|
||||||
|
assert dataplatform.graph(
|
||||||
|
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
||||||
|
).nodes == {
|
||||||
|
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
|
||||||
|
Node(
|
||||||
|
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
name="test-raw-recovery",
|
||||||
|
metadata={
|
||||||
|
"id": "test-raw-recovery",
|
||||||
|
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
name="test-raw-username",
|
||||||
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
assert dataplatform.graph(
|
||||||
|
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
||||||
|
).edges == [
|
||||||
|
Edge(
|
||||||
|
arrow="flux-foo_flux",
|
||||||
|
source=Node(
|
||||||
|
name="test-raw-username",
|
||||||
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||||
|
),
|
||||||
|
target=Node(
|
||||||
|
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
||||||
|
),
|
||||||
|
metadata={"name": "foo"},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="flux-foo_flux",
|
||||||
|
source=Node(
|
||||||
|
name="test-raw-recovery",
|
||||||
|
metadata={
|
||||||
|
"id": "test-raw-recovery",
|
||||||
|
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
target=Node(
|
||||||
|
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
||||||
|
),
|
||||||
|
metadata={"name": "foo"},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="flux-copy_flux",
|
||||||
|
source=Node(
|
||||||
|
name="test-raw-username",
|
||||||
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||||
|
),
|
||||||
|
target=Node(
|
||||||
|
name="test-bronze-username",
|
||||||
|
metadata={"id": "test-bronze-username", "partitions": []},
|
||||||
|
),
|
||||||
|
metadata={"name": "copy"},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_execute_flux(dataplatform: DataPlateform):
|
def test_execute_flux(dataplatform: DataPlateform):
|
||||||
meta = dataplatform.execute_flux("foo")
|
meta = dataplatform.execute_flux("foo_flux")
|
||||||
assert meta.data == {"who": "foo"}
|
assert meta.data == {"who": "foo"}
|
||||||
|
|
||||||
assert dataplatform.repository("test").schema("test-bronze").tables == []
|
assert dataplatform.repository("test").schema("test-bronze").tables == []
|
||||||
|
|
||||||
meta = dataplatform.execute_flux("raw_brz_copy_username")
|
meta = dataplatform.execute_flux("copy_flux")
|
||||||
assert meta.data == {"src_size": 283, "tgt_size": 283}
|
assert meta.data == {"src_size": 283, "tgt_size": 283}
|
||||||
|
|
||||||
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]
|
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]
|
||||||
|
@@ -1,39 +0,0 @@
|
|||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from plesna.dataplatform import DataPlateform
|
|
||||||
from plesna.datastore.fs_datacatalogue import FSDataCatalogue
|
|
||||||
|
|
||||||
FIXTURE_DIR = Path(__file__).parent / Path("raw_data")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def raw_catalogue(tmp_path):
|
|
||||||
raw_path = Path(tmp_path) / "raw"
|
|
||||||
return FSDataCatalogue(raw_path)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def bronze_catalogue(tmp_path):
|
|
||||||
bronze_path = Path(tmp_path) / "bronze"
|
|
||||||
return FSDataCatalogue(bronze_path)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def silver_catalogue(tmp_path):
|
|
||||||
silver_path = Path(tmp_path) / "silver"
|
|
||||||
return FSDataCatalogue(silver_path)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def dataplateform(
|
|
||||||
raw_catalogue: FSDataCatalogue,
|
|
||||||
bronze_catalogue: FSDataCatalogue,
|
|
||||||
silver_catalogue: FSDataCatalogue,
|
|
||||||
):
|
|
||||||
dp = DataPlateform()
|
|
||||||
dp.add_datacatalague("raw", raw_catalogue)
|
|
||||||
dp.add_datacatalague("bronze", bronze_catalogue)
|
|
||||||
dp.add_datacatalague("silver", silver_catalogue)
|
|
||||||
pass
|
|
||||||
|
@@ -3,7 +3,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from plesna.storage.repository.fs_repository import FSRepository
|
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
|
||||||
|
|
||||||
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
|
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
|
||||||
|
|
||||||
@@ -20,7 +20,7 @@ def location(tmp_path):
|
|||||||
|
|
||||||
|
|
||||||
def test_init(location):
|
def test_init(location):
|
||||||
repo = FSRepository("example", "example", location)
|
repo = FSDataRepository("example", "example", location)
|
||||||
assert repo.ls() == [
|
assert repo.ls() == [
|
||||||
"schema",
|
"schema",
|
||||||
]
|
]
|
||||||
@@ -44,8 +44,8 @@ def test_init(location):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def repository(location) -> FSRepository:
|
def repository(location) -> FSDataRepository:
|
||||||
return FSRepository("repo_id", "example", location)
|
return FSDataRepository("repo_id", "example", location)
|
||||||
|
|
||||||
|
|
||||||
def test_list_schemas(repository):
|
def test_list_schemas(repository):
|
182
tests/storage/test_fs_metadata_repository.py
Normal file
182
tests/storage/test_fs_metadata_repository.py
Normal file
@@ -0,0 +1,182 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from plesna.models.flux import FluxMetaData
|
||||||
|
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
|
||||||
|
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def location(tmp_path):
|
||||||
|
catalogpath = tmp_path / "catalog"
|
||||||
|
catalogpath.mkdir()
|
||||||
|
|
||||||
|
return catalogpath
|
||||||
|
|
||||||
|
|
||||||
|
def test_init(location):
|
||||||
|
repo = FSMetaDataRepository(location)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def metadata_repository(location) -> FSMetaDataRepository:
|
||||||
|
return FSMetaDataRepository(location)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_flux(location, metadata_repository):
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_flux(flux_id)
|
||||||
|
|
||||||
|
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
|
||||||
|
id=flux_id
|
||||||
|
)
|
||||||
|
assert metadata_filepath.exists()
|
||||||
|
|
||||||
|
with open(metadata_filepath, "r") as csvfile:
|
||||||
|
content = csvfile.read()
|
||||||
|
assert content == "datetime,output\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_and_list_fluxes(metadata_repository):
|
||||||
|
flux_ids = ["my_flux", "flux2", "blahblah"]
|
||||||
|
for f in flux_ids:
|
||||||
|
metadata_repository.add_flux(f)
|
||||||
|
assert metadata_repository.fluxes() == flux_ids
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_flux_execution(location, metadata_repository):
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_flux(flux_id)
|
||||||
|
|
||||||
|
metadata_repository.register_flux_execution(
|
||||||
|
flux_id,
|
||||||
|
datetime(2023, 3, 15, 14, 30),
|
||||||
|
output={
|
||||||
|
"truc": "machin",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
|
||||||
|
id=flux_id
|
||||||
|
)
|
||||||
|
with open(metadata_filepath, "r") as csvfile:
|
||||||
|
content = csvfile.read()
|
||||||
|
assert (
|
||||||
|
content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_and_get_exec_logs(metadata_repository):
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_flux(flux_id)
|
||||||
|
|
||||||
|
metadata_repository.register_flux_execution(
|
||||||
|
flux_id,
|
||||||
|
datetime(2023, 3, 15, 14, 30),
|
||||||
|
output={"truc": "machin"},
|
||||||
|
)
|
||||||
|
metadata_repository.register_flux_execution(
|
||||||
|
flux_id,
|
||||||
|
datetime(2024, 3, 15, 14, 30),
|
||||||
|
output={
|
||||||
|
"truc": "chose",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
logs = metadata_repository.flux_logs(flux_id)
|
||||||
|
assert logs == [
|
||||||
|
ExecutionLog(
|
||||||
|
datetime=datetime(2023, 3, 15, 14, 30),
|
||||||
|
output=FluxMetaData(data={"truc": "machin"}),
|
||||||
|
),
|
||||||
|
ExecutionLog(
|
||||||
|
datetime=datetime(2024, 3, 15, 14, 30),
|
||||||
|
output=FluxMetaData(data={"truc": "chose"}),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_and_get_last_exec_log(metadata_repository):
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_flux(flux_id)
|
||||||
|
|
||||||
|
metadata_repository.register_flux_execution(
|
||||||
|
flux_id,
|
||||||
|
datetime(2023, 3, 15, 14, 30),
|
||||||
|
output={"truc": "machin"},
|
||||||
|
)
|
||||||
|
metadata_repository.register_flux_execution(
|
||||||
|
flux_id,
|
||||||
|
datetime(2024, 3, 15, 14, 30),
|
||||||
|
output={
|
||||||
|
"truc": "chose",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
logs = metadata_repository.flux(flux_id)
|
||||||
|
assert logs == ExecutionLog(
|
||||||
|
datetime=datetime(2024, 3, 15, 14, 30),
|
||||||
|
output=FluxMetaData(data={"truc": "chose"}),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_and_list_tables(metadata_repository):
|
||||||
|
table_ids = ["my_table", "table2", "blahblah"]
|
||||||
|
for f in table_ids:
|
||||||
|
metadata_repository.add_table(f)
|
||||||
|
assert metadata_repository.tables() == table_ids
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_table_modification(location, metadata_repository):
|
||||||
|
table_id = "my_table"
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_table(table_id)
|
||||||
|
|
||||||
|
metadata_repository.register_table_modification(
|
||||||
|
table_id, datetime(2023, 3, 15, 14, 30), flux_id
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
|
||||||
|
id=table_id
|
||||||
|
)
|
||||||
|
with open(metadata_filepath, "r") as csvfile:
|
||||||
|
content = csvfile.read()
|
||||||
|
assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_and_get_mod_logs(metadata_repository):
|
||||||
|
table_id = "my_table"
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_table(table_id)
|
||||||
|
|
||||||
|
metadata_repository.register_table_modification(
|
||||||
|
table_id, datetime(2023, 3, 15, 14, 30), flux_id
|
||||||
|
)
|
||||||
|
metadata_repository.register_table_modification(
|
||||||
|
table_id, datetime(2024, 3, 15, 14, 30), flux_id
|
||||||
|
)
|
||||||
|
|
||||||
|
logs = metadata_repository.table_logs(table_id)
|
||||||
|
assert logs == [
|
||||||
|
ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
|
||||||
|
ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_and_get_last_log(metadata_repository):
|
||||||
|
table_id = "my_table"
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_table(table_id)
|
||||||
|
|
||||||
|
metadata_repository.register_table_modification(
|
||||||
|
table_id, datetime(2023, 3, 15, 14, 30), flux_id
|
||||||
|
)
|
||||||
|
metadata_repository.register_table_modification(
|
||||||
|
table_id, datetime(2024, 3, 15, 14, 30), flux_id
|
||||||
|
)
|
||||||
|
|
||||||
|
logs = metadata_repository.table(table_id)
|
||||||
|
assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)
|
Reference in New Issue
Block a user