rialto.runner package

Submodules

rialto.runner.config_loader module

rialto.runner.config_loader.get_pipelines_config(path: str, overrides: Dict) → PipelinesConfig[source]

Load and parse the YAML config at the given path
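
Example (a minimal sketch; the config path and the empty overrides dict are illustrative):

    from rialto.runner.config_loader import get_pipelines_config

    # Parse the pipelines YAML; overrides is a dict of config values to patch in
    config = get_pipelines_config("config/pipelines.yaml", {})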

rialto.runner.date_manager module

class rialto.runner.date_manager.DateManager[source]

Bases: object

Date generation and shifts based on configuration

static all_dates(date_from: date, date_to: date) → List[date][source]

Get a list of all dates between the two given dates, inclusive

Parameters:
  • date_from – starting date

  • date_to – ending date

Returns:

List[date]
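
Example (inclusive on both ends, as described above):

    from datetime import date

    from rialto.runner.date_manager import DateManager

    dates = DateManager.all_dates(date(2023, 1, 30), date(2023, 2, 2))
    # [date(2023, 1, 30), date(2023, 1, 31), date(2023, 2, 1), date(2023, 2, 2)]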

static date_subtract(run_date: date, units: str, value: int) → date[source]

Generate a starting date by subtracting the given number of units from a base date

Parameters:
  • run_date – base date

  • units – one of: years, months, weeks, days

  • value – number of units to subtract

Returns:

Starting date
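
Example (a minimal sketch):

    from datetime import date

    from rialto.runner.date_manager import DateManager

    # One month before the base date
    start = DateManager.date_subtract(date(2023, 3, 15), units="months", value=1)
    # date(2023, 2, 15)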

static run_dates(date_from: date, date_to: date, schedule: ScheduleConfig) → List[date][source]

Select dates inside the given interval according to the configured frequency and selected day

Parameters:
  • date_from – interval start

  • date_to – interval end

  • schedule – schedule config

Returns:

list of dates
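
Example (a sketch; ScheduleConfig instances normally come from the parsed pipelines config, and the attribute path used to fetch one below is an assumption, not confirmed by this reference):

    from datetime import date

    from rialto.runner.config_loader import get_pipelines_config
    from rialto.runner.date_manager import DateManager

    config = get_pipelines_config("config/pipelines.yaml", {})
    schedule = config.pipelines[0].schedule  # assumed accessor, for illustration

    runs = DateManager.run_dates(date(2023, 1, 1), date(2023, 3, 31), schedule)
    # dates within Q1 2023 that match the configured frequency and day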

static str_to_date(str_date: str) → date[source]

Convert a YYYY-MM-DD string to a date

Parameters:

str_date – date as a YYYY-MM-DD string

Returns:

date
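
Example:

    from rialto.runner.date_manager import DateManager

    d = DateManager.str_to_date("2023-01-31")
    # date(2023, 1, 31)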

static to_info_date(date: date, schedule: ScheduleConfig) → date[source]

Shift the given date to its information date according to the schedule config

Parameters:
  • date – input date

  • schedule – schedule config

Returns:

date
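
Example (continuing the run_dates sketch above; schedule and runs are the same assumed objects):

    from rialto.runner.date_manager import DateManager

    # Shift each selected run date to its information date
    info_dates = [DateManager.to_info_date(d, schedule) for d in runs]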

rialto.runner.runner module

class rialto.runner.runner.Runner(spark: SparkSession, config_path: str, run_date: str | None = None, rerun: bool = False, op: str | None = None, skip_dependencies: bool = False, overrides: Dict | None = None)[source]

Bases: object

A scheduler and dependency checker for feature runs
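
Example (a minimal sketch; the config path and run date are illustrative):

    from pyspark.sql import SparkSession

    from rialto.runner.runner import Runner

    spark = SparkSession.builder.getOrCreate()
    runner = Runner(spark, config_path="config/pipelines.yaml", run_date="2023-03-31")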

check_dates_have_partition(table: Table, dates: List[date]) → List[bool][source]

For a given list of dates, check whether the table has a matching partition for each

Parameters:
  • table – Table object

  • dates – list of dates to check

Returns:

list of bool, one per input date
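
Example (continuing the Runner sketch above; the Table values are illustrative):

    from datetime import date

    from rialto.runner.table import Table

    tbl = Table(catalog="prod", schema="features", table="transactions",
                partition="information_date")
    present = runner.check_dates_have_partition(
        tbl, [date(2023, 1, 31), date(2023, 2, 28)]
    )
    # e.g. [True, False]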

check_dependencies(pipeline: PipelineConfig, run_date: date) → bool[source]

Check whether all dependencies in the config have available partitions

Parameters:
  • pipeline – configuration

  • run_date – run date

Returns:

bool
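
Example (continuing the Runner sketch above; the attribute path into the parsed PipelinesConfig is an assumption):

    from datetime import date

    from rialto.runner.config_loader import get_pipelines_config

    config = get_pipelines_config("config/pipelines.yaml", {})
    pipeline = config.pipelines[0]  # assumed accessor, for illustration
    ready = runner.check_dependencies(pipeline, date(2023, 3, 31))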

rialto.runner.table module

class rialto.runner.table.Table(catalog: str | None = None, schema: str | None = None, table: str | None = None, schema_path: str | None = None, table_path: str | None = None, class_name: str | None = None, partition: str | None = None)[source]

Bases: object

Handler for Databricks catalog paths

get_schema_path()[source]

Get path of table’s schema

get_table_path()[source]

Get full table path
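
Example (a sketch; the dot-joined, Unity-Catalog-style return values are an assumption based on the class description):

    from rialto.runner.table import Table

    tbl = Table(catalog="prod", schema="features", table="transactions")
    tbl.get_schema_path()  # e.g. "prod.features"
    tbl.get_table_path()   # e.g. "prod.features.transactions"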

rialto.runner.tracker module

class rialto.runner.tracker.Record(job: str, target: str, date: date, time: timedelta, records: int, status: str, reason: str, exception: str | None = None)[source]

Bases: object

Dataclass with information about one run of one pipeline.

date: date
exception: str | None = None
job: str
reason: str
records: int
status: str
target: str
time: timedelta

class rialto.runner.tracker.Tracker[source]

Bases: object

Collect information about runs and send it out via email

add(record: Record) → None[source]

Add record for one run

report(mail_cfg: MailConfig)[source]

Create and send an HTML report
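
Example (the record values and status string are illustrative; mail_cfg would be a MailConfig taken from the loaded configuration):

    from datetime import date, timedelta

    from rialto.runner.tracker import Record, Tracker

    tracker = Tracker()
    tracker.add(
        Record(
            job="transaction_features",
            target="prod.features.transactions",
            date=date(2023, 3, 31),
            time=timedelta(minutes=4),
            records=120000,
            status="Success",
            reason="",
        )
    )
    # tracker.report(mail_cfg)  # send the HTML report once all runs are recorded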

rialto.runner.transformation module

class rialto.runner.transformation.Transformation[source]

Bases: object

Interface for feature implementation

abstract run(reader: DataReader, run_date: date, spark: SparkSession | None = None, config: PipelineConfig | None = None, metadata_manager: MetadataManager | None = None, feature_loader: PysparkFeatureLoader | None = None) → DataFrame[source]

Run the transformation

Parameters:
  • reader – data store API object

  • run_date – date of the run

  • spark – spark session

  • config – pipeline config

  • metadata_manager – metadata manager

  • feature_loader – feature loader

Returns:

DataFrame with the transformation result
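
Example (a sketch of a concrete implementation; the reader method name, its table argument, and the column name are assumptions, not confirmed by this reference):

    from datetime import date

    from pyspark.sql import DataFrame

    from rialto.runner.transformation import Transformation


    class ExampleFeatures(Transformation):
        """Illustrative feature implementation."""

        def run(self, reader, run_date: date, spark=None, config=None,
                metadata_manager=None, feature_loader=None) -> DataFrame:
            # Read the source through the data store API (method name assumed)
            df = reader.get_table("catalog.schema.source_table")
            return df.filter(df.information_date <= run_date)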

Module contents