rialto.runner.services package
Submodules
rialto.runner.services.config_loader module
rialto.runner.services.config_overrides module
rialto.runner.services.data_checker module
- class rialto.runner.services.data_checker.DataChecker(reader: DataReader)
Bases: object
Checks if data for given date or date range is present in storage
- check_date(target: Table, partition_date: date) -> bool
Check if data for given date is present in target
- Parameters:
target – target Table to check
partition_date – Date to check
- Returns:
True if data for given date is present, False otherwise
- check_range(target: Table, start_date: date, end_date: date) -> bool
Check if data for given date range is present in target
- Parameters:
target – target Table to check
start_date – Starting date of the range to check
end_date – Ending date of the range to check
- Returns:
True if data for given date range is present, False otherwise
- check_written(target: Table, partition_date: date, df: DataFrame) -> int
Check how many records were written
- Parameters:
target – target Table to check
partition_date – Date to check
df – DataFrame that was written, used to determine filters if not provided in config
- Returns:
Number of records for given date
rialto.runner.services.date_manager module
- class rialto.runner.services.date_manager.DateManager(config: RunnerConfig, run_date: str = None)
Bases: object
Date generation and shifts based on configuration
- static all_dates(date_from: date, date_until: date) -> List[date]
Get a list of all dates between date_from and date_until, inclusive
- Parameters:
date_from – starting date
date_until – ending date
- Returns:
List[date]
- static date_subtract(input_date: date, units: str, value: int) -> date
Subtract given number of units from input date
- Parameters:
input_date – base date
units – units: years, months, weeks, days
value – number of units to subtract
- Returns:
Starting date
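The behaviour documented for all_dates and date_subtract can be approximated with the standard library alone. The following is a minimal sketch, not rialto's implementation: the function bodies are assumptions, and in particular the day clamping used for month and year shifts (31 March minus one month giving the last day of February) is one plausible choice that the documentation does not confirm.

```python
import calendar
from datetime import date, timedelta
from typing import List


def all_dates(date_from: date, date_until: date) -> List[date]:
    """Every date from date_from to date_until, both ends included."""
    days = (date_until - date_from).days
    return [date_from + timedelta(days=i) for i in range(days + 1)]


def date_subtract(input_date: date, units: str, value: int) -> date:
    """Move input_date back by `value` of the given units."""
    if units == "days":
        return input_date - timedelta(days=value)
    if units == "weeks":
        return input_date - timedelta(weeks=value)
    if units in ("months", "years"):
        months = value if units == "months" else 12 * value
        # Count months from year 0 so the shift can cross year boundaries.
        total = input_date.year * 12 + (input_date.month - 1) - months
        year, month_index = divmod(total, 12)
        month = month_index + 1
        # Assumption: clamp to the last valid day of the target month.
        last_day = calendar.monthrange(year, month)[1]
        return date(year, month, min(input_date.day, last_day))
    raise ValueError(f"Unsupported units: {units}")
```

A typical use would be to derive the start of a dependency window with date_subtract and then enumerate the partitions to check with all_dates.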
rialto.runner.services.executor module
- class rialto.runner.services.executor.PipelineExecutor(spark: SparkSession, reader: DataReader, checker: DataChecker)
Bases: object
Executes a single pipeline task.
- execute(pipeline: PipelineTask) -> DataFrame
Execute the pipeline task.
- Parameters:
pipeline – Pipeline object to execute.
- Returns:
DataFrame resulting from pipeline execution.
rialto.runner.services.result_mapper module
- class rialto.runner.services.result_mapper.TaskResultMapper
Bases: object
Maps task execution outcomes to Record objects with consistent schema
- static already_complete(task: PipelineTask, run_start: datetime) -> Record
Map skipped (already complete) task to Record
- static dependencies_incomplete(task: PipelineTask, run_start: datetime, failed_deps: list) -> Record
Map dependency failure to Record
- static exception(task: PipelineTask, run_start: datetime, exception_message: str, traceback_str: str) -> Record
Map exception during execution to Record
- static interrupted(task: PipelineTask, run_start: datetime) -> Record
Map keyboard interrupt to Record
- static success(task: PipelineTask, run_start: datetime, records_count: int) -> Record
Map successful task execution to Record
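The idea of one static factory per outcome, all producing the same record shape, can be illustrated as follows. This is a rough sketch under stated assumptions: the Record fields, the status strings, and the ResultMapper class are hypothetical stand-ins, and the methods take a plain job name where the documented ones take a PipelineTask. The actual Record schema is not described in this section.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Record:
    """Hypothetical record schema; the real Record fields may differ."""
    job: str
    status: str
    run_start: datetime
    records: int = 0
    reason: Optional[str] = None
    exception: Optional[str] = None


class ResultMapper:
    """Hypothetical mapper: every outcome yields a Record with the same fields."""

    @staticmethod
    def success(job: str, run_start: datetime, records_count: int) -> Record:
        return Record(job, "Success", run_start, records=records_count)

    @staticmethod
    def already_complete(job: str, run_start: datetime) -> Record:
        return Record(job, "Skipped", run_start, reason="already complete")

    @staticmethod
    def dependencies_incomplete(job: str, run_start: datetime, failed_deps: List[str]) -> Record:
        reason = "incomplete dependencies: " + ", ".join(failed_deps)
        return Record(job, "Failed", run_start, reason=reason)

    @staticmethod
    def interrupted(job: str, run_start: datetime) -> Record:
        return Record(job, "Interrupted", run_start, reason="keyboard interrupt")

    @staticmethod
    def exception(job: str, run_start: datetime, exception_message: str, traceback_str: str) -> Record:
        return Record(job, "Error", run_start, reason=exception_message, exception=traceback_str)
```

Keeping the mapping in one place means downstream reporting only has to handle a single schema, regardless of how a task ended.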
rialto.runner.services.table module
- class rialto.runner.services.table.Table(catalog: str = None, schema: str = None, table: str = None, schema_path: str = None, table_path: str = None, class_name: str = None, partition: str = None, secondary_partitions: List[str] = None, filters: Dict = None)
Bases: object
Handler for Databricks catalog paths
- classmethod from_dependency_config(config: DependencyConfig) -> Table
Create table object from pipeline config dependency section
- Parameters:
config – Dependency configuration
- Returns:
Table object
rialto.runner.services.task_registry module
- class rialto.runner.services.task_registry.PipelineDependency(table: Table, date_from: date, date_until: date, complete: bool = False)
Bases: object
Class representing a pipeline dependency, with associated table and date range for checking completion
- complete: bool = False
- date_from: date
- date_until: date
- class rialto.runner.services.task_registry.PipelineTask(name: str, execution_date: date, partition_date: date, config: PipelineConfig, target: Table, dependencies: List[PipelineDependency] = <factory>, completion: bool = False, dependencies_complete: bool = False, precheck_failed: bool = False, error: str | None = None, error_trace: str | None = None, result: str = '')
Bases: object
Class representing a pipeline to be executed.
- completion: bool = False
- config: PipelineConfig
- dependencies: List[PipelineDependency]
- dependencies_complete: bool = False
- error: str | None = None
- error_trace: str | None = None
- execution_date: date
- name: str
- partition_date: date
- precheck_failed: bool = False
- result: str = ''
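The `<factory>` default shown for dependencies is how Python dataclasses render a default_factory, which suggests both classes are dataclasses. The sketch below mirrors the documented fields and defaults under that assumption. Any is used as a stand-in for Table and PipelineConfig so the example runs without rialto; it is an illustration of the shape, not the library source.

```python
from dataclasses import dataclass, field
from datetime import date
from typing import Any, List, Optional


@dataclass
class PipelineDependency:
    table: Any  # stand-in for rialto's Table
    date_from: date
    date_until: date
    complete: bool = False


@dataclass
class PipelineTask:
    name: str
    execution_date: date
    partition_date: date
    config: Any  # stand-in for PipelineConfig
    target: Any  # stand-in for Table
    # default_factory gives every task its own list instead of a shared one.
    dependencies: List[PipelineDependency] = field(default_factory=list)
    completion: bool = False
    dependencies_complete: bool = False
    precheck_failed: bool = False
    error: Optional[str] = None
    error_trace: Optional[str] = None
    result: str = ""


# A task starts with empty dependencies and all status flags unset.
task = PipelineTask(
    name="example_pipeline",  # hypothetical name
    execution_date=date(2024, 1, 2),
    partition_date=date(2024, 1, 1),
    config=None,
    target=None,
)
task.dependencies.append(
    PipelineDependency(table=None, date_from=date(2023, 12, 25), date_until=date(2024, 1, 1))
)
```

The status fields (completion, dependencies_complete, precheck_failed, error, error_trace, result) all have defaults, so only the identifying fields need to be supplied when a task is created.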
- class rialto.runner.services.task_registry.TaskRegistry(spark: SparkSession, date_manager: DateManager)
Bases: object
Registry for pipeline tasks to be executed
- add_task(name: str, execution_date: date, partition_date: date, config: PipelineConfig) -> None
Add task to registry
- Parameters:
name – Name of the pipeline
execution_date – Date when the pipeline is scheduled to run
partition_date – Date for which the pipeline is processing data
config – PipelineConfig object with pipeline configuration
- Returns:
None; adds a PipelineTask object to self.tasks
rialto.runner.services.task_status_checker module
- class rialto.runner.services.task_status_checker.TaskStatusChecker(checker: DataChecker)
Bases: object
Handles completion and dependency checks for pipeline tasks.
- check_completion(pipeline: PipelineTask) -> None
Check if pipeline is complete by checking if target data exists for partition date
- Parameters:
pipeline – Pipeline object for which to check completion
- Returns:
None; updates the completion attribute of the given pipeline
- check_pipeline_dependencies(pipeline: PipelineTask) -> None
Check if dependencies are complete by checking if data exists for each dependency in date range
- Parameters:
pipeline – Pipeline object for which to check dependencies
- Returns:
None; updates the dependencies_complete attribute of the given pipeline
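The two checks can be sketched with an in-memory stand-in for DataChecker, which is also a convenient shape for unit tests that should not touch storage. Everything here is hypothetical: Dep, Task, FakeChecker and StatusChecker are simplified stand-ins, tables are plain strings, and the range check assumes that a single partition inside the inclusive range counts as present, which the documentation does not specify.

```python
from dataclasses import dataclass, field
from datetime import date
from typing import List, Set, Tuple


@dataclass
class Dep:
    """Simplified stand-in for PipelineDependency (table is a plain string)."""
    table: str
    date_from: date
    date_until: date
    complete: bool = False


@dataclass
class Task:
    """Simplified stand-in for PipelineTask."""
    target: str
    partition_date: date
    dependencies: List[Dep] = field(default_factory=list)
    completion: bool = False
    dependencies_complete: bool = False


class FakeChecker:
    """In-memory stand-in for DataChecker: holds the (table, date) partitions that exist."""

    def __init__(self, present: Set[Tuple[str, date]]):
        self.present = present

    def check_date(self, target: str, partition_date: date) -> bool:
        return (target, partition_date) in self.present

    def check_range(self, target: str, start_date: date, end_date: date) -> bool:
        # Assumption: one partition inside the inclusive range is enough.
        return any(t == target and start_date <= d <= end_date for t, d in self.present)


class StatusChecker:
    """Sketch of the completion and dependency checks, writing flags onto the task."""

    def __init__(self, checker: FakeChecker):
        self.checker = checker

    def check_completion(self, pipeline: Task) -> None:
        pipeline.completion = self.checker.check_date(pipeline.target, pipeline.partition_date)

    def check_pipeline_dependencies(self, pipeline: Task) -> None:
        for dep in pipeline.dependencies:
            dep.complete = self.checker.check_range(dep.table, dep.date_from, dep.date_until)
        pipeline.dependencies_complete = all(dep.complete for dep in pipeline.dependencies)
```

In this sketch a task with no dependencies ends up with dependencies_complete set to True, since all() over an empty list is True; whether rialto treats that case the same way is not stated here.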
rialto.runner.services.writer module
- class rialto.runner.services.writer.DatabricksWriter(spark: SparkSession, merge_schema=False)
Bases: Writer
Supporting class for runner, Databricks write operations