Widen State.statuses so each entry maps to a dict instead of a str.
Let FakeRepository.write create a missing table entry (with fresh metadata)
instead of raising KeyError, and add test_consume_flux, which runs
consume_flux from a source FakeRepository into an empty destination one and
checks the reported destination status and the doubled data.

diff --git a/dashboard/libs/flux/flux.py b/dashboard/libs/flux/flux.py
index b69db3c..463ec7f 100644
--- a/dashboard/libs/flux/flux.py
+++ b/dashboard/libs/flux/flux.py
@@ -25,7 +25,7 @@ class Flux(BaseModel):
 
 
 class State(BaseModel):
-    statuses: dict[str, str]
+    statuses: dict[str, dict]
     qty_out: int
     failed_lines: list[str]
     start: datetime
diff --git a/tests/test_flux.py b/tests/test_flux.py
index 8e211ef..afd6bd4 100644
--- a/tests/test_flux.py
+++ b/tests/test_flux.py
@@ -1,6 +1,7 @@
 import pandas as pd
 import pytest
 
+from dashboard.libs.flux.flux import Flux, consume_flux
 from dashboard.libs.repository.repository import AbstractRepository
 
 FakeTable = pd.DataFrame
@@ -43,7 +44,17 @@ class FakeRepository(AbstractRepository):
 
     def write(self, content, table, schema) -> dict[str, str]:
         """Write content into the table"""
-        self._schemas[schema][table]["df"] = content
+        try:
+            self._schemas[schema][table]["df"] = content
+        except KeyError:
+            self._schemas[schema][table] = {
+                "df": content,
+                "metadata": {
+                    "status": "new",
+                    "qty_read": 0,
+                    "qty_write": 0,
+                },
+            }
         self._schemas[schema][table]["metadata"]["status"] = "modified"
         self._schemas[schema][table]["metadata"]["qty_write"] += 1
         return self.infos(table, schema)
@@ -86,3 +97,35 @@ def test_fakerepository():
         "qty_read": 1,
         "qty_write": 1,
     }
+
+
+def test_consume_flux():
+    source_repository = FakeRepository(
+        {
+            "source": {
+                "table1": pd.DataFrame({"A": [1, 2, 3]}),
+            },
+        }
+    )
+    dest_repository = FakeRepository(
+        {
+            "destination": {},
+        }
+    )
+    repositories = {
+        "source": source_repository,
+        "dest": dest_repository,
+    }
+    transformation = lambda dfs: {"dest": dfs[0] * 2}
+
+    flux = Flux(
+        sources=[{"repository": "source", "schema": "source", "table": "table1"}],
+        destinations={
+            "dest": {"repository": "dest", "schema": "destination", "table": "table1"}
+        },
+        transformation=transformation,
+    )
+
+    state = consume_flux(flux, repositories)
+    assert state.statuses["dest"] == {'status': 'modified', 'qty_read': 0, 'qty_write': 1}
+    assert dest_repository.read("table1", "destination").equals(pd.DataFrame({"A": [2, 4, 6]}))