# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = ["Tracker"]
from datetime import datetime
from pyspark.sql import SparkSession
from rialto.runner.bookkeeper import BookKeeper
from rialto.runner.config_loader import MailConfig
from rialto.runner.mailer import HTMLMessage, Mailer
from rialto.runner.record import Record
[docs]
class Tracker:
"""Collect information about runs and sent them out via email"""
def __init__(self, mail_cfg: MailConfig, bookkeeping: str = None, spark: SparkSession = None):
self.records = []
self.last_error = None
self.pipeline_start = datetime.now()
self.exceptions = []
self.mail_cfg = mail_cfg
self.bookkeeper = None
if bookkeeping:
self.bookkeeper = BookKeeper(table=bookkeeping, spark=spark)
[docs]
def add(self, record: Record) -> None:
"""Add record for one run"""
self.records.append(record)
if self.bookkeeper:
self.bookkeeper.add(record)
[docs]
def report_by_mail(self):
"""Create and send html report"""
if len(self.records) or self.mail_cfg.sent_empty:
report = HTMLMessage.make_report(self.pipeline_start, self.records)
for receiver in self.mail_cfg.to:
message = Mailer.create_message(
subject=self.mail_cfg.subject, sender=self.mail_cfg.sender, receiver=receiver, body=report
)
Mailer.send_mail(self.mail_cfg.smtp, message)