Reference

py2k.writer.KafkaWriter

__init__(self, topic, schema_registry_config, producer_config) special

A class for easily writing data to Kafka.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str | topic to post to | required |
| schema_registry_config | Dict[str, Any] | a dictionary compatible with the confluent_kafka.schema_registry.SchemaRegistryClient | required |
| producer_config | Dict[str, Any] | a dictionary compatible with the confluent_kafka.SerializingProducer | required |
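
A minimal sketch of the two configuration dictionaries; the endpoint values below are placeholders, and the full set of accepted keys is defined by confluent_kafka's SchemaRegistryClient and SerializingProducer respectively:

# Placeholder endpoints -- substitute your own Schema Registry and broker addresses.
schema_registry_config = {
    "url": "http://localhost:8081",         # Schema Registry endpoint
}

producer_config = {
    "bootstrap.servers": "localhost:9092",  # Kafka broker(s) to connect to
}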

write(self, records, verbose=False)

Writes data to Kafka.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| records | Iterable[KafkaRecord] | serialized KafkaRecord objects | required |
| verbose | bool | whether to display the progress bar | False |

Examples:

>>> from py2k.writer import KafkaWriter
>>> writer = KafkaWriter(topic=topic,
         schema_registry_config=schema_registry_config,
         producer_config=producer_config)
>>> writer.write(records)
100%|██████████| 4/4 [00:00<00:00,  7.69it/s]
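
To suppress the progress bar, pass verbose=False (a small illustrative variation of the call above):

>>> writer.write(records, verbose=False)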

py2k.record.PandasToRecordsTransformer

A class for automatic serialization of a pandas DataFrame to KafkaRecord objects.

__init__(self, df, record_name, fields_defaults=None, types_defaults=None, optional_fields=None, key_fields=None, include_key=None) special

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | pandas DataFrame to serialize | required |
| record_name | str | name of the destination Pydantic model | required |
| fields_defaults | Dict[str, object] | default values for fields in the DataFrame; the keys are the field names | None |
| types_defaults | Dict[object, object] | default values for the types in the DataFrame; the keys are the types, e.g. int | None |
| optional_fields | List[str] | list of fields which should be marked as optional | None |
| key_fields | Set[str] | set of fields that make up the key of the schema | None |
| include_key | bool | whether key fields should also be included in the value | None |
Source code in py2k/record.py
def __init__(self, df: pd.DataFrame, record_name: str,
             fields_defaults: Dict[str, object] = None,
             types_defaults: Dict[object, object] = None,
             optional_fields: List[str] = None,
             key_fields: Set[str] = None,
             include_key: bool = None):
    """
    Args:
        df (pd.DataFrame): Pandas dataframe to serialize
        record_name (str): destination Pydantic model
        fields_defaults (Dict[str, object], optional): default values for
             fields in the dataframe. The keys are the fields names.
             Defaults to None.
        types_defaults (Dict[object, object], optional): default values
             for the types in the dataframe. The keys are the types,
             e.g. int. Defaults to None.
        optional_fields (List[str], optional): list of fields which should
             be marked as optional. Defaults to None.
        key_fields (Set[str], optional): set of fields that make up
            the key of the schema. Defaults to None.
        include_key (bool, optional): whether key fields should also be
            included in the value. Defaults to None.
    """

    self._df = df
    _class = self._class(key_fields, include_key)

    model_creator = PandasModelCreator(df, record_name, fields_defaults,
                                       types_defaults, optional_fields,
                                       _class)

    self._model = model_creator.create()
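
As an illustration of these options, the following sketch constructs a transformer from a made-up DataFrame; the column names, default values, and key choice are assumptions for the example, not part of the library:

import pandas as pd
from py2k.record import PandasToRecordsTransformer

# Hypothetical data -- column names and values are invented for this sketch.
df = pd.DataFrame({
    "id": [1, 2],
    "name": ["Daniel", "Ada"],
    "value": [27.1, 13.4],
})

transformer = PandasToRecordsTransformer(
    df=df,
    record_name="KafkaRecord",      # name of the generated Pydantic model
    types_defaults={float: 0.0},    # every float field defaults to 0.0
    optional_fields=["value"],      # 'value' becomes nullable in the schema
    key_fields={"id"},              # 'id' forms the key of the schema
    include_key=True,               # keep 'id' in the value as well
)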

from_pandas(self, df=None)

Creates a list of KafkaRecord objects from a pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | pandas DataFrame to serialize; if omitted, the DataFrame passed to the constructor is used | None |

Returns:

| Type | Description |
| --- | --- |
| List[KafkaRecord] | serialized list of KafkaRecord objects |

Examples:

>>> record_transformer = PandasToRecordsTransformer(df=df,
                                                    record_name='KafkaRecord')
>>> record_transformer.from_pandas()
    [KafkaRecord(name='Daniel',
                 cool_level='low',
                 value=27.1,
                 date=datetime.date(2021, 3, 1))]
Source code in py2k/record.py
def from_pandas(self, df: pd.DataFrame = None) -> List['KafkaRecord']:
    """Creates list of KafkaModel objects from a pandas DataFrame

    Args:
        df (pd.DataFrame): Pandas dataframe. Defaults to None.

    Returns:
        List[KafkaModel]: serialized list of KafkaModel objects

    Examples:
        >>> record_transformer = PandasToRecordsTransformer(df=df,
                                                            record_name='KafkaRecord')
        >>> record_transformer.from_pandas()
            [KafkaRecord(name='Daniel',
                         cool_level='low',
                         value=27.1,
                         date=datetime.date(2021, 3, 1))]

    """
    if df is not None:
        return self._model.from_pandas(df)

    return self._model.from_pandas(self._df)
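
Because of the fallback above, the same transformer can also serialize a different DataFrame at call time; a small sketch, where other_df is a hypothetical DataFrame with the same columns:

>>> records = record_transformer.from_pandas()                # uses the constructor's DataFrame
>>> other_records = record_transformer.from_pandas(other_df)  # serializes other_df instead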

iter_from_pandas(self, df=None)

Creates an iterator of KafkaRecord objects from a pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | pandas DataFrame to serialize; if omitted, the DataFrame passed to the constructor is used | None |

Returns:

| Type | Description |
| --- | --- |
| Iterator[KafkaRecord] | iterator of serialized KafkaRecord objects |

Examples:

>>> record_transformer = PandasToRecordsTransformer(df=df,
                                                    record_name='KafkaRecord')
>>> record_transformer.iter_from_pandas()
Source code in py2k/record.py
def iter_from_pandas(self, df: pd.DataFrame = None):
    """Creates iterator of KafkaModel objects from a pandas DataFrame

    Args:
        df (pd.DataFrame): Pandas dataframe. Defaults to None.

    Returns:
        Iterator[KafkaModel]: serialized list of KafkaModel objects

    Examples:
        >>> record_transformer = PandasToRecordsTransformer(df=df,
                                                            record_name='KafkaRecord')
        >>> record_transformer.iter_from_pandas()
    """
    if df is not None:
        return self._model.iter_from_pandas(df)

    return self._model.iter_from_pandas(self._df)
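
Since KafkaWriter.write accepts any iterable of KafkaRecord objects, the iterator returned by iter_from_pandas can be streamed straight to Kafka without materialising the full list first. A minimal end-to-end sketch, reusing the DataFrame and configuration dictionaries from the earlier examples (all of which are placeholders):

from py2k.record import PandasToRecordsTransformer
from py2k.writer import KafkaWriter

transformer = PandasToRecordsTransformer(df=df, record_name="KafkaRecord")
records = transformer.iter_from_pandas()   # lazy iterator of KafkaRecord objects

writer = KafkaWriter(topic="my-topic",     # placeholder topic name
                     schema_registry_config=schema_registry_config,
                     producer_config=producer_config)
writer.write(records)                      # write() iterates over and produces each record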