# Reference

## py2k.writer.KafkaWriter

### `__init__(self, topic, schema_registry_config, producer_config)` (special)

A class for easily writing data to Kafka.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`topic` | `str` | topic to post to | required |
`schema_registry_config` | `Dict[str, Any]` | a dictionary compatible with the schema registry client configuration | required |
`producer_config` | `Dict[str, Any]` | a dictionary compatible with the Kafka producer configuration | required |
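For illustration, a minimal setup might look like the sketch below. The registry URL, broker address, and topic name are placeholders, and any authentication settings your cluster requires would be added to these dictionaries.

```python
from py2k.writer import KafkaWriter

# Illustrative configuration only; substitute your own registry URL,
# brokers, and security settings.
schema_registry_config = {"url": "http://localhost:8081"}
producer_config = {"bootstrap.servers": "localhost:9092"}

writer = KafkaWriter(
    topic="my_topic",
    schema_registry_config=schema_registry_config,
    producer_config=producer_config,
)
```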
### `write(self, records, verbose=False)`
Writes data to Kafka.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`records` | `Iterable[KafkaRecord]` | serialized records to write | required |
`verbose` | `bool` | whether to display a progress bar while writing | False |
Examples:

```python
>>> from py2k.writer import KafkaWriter
>>> writer = KafkaWriter(topic=topic,
                         schema_registry_config=schema_registry_config,
                         producer_config=producer_config)
>>> writer.write(records)
100%|██████████| 4/4 [00:00<00:00, 7.69it/s]
```
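Putting the writer together with the transformer documented below, an end-to-end flow looks roughly like this sketch; the DataFrame, topic, and configuration values are illustrative, not part of the API.

```python
import pandas as pd

from py2k.record import PandasToRecordsTransformer
from py2k.writer import KafkaWriter

# Toy data; columns and values are illustrative only.
df = pd.DataFrame({"name": ["Daniel"], "value": [27.1]})

# Turn the DataFrame into KafkaRecord instances ...
records = PandasToRecordsTransformer(df=df, record_name="KafkaRecord").from_pandas()

# ... and publish them to the configured topic.
writer = KafkaWriter(
    topic="my_topic",
    schema_registry_config={"url": "http://localhost:8081"},
    producer_config={"bootstrap.servers": "localhost:9092"},
)
writer.write(records)
```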
## py2k.record.PandasToRecordsTransformer

Class for automatic serialization of a pandas DataFrame into KafkaRecord objects.
### `__init__(self, df, record_name, fields_defaults=None, types_defaults=None, optional_fields=None, key_fields=None, include_key=None)` (special)
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`df` | `pd.DataFrame` | pandas DataFrame to serialize | required |
`record_name` | `str` | name of the destination Pydantic model | required |
`fields_defaults` | `Dict[str, object]` | default values for fields in the DataFrame; the keys are the field names. Defaults to None. | None |
`types_defaults` | `Dict[object, object]` | default values for the types in the DataFrame; the keys are the types, e.g. `int`. Defaults to None. | None |
`optional_fields` | `List[str]` | list of fields which should be marked as optional. Defaults to None. | None |
`key_fields` | `Set[str]` | set of fields which form the key of the schema. Defaults to None. | None |
`include_key` | `bool` | whether key fields should also be included in the record value. Defaults to None. | None |
Source code in py2k/record.py

```python
def __init__(self, df: pd.DataFrame, record_name: str,
             fields_defaults: Dict[str, object] = None,
             types_defaults: Dict[object, object] = None,
             optional_fields: List[str] = None,
             key_fields: Set[str] = None,
             include_key: bool = None):
    """
    Args:
        df (pd.DataFrame): Pandas dataframe to serialize
        record_name (str): destination Pydantic model
        fields_defaults (Dict[str, object], optional): default values for
            fields in the dataframe. The keys are the fields names.
            Defaults to None.
        types_defaults (Dict[object, object], optional): default values
            for the types in the dataframe. The keys are the types,
            e.g. int. Defaults to None.
        optional_fields (List[str], optional): list of fields which should
            be marked as optional. Defaults to None.
        key_fields (Set[str], optional): set of fields which are meant
            to be key of the schema
        include_key: bool: Indicator whether key fields should be
            included in value
    """
    self._df = df
    _class = self._class(key_fields, include_key)
    model_creator = PandasModelCreator(df, record_name, fields_defaults,
                                       types_defaults, optional_fields,
                                       _class)
    self._model = model_creator.create()
```
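A sketch of how the optional arguments might be combined; the DataFrame, column names, and default values below are hypothetical and only meant to show where each argument plugs in.

```python
import pandas as pd

from py2k.record import PandasToRecordsTransformer

# Hypothetical data, purely for illustration.
df = pd.DataFrame({"id": [1, 2], "name": ["Daniel", "Ana"], "score": [27.1, 3.5]})

transformer = PandasToRecordsTransformer(
    df=df,
    record_name="UserRecord",
    types_defaults={float: 0.0},   # default value applied to every float field
    optional_fields=["score"],     # fields allowed to be null in the schema
    key_fields={"id"},             # fields that make up the record key
    include_key=True,              # also keep the key fields in the value
)
records = transformer.from_pandas()
```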
### `from_pandas(self, df=None)`

Creates a list of KafkaRecord objects from a pandas DataFrame.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`df` | `pd.DataFrame` | pandas DataFrame to serialize; if None, the DataFrame passed to the constructor is used. Defaults to None. | None |
Returns:

Type | Description |
---|---|
`List[KafkaRecord]` | serialized list of KafkaRecord objects |
Examples:

```python
>>> record_transformer = PandasToRecordsTransformer(df=df,
                                                     record_name='KafkaRecord')
>>> record_transformer.from_pandas()
[KafkaRecord(name='Daniel',
             cool_level='low',
             value=27.1,
             date=datetime.date(2021, 3, 1))]
```
Source code in py2k/record.py

```python
def from_pandas(self, df: pd.DataFrame = None) -> List['KafkaRecord']:
    """Creates list of KafkaModel objects from a pandas DataFrame

    Args:
        df (pd.DataFrame): Pandas dataframe. Defaults to None.

    Returns:
        List[KafkaModel]: serialized list of KafkaModel objects

    Examples:
        >>> record_transformer = PandasToRecordsTransformer(df=df,
                                                            record_name='KafkaRecord')
        >>> record_transformer.from_pandas()
        [KafkaRecord(name='Daniel',
                     cool_level='low',
                     value=27.1,
                     date=datetime.date(2021, 3, 1))]
    """
    if df is not None:
        return self._model.from_pandas(df)
    return self._model.from_pandas(self._df)
```
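Because `df` is optional, the same transformer can be reused against a different frame with compatible columns; the `transformer` object and the second DataFrame below are hypothetical continuations of the earlier sketch.

```python
# 'transformer' is the illustrative object built above.
records_default = transformer.from_pandas()         # serializes the DataFrame given to __init__
other_df = pd.DataFrame({"id": [3], "name": ["Kai"], "score": [10.5]})
records_other = transformer.from_pandas(other_df)   # serializes the new frame with the same model
```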
### `iter_from_pandas(self, df=None)`

Creates an iterator of KafkaRecord objects from a pandas DataFrame.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`df` | `pd.DataFrame` | pandas DataFrame to serialize; if None, the DataFrame passed to the constructor is used. Defaults to None. | None |
Returns:

Type | Description |
---|---|
`Iterator[KafkaRecord]` | iterator over serialized KafkaRecord objects |
Examples:

```python
>>> record_transformer = PandasToRecordsTransformer(df=df,
                                                     record_name='KafkaRecord')
>>> record_transformer.iter_from_pandas()
```
Source code in py2k/record.py

```python
def iter_from_pandas(self, df: pd.DataFrame = None):
    """Creates iterator of KafkaModel objects from a pandas DataFrame

    Args:
        df (pd.DataFrame): Pandas dataframe. Defaults to None.

    Returns:
        Iterator[KafkaModel]: serialized list of KafkaModel objects

    Examples:
        >>> record_transformer = PandasToRecordsTransformer(df=df,
                                                            record_name='KafkaRecord')
        >>> record_transformer.iter_from_pandas()
    """
    if df is not None:
        return self._model.iter_from_pandas(df)
    return self._model.iter_from_pandas(self._df)
```
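Since `write` accepts any iterable of records, the iterator variant can be handed to `KafkaWriter.write` directly, which avoids materializing the full list for large DataFrames. The `transformer` and `writer` objects below refer to the earlier illustrative sketches.

```python
# Stream records to Kafka without building the full list in memory first.
writer.write(transformer.iter_from_pandas(), verbose=True)
```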