Skip to content

First Steps

KafkaRecord

The simplest KafkaRecord can be defined through inheritance of the KafkaRecord class:

import datetime
from py2k.record import KafkaRecord


class MyRecord(KafkaRecord):
    name: str
    age: int
    birthday: datetime.date

The KafkaRecord makes use of Pydantic models to allow for automatic avro schema generation.

KafkaWriter

The KafkaWriter is the object responsible for take a list of KafkaRecords and posting them onto Kafka. Using the KafkaWriter only requires that you define your schema registry config and producer config that is compatible with the confluent_kafka API. Specifically:

Note

Additionally, you can have a look at the librdkafka documentation for addtional configuration options or the Confluent Schema Registry documentation

The KafkaWriter can be used as follows:

from py2k.writer import KafkaWriter

writer = KafkaWriter(
    topic='dummy_topic',
    schema_registry_config={'url': 'http://myschemaregistry.com:8081'},
    producer_config={'bootstrap.servers': 'myproducer.com:9092'}
)

In practice, copy this and put it into a file called test_py2k.py and fill in your kafka configuration.

import datetime
from py2k.record import KafkaRecord


class MyRecord(KafkaRecord):
    name: str
    age: int
    birthday: datetime.date

record = MyRecord(**{'name': 'Dan',
                     'age': 27,
                     'birthday': datetime.date(1993,9,4)})

from py2k.writer import KafkaWriter

writer = KafkaWriter(
    topic='dummy_topic',
    schema_registry_config={'url': 'http://myschemaregistry.com:8081'},
    producer_config={'bootstrap.servers': 'myproducer.com:9092'}
)

writer.write([record])

running this:

$ python test_py2k.py
100%|██████████| 1/1 [00:00<00:00,  7.69it/s]

// It has now pushed that record onto Kafka