plesna/tests/test_flux.py

132 lines
4.0 KiB
Python
Raw Permalink Normal View History

2024-08-14 05:22:01 +00:00
import pandas as pd
import pytest
2024-08-14 05:41:36 +00:00
from dashboard.libs.flux.flux import Flux, consume_flux
2024-08-14 05:22:01 +00:00
from dashboard.libs.repository.repository import AbstractRepository
FakeTable = pd.DataFrame
FakeSchema = dict[str, pd.DataFrame]
FakeSchemas = dict[str, FakeSchema]
class FakeRepository(AbstractRepository):
def __init__(self, schemas: FakeSchemas):
self._schemas = {}
for schema_name, tables in schemas.items():
schema = {}
for table, df in tables.items():
schema[table] = {
"df": df,
"metadata": {
"status": "new",
"qty_read": 0,
"qty_write": 0,
},
}
self._schemas[schema_name] = schema
def schemas(self):
"""List schemas"""
return list(self._schemas.keys())
def tables(self, schema):
"""List table's name in schema"""
return list(self._schemas[schema].keys())
def infos(self, table: str, schema: str) -> dict[str, str]:
"""Get infos about the table"""
return self._schemas[schema][table]["metadata"]
def read(self, table, schema) -> pd.DataFrame:
"""Get content of the table"""
self._schemas[schema][table]["metadata"]["qty_read"] += 1
return self._schemas[schema][table]["df"]
def write(self, content, table, schema) -> dict[str, str]:
"""Write content into the table"""
2024-08-14 05:41:36 +00:00
try:
self._schemas[schema][table]["df"] = content
except KeyError:
self._schemas[schema][table] = {
"df": content,
"metadata": {
"status": "new",
"qty_read": 0,
"qty_write": 0,
},
}
2024-08-14 05:22:01 +00:00
self._schemas[schema][table]["metadata"]["status"] = "modified"
self._schemas[schema][table]["metadata"]["qty_write"] += 1
return self.infos(table, schema)
def delete_table(self, table, schema):
"""Delete the table"""
raise NotImplementedError
def test_fakerepository():
fakerepository = FakeRepository(
{
"foo": {
"table1": pd.DataFrame({"A": []}),
"table2": pd.DataFrame({"B": []}),
},
"bar": {
"table1": pd.DataFrame({"C": []}),
"table2": pd.DataFrame({"D": []}),
},
}
)
assert fakerepository.schemas() == ["foo", "bar"]
assert fakerepository.tables("foo") == ["table1", "table2"]
assert fakerepository.infos("table1", "foo") == {
"status": "new",
"qty_read": 0,
"qty_write": 0,
}
assert fakerepository.read("table1", "foo").equals(pd.DataFrame({"A": []}))
assert fakerepository.infos("table1", "foo") == {
"status": "new",
"qty_read": 1,
"qty_write": 0,
}
df = pd.DataFrame({"A": [1, 2]})
assert fakerepository.write(df, "table1", "foo") == {
"status": "modified",
"qty_read": 1,
"qty_write": 1,
}
2024-08-14 05:41:36 +00:00
def test_consume_flux():
source_repository = FakeRepository(
{
"source": {
"table1": pd.DataFrame({"A": [1, 2, 3]}),
},
}
)
dest_repository = FakeRepository(
{
"destination": {},
}
)
repositories = {
"source": source_repository,
"dest": dest_repository,
}
transformation = lambda dfs: {"dest": dfs[0] * 2}
flux = Flux(
sources=[{"repository": "source", "schema": "source", "table": "table1"}],
destinations={
"dest": {"repository": "dest", "schema": "destination", "table": "table1"}
},
transformation=transformation,
)
state = consume_flux(flux, repositories)
assert state.statuses["dest"] == {'status': 'modified', 'qty_read': 0, 'qty_write': 1}
assert dest_repository.read("table1", "destination").equals(pd.DataFrame({"A": [2, 4, 6]}))