71 lines
1.6 KiB
Python
71 lines
1.6 KiB
Python
from collections.abc import Callable
|
|
from datetime import datetime
|
|
|
|
import pandas as pd
|
|
from pydantic import BaseModel
|
|
|
|
from ..repository.repository import AbstractRepository
|
|
|
|
|
|
class Schema(BaseModel):
    """Reference to a schema that lives in a named repository."""

    # Repository name (presumably a key of the `Repositories` mapping -- confirm).
    repository: str
    # Schema name inside that repository.
    # NOTE(review): `schema` is also the name of a BaseModel attribute; depending
    # on the pydantic version this shadowing warns or is rejected -- confirm.
    schema: str
|
|
|
|
|
|
class Table(BaseModel):
    """Reference to a table, addressed by repository, schema and table name."""

    # Key used to look the repository up in a `Repositories` mapping.
    repository: str
    # Schema name handed to the repository's read/write calls.
    # NOTE(review): `schema` is also the name of a BaseModel attribute; depending
    # on the pydantic version this shadowing warns or is rejected -- confirm.
    schema: str
    # Table name handed to the repository's read/write calls.
    table: str
|
|
|
|
|
|
class Flux(BaseModel):
    """Describe one data flow: input tables, a transformation, output tables."""

    # Tables to read; `consume_flux` loads them in this order and passes the
    # resulting list of frames to `transformation`.
    sources: list[Table]
    # Output tables, keyed by the same names `transformation` uses as keys of
    # the mapping it returns.
    destinations: dict[str, Table]
    # Takes the loaded source frames and returns the frames to write, keyed by
    # destination name.
    transformation: Callable[[list[pd.DataFrame]], dict[str, pd.DataFrame]]
|
|
|
|
|
|
class State(BaseModel):
    """Report produced by `consume_flux` after running a flux."""

    # Per-destination result of the write call, keyed by destination name.
    # NOTE(review): values are annotated `dict` here while `write_source` is
    # annotated as returning `str` -- confirm which one is intended.
    statuses: dict[str, dict]
    # `consume_flux` currently always reports 0 here.
    qty_out: int
    # `consume_flux` currently always reports an empty list here.
    failed_lines: list[str]
    # Timestamp taken before the sources are read.
    start: datetime
    # Timestamp taken after all destinations have been written.
    end: datetime
|
|
|
|
|
|
# Mapping from repository name (the `repository` field of a `Table`) to the
# repository object used to read from / write to it.
Repositories = dict[str, AbstractRepository]
|
|
|
|
|
|
def open_source(repositories: Repositories, source: Table) -> pd.DataFrame:
    """Read the table described by *source* from its repository.

    Args:
        repositories: Available repositories, keyed by name.
        source: Table reference; its ``repository`` field selects the
            repository, and its ``table`` and ``schema`` fields are passed to
            that repository's ``read`` method, in that order.

    Returns:
        Whatever the repository's ``read`` returns (annotated as a DataFrame).

    Raises:
        KeyError: If ``source.repository`` is not a key of *repositories*.
    """
    repository = repositories[source.repository]
    return repository.read(source.table, source.schema)
|
|
|
|
|
|
def write_source(
    content: pd.DataFrame, repositories: Repositories, destination: Table
) -> str:
    """Write *content* to the table described by *destination*.

    Args:
        content: Frame to write.
        repositories: Available repositories, keyed by name.
        destination: Table reference; its ``repository`` field selects the
            repository, and its ``table`` and ``schema`` fields are passed to
            that repository's ``write`` method after *content*.

    Returns:
        Whatever the repository's ``write`` returns (annotated as ``str``).

    Raises:
        KeyError: If ``destination.repository`` is not a key of *repositories*.
    """
    repository = repositories[destination.repository]
    outcome = repository.write(content, destination.table, destination.schema)
    return outcome
|
|
|
|
|
|
def consume_flux(flux: Flux, repositories: dict[str, AbstractRepository]) -> State:
    """Run *flux*: read its sources, transform them, write the results.

    Args:
        flux: Flow description (sources, transformation, destinations).
        repositories: Available repositories, keyed by name.

    Returns:
        A ``State`` holding the per-destination write results, the start and
        end timestamps, and placeholder values for ``qty_out`` (0) and
        ``failed_lines`` (empty list), which are not computed yet.

    Raises:
        KeyError: If the transformation returns a key that is not declared in
            ``flux.destinations``, or if a table references a repository name
            missing from *repositories*.
    """
    start = datetime.now()
    src_dfs = [open_source(repositories, source) for source in flux.sources]

    built_dfs = flux.transformation(src_dfs)

    # Check every output key before writing anything, so an undeclared
    # destination fails up front instead of after some tables were written.
    undeclared = [dest for dest in built_dfs if dest not in flux.destinations]
    if undeclared:
        raise KeyError(
            f"transformation returned outputs with no declared destination: {undeclared}"
        )

    statuses = {
        dest: write_source(df, repositories, flux.destinations[dest])
        for dest, df in built_dfs.items()
    }

    end = datetime.now()
    return State(
        statuses=statuses,
        qty_out=0,  # placeholder: output quantity is not computed yet
        failed_lines=[],  # placeholder: failed lines are not collected yet
        start=start,
        end=end,
    )
|