rialto.runner package

Submodules

rialto.runner.config_loader module

rialto.runner.config_loader.get_pipelines_config(path: str, overrides: Dict) → PipelinesConfig

Load and parse the YAML config, applying the given overrides

rialto.runner.config_overrides module

rialto.runner.config_overrides.override_config(config: Dict, overrides: Dict) → Dict

Override config with user input

Parameters:
  • config – config dictionary

  • overrides – dictionary of overrides

Returns:

Overridden config
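The exact override semantics are internal to rialto; the following is a minimal stdlib sketch of a recursive dictionary override consistent with the signature above. The config keys (`pipeline`, `schedule`, `frequency`, `target`) are hypothetical, chosen only for illustration.

```python
from copy import deepcopy
from typing import Dict


def override_config(config: Dict, overrides: Dict) -> Dict:
    """Illustrative sketch: recursively merge overrides into a copy of config."""
    result = deepcopy(config)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            # both sides are dicts: merge one level deeper
            result[key] = override_config(result[key], value)
        else:
            # scalar or type mismatch: the override wins
            result[key] = value
    return result


base = {"pipeline": {"schedule": {"frequency": "weekly"}, "target": "features"}}
patched = override_config(base, {"pipeline": {"schedule": {"frequency": "daily"}}})
# patched swaps the frequency while keeping "target"; base is left untouched
```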

rialto.runner.date_manager module

class rialto.runner.date_manager.DateManager

Bases: object

Date generation and shifts based on configuration

static all_dates(date_from: date, date_to: date) → List[date]

Get a list of all dates between date_from and date_to, inclusive

Parameters:
  • date_from – starting date

  • date_to – ending date

Returns:

List[date]
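The behavior described above can be sketched with the standard library alone (the real implementation may differ in detail):

```python
from datetime import date, timedelta
from typing import List


def all_dates(date_from: date, date_to: date) -> List[date]:
    """Sketch: every date from date_from to date_to, inclusive of both ends."""
    days = (date_to - date_from).days
    return [date_from + timedelta(days=i) for i in range(days + 1)]


all_dates(date(2024, 1, 30), date(2024, 2, 2))
# [date(2024, 1, 30), date(2024, 1, 31), date(2024, 2, 1), date(2024, 2, 2)]
```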

static date_subtract(run_date: date, units: str, value: int) → date

Generate a starting date from the given date and configuration

Parameters:
  • run_date – base date

  • units – units: years, months, weeks, days

  • value – number of units to subtract

Returns:

Starting date
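Subtracting whole months or years requires clamping the day to the length of the target month. The sketch below shows one way to implement the documented units with the standard library; rialto's actual arithmetic (e.g. via `dateutil`) may round differently at month ends.

```python
import calendar
from datetime import date, timedelta


def date_subtract(run_date: date, units: str, value: int) -> date:
    """Sketch: shift run_date backwards by value units (years/months/weeks/days)."""
    if units == "days":
        return run_date - timedelta(days=value)
    if units == "weeks":
        return run_date - timedelta(weeks=value)
    if units in ("months", "years"):
        months = value * 12 if units == "years" else value
        # move back whole months, clamping the day to the target month's length
        total = run_date.year * 12 + (run_date.month - 1) - months
        year, month = divmod(total, 12)
        month += 1
        day = min(run_date.day, calendar.monthrange(year, month)[1])
        return date(year, month, day)
    raise ValueError(f"unknown units: {units}")


date_subtract(date(2024, 3, 31), "months", 1)
# date(2024, 2, 29) -- day clamped to the end of February
```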

static run_dates(date_from: date, date_to: date, schedule: ScheduleConfig) → List[date]

Select dates within the given interval according to the configured frequency and selected day

Parameters:
  • date_from – interval start

  • date_to – interval end

  • schedule – schedule config

Returns:

list of dates

static str_to_date(str_date: str) → date

Convert a YYYY-MM-DD string to a date

Parameters:

str_date – string date

Returns:

date
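For a YYYY-MM-DD string this is a one-liner with the standard library, which is presumably close to what the method does:

```python
from datetime import date, datetime


def str_to_date(str_date: str) -> date:
    """Sketch: parse a YYYY-MM-DD string into a date."""
    return datetime.strptime(str_date, "%Y-%m-%d").date()


str_to_date("2024-07-01")
# date(2024, 7, 1)
```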

static to_info_date(date: date, schedule: ScheduleConfig) → date

Shift the given date according to the schedule config

Parameters:
  • date – input date

  • schedule – schedule config

Returns:

date

rialto.runner.runner module

class rialto.runner.runner.Runner(spark: SparkSession, config_path: str, run_date: str = None, rerun: bool = False, op: str = None, skip_dependencies: bool = False, overrides: Dict = None, merge_schema: bool = False)

Bases: object

A scheduler and dependency checker for feature runs

check_dates_have_partition(table: Table, dates: List[date]) → List[bool]

For a given list of dates, check whether each has a matching partition

Parameters:
  • table – Table object

  • dates – list of dates to check

Returns:

list of bool
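Conceptually this is a per-date membership test against the table's partition values. The sketch below replaces the `Table` argument with a plain set of partition dates for illustration; the real method reads the partitions from storage.

```python
from datetime import date
from typing import List, Set


def check_dates_have_partition(partitions: Set[date], dates: List[date]) -> List[bool]:
    """Sketch: for each requested date, report whether a partition exists."""
    return [d in partitions for d in dates]


parts = {date(2024, 1, 1), date(2024, 1, 8)}
check_dates_have_partition(parts, [date(2024, 1, 1), date(2024, 1, 2)])
# [True, False]
```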

check_dependencies(pipeline: PipelineConfig, run_date: date) → bool

Check whether all dependencies in the config have available partitions for the given run date

Parameters:
  • pipeline – configuration

  • run_date – run date

Returns:

bool

rialto.runner.table module

class rialto.runner.table.Table(catalog: str = None, schema: str = None, table: str = None, schema_path: str = None, table_path: str = None, class_name: str = None, partition: str = None)

Bases: object

Handler for Databricks catalog paths

get_schema_path()

Get the path of the table's schema

get_table_path()

Get the full table path
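A minimal stand-in showing the likely shape of these accessors, assuming the usual three-level Databricks naming (`catalog.schema.table`). The dotted format here is an assumption, not confirmed by the source:

```python
class Table:
    """Illustrative stand-in for rialto.runner.table.Table (not the real class)."""

    def __init__(self, catalog=None, schema=None, table=None):
        self.catalog = catalog
        self.schema = schema
        self.table = table

    def get_schema_path(self):
        # assumed format: "<catalog>.<schema>"
        return f"{self.catalog}.{self.schema}"

    def get_table_path(self):
        # assumed format: "<catalog>.<schema>.<table>"
        return f"{self.catalog}.{self.schema}.{self.table}"


Table(catalog="prod", schema="features", table="loans").get_table_path()
# "prod.features.loans"
```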

rialto.runner.transformation module

class rialto.runner.transformation.Transformation

Bases: object

Interface for feature implementation

abstractmethod run(reader: DataReader, run_date: date, spark: SparkSession = None, config: PipelineConfig = None, metadata_manager: MetadataManager = None, feature_loader: PysparkFeatureLoader = None) → DataFrame

Run the transformation

Parameters:
  • reader – data store API object

  • run_date – date of the run

  • spark – Spark session

  • config – pipeline config

  • metadata_manager – metadata manager

  • feature_loader – feature loader

Returns:

dataframe
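A feature group is implemented by subclassing this interface and overriding `run`. The sketch below stubs `Transformation` with an equivalent abstract base class (the real import is `rialto.runner.transformation.Transformation`), and `MyFeatureGroup` is a hypothetical implementation that returns a placeholder instead of a Spark DataFrame to stay self-contained:

```python
from abc import ABC, abstractmethod
from datetime import date


class Transformation(ABC):
    """Stub mirroring rialto.runner.transformation.Transformation for illustration."""

    @abstractmethod
    def run(self, reader, run_date: date, spark=None, config=None,
            metadata_manager=None, feature_loader=None):
        """Return the features computed for run_date."""


class MyFeatureGroup(Transformation):
    """Hypothetical feature group implementing the interface."""

    def run(self, reader, run_date, spark=None, config=None,
            metadata_manager=None, feature_loader=None):
        # in a real pipeline: read source tables via reader and return a DataFrame;
        # here a plain dict keeps the sketch runnable without Spark
        return {"run_date": run_date}


MyFeatureGroup().run(reader=None, run_date=date(2024, 7, 1))
# {"run_date": date(2024, 7, 1)}
```

Because `run` is abstract, instantiating `Transformation` directly raises `TypeError`; the runner only ever works with concrete subclasses.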

rialto.runner.utils module

rialto.runner.utils.find_dependency(config: PipelineConfig, name: str)

Get a dependency from the config by name

Parameters:
  • config – Pipeline configuration

  • name – Dependency name

Returns:

Dependency object

rialto.runner.utils.get_partitions(reader: DataReader, table: Table) → List[date]

Get partition values

Parameters:
  • reader – data reader object

  • table – Table object

Returns:

List of partition values

rialto.runner.utils.init_tools(spark: SparkSession, pipeline: PipelineConfig) → Tuple[MetadataManager, PysparkFeatureLoader]

Initialize metadata manager and feature loader

Parameters:
  • spark – Spark session

  • pipeline – Pipeline configuration

Returns:

MetadataManager and PysparkFeatureLoader

rialto.runner.utils.load_module(cfg: ModuleConfig) → Transformation

Load a feature group from its module configuration

Parameters:

cfg – Feature configuration

Returns:

Transformation object

rialto.runner.utils.table_exists(spark: SparkSession, table: str) → bool

Check whether a table exists in the Spark catalog

Parameters:

table – full table path

Returns:

bool

Module contents