refact: move Transformation to flux model

This commit is contained in:
Bertrand Benjamin 2025-01-05 15:43:29 +01:00
parent 9a5c581f31
commit e4af62b136
4 changed files with 20 additions and 24 deletions

View File

@ -1,7 +1,20 @@
from collections.abc import Callable
from pydantic import BaseModel, computed_field
from plesna.models.storage import Table
from plesna.models.transformation import Transformation
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
name: str
function: Callable
extra_kwrds: dict = {}
class Flux(BaseModel):

View File

@ -1,15 +0,0 @@
from collections.abc import Callable
from pydantic import BaseModel
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
function: Callable
extra_kwrds: dict = {}

View File

@ -1,7 +1,6 @@
from plesna.compute.consume_flux import consume_flux
from plesna.models.flux import Flux
from plesna.models.flux import Flux, Transformation
from plesna.models.storage import Table
from plesna.models.transformation import Transformation
def test_consume_flux():
@ -24,7 +23,7 @@ def test_consume_flux():
flux = Flux(
sources=sources,
targets=targets,
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),
transformation=Transformation(name="func", function=func, extra_kwrds={"extra": "super"}),
)
meta = consume_flux(flux)

View File

@ -4,9 +4,8 @@ from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
from plesna.graph.graph import Node
from plesna.models.flux import Flux
from plesna.models.transformation import Transformation
from plesna.models.graphs import Node
from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@ -55,7 +54,7 @@ def copy_flux(repository: FSRepository) -> Flux:
raw_brz_copy_username = Flux(
sources=raw_username,
targets=bronze_username,
transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
transformation=Transformation(name="copy", function=copy, extra_kwrds=extra_kwrds),
)
return raw_brz_copy_username
@ -76,7 +75,7 @@ def foo_flux(repository: FSRepository) -> Flux:
flux = Flux(
sources=src,
targets=targets,
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
transformation=Transformation(name="foo", function=foo, extra_kwrds=extra_kwrds),
)
return flux