rialto.runner.services package

Submodules

rialto.runner.services.config_loader module

class rialto.runner.services.config_loader.ConfigLoader

Bases: object

Loader for pipelines config

static load_yaml(path: str, overrides: Dict) → PipelinesConfig

Load YAML config from the given path and apply overrides

rialto.runner.services.config_overrides module

rialto.runner.services.config_overrides.override_config(config: Dict, overrides: Dict) → Dict

Override config with user input

Parameters:
  • config – config dictionary

  • overrides – dictionary of overrides

Returns:

Overridden config
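The override key syntax is not described in this reference. As a rough illustration only, assuming overrides are keyed by dot-separated paths into nested dictionaries (an assumption, not necessarily the format rialto uses), the idea can be sketched with a hypothetical helper named apply_overrides. The config keys "runner" and "retries" are made up for the example.

```python
import copy
from typing import Any, Dict


def apply_overrides(config: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for override_config; dot-separated keys are an assumption."""
    result = copy.deepcopy(config)  # leave the caller's config untouched
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        node = result
        for key in parents:
            # Create intermediate dictionaries when the path does not exist yet
            node = node.setdefault(key, {})
        node[leaf] = value
    return result


# Hypothetical config keys, used only for illustration
base = {"runner": {"retries": 1}}
updated = apply_overrides(base, {"runner.retries": 3})
```

Returning a modified copy keeps the original dictionary reusable across runs; whether override_config itself copies or mutates its input is not stated here.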

rialto.runner.services.data_checker module

class rialto.runner.services.data_checker.DataChecker(reader: DataReader)

Bases: object

Checks if data for given date or date range is present in storage

check_date(target: Table, partition_date: date) → bool

Check if data for given date is present in target

Parameters:
  • target – target Table to check

  • partition_date – Date to check

Returns:

True if data for given date is present, False otherwise

check_range(target: Table, start_date: date, end_date: date) → bool

Check if data for given date range is present in target

Parameters:
  • target – target Table to check

  • start_date – Starting date of the range to check

  • end_date – Ending date of the range to check

Returns:

True if data for given date range is present, False otherwise

check_written(target: Table, partition_date: date, df: DataFrame) → int

Check how many records were written

Parameters:
  • target – target Table to check

  • partition_date – Date to check

  • df – DataFrame that was written, used to determine filters if not provided in config

Returns:

Number of records for given date

rialto.runner.services.date_manager module

class rialto.runner.services.date_manager.DateManager(config: RunnerConfig, run_date: str = None)

Bases: object

Date generation and shifts based on configuration

static all_dates(date_from: date, date_until: date) → List[date]

Get list of all dates between date_from and date_until, inclusive

Parameters:
  • date_from – starting date

  • date_until – ending date

Returns:

List[date]
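A minimal standard-library sketch of an inclusive date range, assuming date_from is not later than date_until. The function name all_dates_sketch is hypothetical, and how the real method treats a reversed range is not documented here.

```python
from datetime import date, timedelta
from typing import List


def all_dates_sketch(date_from: date, date_until: date) -> List[date]:
    """Return every date from date_from to date_until, both ends included."""
    n_days = (date_until - date_from).days
    # range(n_days + 1) makes the end date part of the result
    return [date_from + timedelta(days=i) for i in range(n_days + 1)]


dates = all_dates_sketch(date(2024, 2, 27), date(2024, 3, 1))
```

In this sketch a reversed range simply yields an empty list.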

static date_subtract(input_date: date, units: str, value: int) → date

Subtract given number of units from input date

Parameters:
  • input_date – base date

  • units – units: years, months, weeks, days

  • value – number of units to subtract

Returns:

Starting date, i.e. the input date shifted back by the given number of units
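The unit handling can be sketched with the standard library alone. The version below is an illustration, not the library's implementation: the name date_subtract_sketch is hypothetical, and clamping the day to the end of a shorter month is an assumed behaviour.

```python
import calendar
from datetime import date, timedelta


def date_subtract_sketch(input_date: date, units: str, value: int) -> date:
    """Shift input_date back by `value` units (years, months, weeks, days)."""
    if units == "days":
        return input_date - timedelta(days=value)
    if units == "weeks":
        return input_date - timedelta(weeks=value)
    if units in ("months", "years"):
        months = value * 12 if units == "years" else value
        # Count months from year 0 so the subtraction can cross year boundaries
        total = input_date.year * 12 + (input_date.month - 1) - months
        year, month = divmod(total, 12)
        month += 1
        # Clamp the day to the last valid day of the target month (assumed behaviour)
        last_day = calendar.monthrange(year, month)[1]
        return input_date.replace(year=year, month=month, day=min(input_date.day, last_day))
    raise ValueError(f"Unknown time unit: {units}")


window_start = date_subtract_sketch(date(2024, 3, 31), "months", 1)
```

Days and weeks map directly onto timedelta; months and years need calendar arithmetic because their lengths vary.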

get_date_from() → date

Get starting date of the execution window

get_date_until() → date

Get ending date of the execution window

get_execution_and_partition_dates(schedule: ScheduleConfig) → List[tuple[date, date]]

Get list of execution and partition dates for given configuration

Returns:

List of tuples with execution and partition dates

static str_to_date(str_date: str) → date

Convert YYYY-MM-DD string to date

Parameters:

str_date – string date

Returns:

date
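For reference, a YYYY-MM-DD string can be parsed with the standard library as below. This is a sketch of the conversion, not necessarily how the method is implemented, and the name str_to_date_sketch is hypothetical.

```python
from datetime import date, datetime


def str_to_date_sketch(str_date: str) -> date:
    """Parse a YYYY-MM-DD string; strptime raises ValueError on other formats."""
    return datetime.strptime(str_date, "%Y-%m-%d").date()


run_date = str_to_date_sketch("2024-01-31")
```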

rialto.runner.services.executor module

class rialto.runner.services.executor.PipelineExecutor(spark: SparkSession, reader: DataReader, checker: DataChecker)

Bases: object

Executes a single pipeline task.

execute(pipeline: PipelineTask) → DataFrame

Execute the pipeline task.

Parameters:

pipeline – PipelineTask object to execute.

Returns:

DataFrame resulting from pipeline execution.

rialto.runner.services.result_mapper module

class rialto.runner.services.result_mapper.TaskResultMapper

Bases: object

Maps task execution outcomes to Record objects with consistent schema

static already_complete(task: PipelineTask, run_start: datetime) → Record

Map skipped (already complete) task to Record

static dependencies_incomplete(task: PipelineTask, run_start: datetime, failed_deps: list) → Record

Map dependency failure to Record

static exception(task: PipelineTask, run_start: datetime, exception_message: str, traceback_str: str) → Record

Map exception during execution to Record

static interrupted(task: PipelineTask, run_start: datetime) → Record

Map keyboard interrupt to Record

static success(task: PipelineTask, run_start: datetime, records_count: int) → Record

Map successful task execution to Record

rialto.runner.services.table module

class rialto.runner.services.table.Table(catalog: str = None, schema: str = None, table: str = None, schema_path: str = None, table_path: str = None, class_name: str = None, partition: str = None, secondary_partitions: List[str] = None, filters: Dict = None)

Bases: object

Handler for Databricks catalog paths

classmethod from_dependency_config(config: DependencyConfig) → Table

Create table object from pipeline config dependency section

Parameters:

config – Dependency configuration

Returns:

Table object

classmethod from_target_config(config: PipelineConfig) → Table

Create table object from pipeline config target section

Parameters:

config – Pipeline configuration

Returns:

Table object

get_all_partition_columns() → List[str]

Get list of all partition columns

get_schema_path() → str

Get path of table’s schema

get_table_path() → str

Get full table path
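How the two paths relate can be illustrated with a small stand-in class. The sketch assumes the usual Databricks three-level naming (catalog.schema.table); the class TableSketch is hypothetical and omits the partition and filter fields of the real Table.

```python
from dataclasses import dataclass


@dataclass
class TableSketch:
    """Hypothetical stand-in for Table, limited to the path-related fields."""

    catalog: str
    schema: str
    table: str

    def get_schema_path(self) -> str:
        # Assumed format: catalog and schema joined by a dot
        return f"{self.catalog}.{self.schema}"

    def get_table_path(self) -> str:
        # Assumed format: schema path followed by the table name
        return f"{self.get_schema_path()}.{self.table}"


# Made-up names, used only for illustration
example = TableSketch(catalog="main", schema="features", table="daily_agg")
```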

rialto.runner.services.task_registry module

class rialto.runner.services.task_registry.PipelineDependency(table: Table, date_from: date, date_until: date, complete: bool = False)

Bases: object

Class representing a pipeline dependency, with associated table and date range for checking completion

complete: bool = False
date_from: date
date_until: date
table: Table

class rialto.runner.services.task_registry.PipelineTask(name: str, execution_date: date, partition_date: date, config: PipelineConfig, target: Table, dependencies: List[PipelineDependency] = <factory>, completion: bool = False, dependencies_complete: bool = False, precheck_failed: bool = False, error: str | None = None, error_trace: str | None = None, result: str = '')

Bases: object

Class representing a pipeline to be executed.

completion: bool = False
config: PipelineConfig
dependencies: List[PipelineDependency]
dependencies_complete: bool = False
error: str | None = None
error_trace: str | None = None
execution_date: date
name: str
partition_date: date
precheck_failed: bool = False
result: str = ''
target: Table

class rialto.runner.services.task_registry.TaskRegistry(spark: SparkSession, date_manager: DateManager)

Bases: object

Registry for pipeline tasks to be executed

add_task(name: str, execution_date: date, partition_date: date, config: PipelineConfig) → None

Add task to registry

Parameters:
  • name – Name of the pipeline

  • execution_date – Date when the pipeline is scheduled to run

  • partition_date – Date for which the pipeline is processing data

  • config – PipelineConfig object with pipeline configuration

Returns:

None, adds a PipelineTask object to self.tasks

log_status() → None

Log status of all tasks in registry, showing completion and dependency status

rialto.runner.services.task_status_checker module

class rialto.runner.services.task_status_checker.TaskStatusChecker(checker: DataChecker)

Bases: object

Handles completion and dependency checks for pipeline tasks.

check_completion(pipeline: PipelineTask) → None

Check if pipeline is complete by checking if target data exists for partition date

Parameters:

pipeline – PipelineTask object for which to check completion

Returns:

None, updates the completion attribute of the given pipeline

check_pipeline_dependencies(pipeline: PipelineTask) → None

Check if dependencies are complete by checking if data exists for each dependency in date range

Parameters:

pipeline – PipelineTask object for which to check dependencies

Returns:

None, updates the dependencies_complete attribute of the given pipeline
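Both checks return None and record their outcome on the task passed in. The sketch below illustrates that pattern with simplified stand-ins: tables are plain strings, FakeChecker replaces DataChecker, and every class name ending in Sketch is hypothetical. Treating a range as present when any date inside it has data is an assumption made for the example.

```python
from dataclasses import dataclass, field
from datetime import date
from typing import List, Set, Tuple


@dataclass
class DependencySketch:
    table: str
    date_from: date
    date_until: date
    complete: bool = False


@dataclass
class TaskSketch:
    target: str
    partition_date: date
    dependencies: List[DependencySketch] = field(default_factory=list)
    completion: bool = False
    dependencies_complete: bool = False


class FakeChecker:
    """In-memory stand-in for DataChecker: knows which (table, date) pairs exist."""

    def __init__(self, present: Set[Tuple[str, date]]):
        self.present = present

    def check_date(self, target: str, partition_date: date) -> bool:
        return (target, partition_date) in self.present

    def check_range(self, target: str, start_date: date, end_date: date) -> bool:
        # Assumption: a range counts as present if any date inside it has data
        return any(t == target and start_date <= d <= end_date for t, d in self.present)


class StatusCheckerSketch:
    def __init__(self, checker: FakeChecker):
        self.checker = checker

    def check_completion(self, pipeline: TaskSketch) -> None:
        # The result is stored on the task, not returned
        pipeline.completion = self.checker.check_date(pipeline.target, pipeline.partition_date)

    def check_pipeline_dependencies(self, pipeline: TaskSketch) -> None:
        for dep in pipeline.dependencies:
            dep.complete = self.checker.check_range(dep.table, dep.date_from, dep.date_until)
        pipeline.dependencies_complete = all(dep.complete for dep in pipeline.dependencies)


checker = FakeChecker({("raw.events", date(2024, 1, 10))})
task = TaskSketch(
    target="features.daily",
    partition_date=date(2024, 1, 15),
    dependencies=[DependencySketch("raw.events", date(2024, 1, 8), date(2024, 1, 15))],
)
status = StatusCheckerSketch(checker)
status.check_completion(task)
status.check_pipeline_dependencies(task)
```

After these calls the task carries both flags, so a caller can decide whether to skip it, run it, or report missing dependencies without querying storage again.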

rialto.runner.services.writer module

class rialto.runner.services.writer.DatabricksWriter(spark: SparkSession, merge_schema=False)

Bases: Writer

Supporting class for the runner that handles Databricks write operations

write(df: DataFrame, partition_date: date, table: Table) → None

Write dataframe to storage

Parameters:
  • df – dataframe to write

  • partition_date – date of the partition to write

  • table – target Table to write to

Returns:

None

class rialto.runner.services.writer.Writer

Bases: ABC

Supporting class for runner

abstractmethod write(df: DataFrame, info_date: date, table: Table) → None

Write dataframe to storage

Parameters:
  • df – dataframe to write

  • info_date – date of the partition to write

  • table – target Table to write to

Returns:

None
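Because Writer is abstract, a custom backend would subclass it and implement write. The sketch below mirrors that shape without importing rialto or Spark: WriterSketch stands in for Writer, a list of dictionaries stands in for a DataFrame, a string stands in for Table, and InMemoryWriter is a hypothetical subclass of the kind that might be useful in tests.

```python
from abc import ABC, abstractmethod
from datetime import date
from typing import Any, Dict, List, Tuple

Rows = List[Dict[str, Any]]


class WriterSketch(ABC):
    """Stand-in for the abstract Writer; DataFrame and Table are replaced by plain types."""

    @abstractmethod
    def write(self, df: Rows, info_date: date, table: str) -> None:
        ...


class InMemoryWriter(WriterSketch):
    """Hypothetical writer that keeps rows in a dict instead of touching storage."""

    def __init__(self) -> None:
        self.storage: Dict[Tuple[str, date], Rows] = {}

    def write(self, df: Rows, info_date: date, table: str) -> None:
        # Overwrite the partition for this date, mimicking a partitioned write
        self.storage[(table, info_date)] = list(df)


writer = InMemoryWriter()
writer.write([{"id": 1}], date(2024, 1, 31), "main.features.daily_agg")
```

Instantiating WriterSketch directly raises TypeError, which is the standard ABC guard against using a class with unimplemented abstract methods.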

Module contents