import pandas as pd
import pytest

from dashboard.libs.flux.flux import Flux, consume_flux
from dashboard.libs.repository.repository import AbstractRepository

# Shapes accepted by FakeRepository: a schema maps table names to DataFrames,
# and a repository maps schema names to schemas.
FakeTable = pd.DataFrame
FakeSchema = dict[str, pd.DataFrame]
FakeSchemas = dict[str, FakeSchema]


def _new_entry(df):
    """Build the in-memory record stored for one table.

    A fresh ``metadata`` dict is created on every call so that tables never
    share (and accidentally co-mutate) their counters.
    """
    return {
        "df": df,
        "metadata": {
            "status": "new",
            "qty_read": 0,
            "qty_write": 0,
        },
    }


class FakeRepository(AbstractRepository):
    """In-memory repository used as a test double.

    Every table is stored as ``{"df": <DataFrame>, "metadata": {...}}`` where
    the metadata tracks a ``status`` ("new" or "modified") and the number of
    reads and writes performed through this object.
    """

    def __init__(self, schemas: FakeSchemas):
        """Load the initial tables, each flagged "new" with zeroed counters."""
        self._schemas = {
            schema_name: {
                table: _new_entry(df) for table, df in tables.items()
            }
            for schema_name, tables in schemas.items()
        }

    def schemas(self):
        """Return the schema names, in insertion order."""
        return list(self._schemas.keys())

    def tables(self, schema):
        """Return the table names of ``schema``, in insertion order.

        Raises:
            KeyError: if ``schema`` does not exist.
        """
        return list(self._schemas[schema].keys())

    def infos(self, table: str, schema: str) -> dict[str, str]:
        """Return the metadata dict of ``table`` in ``schema``.

        The returned dict is the live internal object, not a copy: later
        reads/writes are reflected in it.

        Raises:
            KeyError: if the schema or the table does not exist.
        """
        # NOTE(review): the annotation says dict[str, str] but "qty_read" and
        # "qty_write" hold ints — confirm against AbstractRepository.
        return self._schemas[schema][table]["metadata"]

    def read(self, table, schema) -> pd.DataFrame:
        """Return the DataFrame stored for ``table`` and count the read.

        Raises:
            KeyError: if the schema or the table does not exist.
        """
        entry = self._schemas[schema][table]
        entry["metadata"]["qty_read"] += 1
        return entry["df"]

    def write(self, content, table, schema) -> dict[str, str]:
        """Store ``content`` as ``table`` in ``schema`` and return its metadata.

        An existing table has its DataFrame replaced; a missing table is
        created. In both cases the status becomes "modified" and
        ``qty_write`` is incremented.

        Raises:
            KeyError: if ``schema`` does not exist.
        """
        # Look the schema up first so an unknown schema fails with a single,
        # unchained KeyError instead of one raised inside an except handler.
        tables = self._schemas[schema]
        if table in tables:
            tables[table]["df"] = content
        else:
            tables[table] = _new_entry(content)

        metadata = tables[table]["metadata"]
        metadata["status"] = "modified"
        metadata["qty_write"] += 1
        return self.infos(table, schema)

    def delete_table(self, table, schema):
        """Delete the table (not supported by this fake)."""
        raise NotImplementedError


def test_fakerepository():
    """Listing, reading and writing update the fake's metadata as expected."""
    fakerepository = FakeRepository(
        {
            "foo": {
                "table1": pd.DataFrame({"A": []}),
                "table2": pd.DataFrame({"B": []}),
            },
            "bar": {
                "table1": pd.DataFrame({"C": []}),
                "table2": pd.DataFrame({"D": []}),
            },
        }
    )

    assert fakerepository.schemas() == ["foo", "bar"]
    assert fakerepository.tables("foo") == ["table1", "table2"]
    assert fakerepository.infos("table1", "foo") == {
        "status": "new",
        "qty_read": 0,
        "qty_write": 0,
    }

    # A read returns the stored frame and bumps qty_read only.
    assert fakerepository.read("table1", "foo").equals(pd.DataFrame({"A": []}))
    assert fakerepository.infos("table1", "foo") == {
        "status": "new",
        "qty_read": 1,
        "qty_write": 0,
    }

    # A write flips the status and bumps qty_write, keeping qty_read.
    df = pd.DataFrame({"A": [1, 2]})
    assert fakerepository.write(df, "table1", "foo") == {
        "status": "modified",
        "qty_read": 1,
        "qty_write": 1,
    }


def test_consume_flux():
    """Consuming a flux writes the transformed source into the destination."""
    source_repository = FakeRepository(
        {
            "source": {
                "table1": pd.DataFrame({"A": [1, 2, 3]}),
            },
        }
    )
    dest_repository = FakeRepository(
        {
            "destination": {},
        }
    )
    repositories = {
        "source": source_repository,
        "dest": dest_repository,
    }

    def transformation(dfs):
        """Double the first source frame and route it to "dest"."""
        return {"dest": dfs[0] * 2}

    flux = Flux(
        sources=[{"repository": "source", "schema": "source", "table": "table1"}],
        destinations={
            "dest": {"repository": "dest", "schema": "destination", "table": "table1"}
        },
        transformation=transformation,
    )

    state = consume_flux(flux, repositories)

    # Checked before the read below, which would bump qty_read to 1.
    assert state.statuses["dest"] == {
        "status": "modified",
        "qty_read": 0,
        "qty_write": 1,
    }
    assert dest_repository.read("table1", "destination").equals(
        pd.DataFrame({"A": [2, 4, 6]})
    )