From fe8f76245bc93ac0c14c26938d4179cc92c48f32 Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Wed, 14 Aug 2024 07:22:01 +0200 Subject: [PATCH] Feat: start flux --- dashboard/libs/flux/flux.py | 70 +++++++++++++++++++++++++++++ tests/test_flux.py | 88 +++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 dashboard/libs/flux/flux.py create mode 100644 tests/test_flux.py diff --git a/dashboard/libs/flux/flux.py b/dashboard/libs/flux/flux.py new file mode 100644 index 0000000..b69db3c --- /dev/null +++ b/dashboard/libs/flux/flux.py @@ -0,0 +1,70 @@ +from collections.abc import Callable +from datetime import datetime + +import pandas as pd +from pydantic import BaseModel + +from ..repository.repository import AbstractRepository + + +class Schema(BaseModel): + repository: str + schema: str + + +class Table(BaseModel): + repository: str + schema: str + table: str + + +class Flux(BaseModel): + sources: list[Table] + destinations: dict[str, Table] + transformation: Callable[[list[pd.DataFrame]], dict[str, pd.DataFrame]] + + +class State(BaseModel): + statuses: dict[str, str] + qty_out: int + failed_lines: list[str] + start: datetime + end: datetime + + +Repositories = dict[str, AbstractRepository] + + +def open_source(repositories: Repositories, source: Table) -> pd.DataFrame: + return repositories[source.repository].read(source.table, source.schema) + + +def write_source( + content: pd.DataFrame, repositories: Repositories, destination: Table +) -> str: + return repositories[destination.repository].write( + content, destination.table, destination.schema + ) + + +def consume_flux(flux: Flux, repositories: dict[str, AbstractRepository]) -> State: + start = datetime.now() + src_dfs = [open_source(repositories, source) for source in flux.sources] + + built_dfs = flux.transformation(src_dfs) + + statuses = { + dest: write_source(df, repositories, flux.destinations[dest]) + for dest, df in built_dfs.items() + } + + end = datetime.now() + qty_out = 0 + failed_lines = [] + return State( + statuses=statuses, + qty_out=qty_out, + failed_lines=failed_lines, + start=start, + end=end, + ) diff --git a/tests/test_flux.py b/tests/test_flux.py new file mode 100644 index 0000000..8e211ef --- /dev/null +++ b/tests/test_flux.py @@ -0,0 +1,88 @@ +import pandas as pd +import pytest + +from dashboard.libs.repository.repository import AbstractRepository + +FakeTable = pd.DataFrame +FakeSchema = dict[str, pd.DataFrame] +FakeSchemas = dict[str, FakeSchema] + + +class FakeRepository(AbstractRepository): + def __init__(self, schemas: FakeSchemas): + self._schemas = {} + for schema_name, tables in schemas.items(): + schema = {} + for table, df in tables.items(): + schema[table] = { + "df": df, + "metadata": { + "status": "new", + "qty_read": 0, + "qty_write": 0, + }, + } + self._schemas[schema_name] = schema + + def schemas(self): + """List schemas""" + return list(self._schemas.keys()) + + def tables(self, schema): + """List table's name in schema""" + return list(self._schemas[schema].keys()) + + def infos(self, table: str, schema: str) -> dict[str, str]: + """Get infos about the table""" + return self._schemas[schema][table]["metadata"] + + def read(self, table, schema) -> pd.DataFrame: + """Get content of the table""" + self._schemas[schema][table]["metadata"]["qty_read"] += 1 + return self._schemas[schema][table]["df"] + + def write(self, content, table, schema) -> dict[str, str]: + """Write content into the table""" + self._schemas[schema][table]["df"] = content + self._schemas[schema][table]["metadata"]["status"] = "modified" + self._schemas[schema][table]["metadata"]["qty_write"] += 1 + return self.infos(table, schema) + + def delete_table(self, table, schema): + """Delete the table""" + raise NotImplementedError + + +def test_fakerepository(): + fakerepository = FakeRepository( + { + "foo": { + "table1": pd.DataFrame({"A": []}), + "table2": pd.DataFrame({"B": []}), + }, + "bar": { + "table1": pd.DataFrame({"C": []}), + "table2": pd.DataFrame({"D": []}), + }, + } + ) + assert fakerepository.schemas() == ["foo", "bar"] + assert fakerepository.tables("foo") == ["table1", "table2"] + assert fakerepository.infos("table1", "foo") == { + "status": "new", + "qty_read": 0, + "qty_write": 0, + } + assert fakerepository.read("table1", "foo").equals(pd.DataFrame({"A": []})) + assert fakerepository.infos("table1", "foo") == { + "status": "new", + "qty_read": 1, + "qty_write": 0, + } + + df = pd.DataFrame({"A": [1, 2]}) + assert fakerepository.write(df, "table1", "foo") == { + "status": "modified", + "qty_read": 1, + "qty_write": 1, + }