# First Steps

## KafkaRecord
The simplest record can be defined by inheriting from the `KafkaRecord` class:
```python
import datetime

from py2k.record import KafkaRecord


class MyRecord(KafkaRecord):
    name: str
    age: int
    birthday: datetime.date
```
The `KafkaRecord` builds on Pydantic models, which allows for automatic Avro schema generation.
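Under the hood, this works by inspecting the model's type annotations and mapping each Python type to an Avro type. A rough standard-library sketch of that idea (the `AVRO_TYPES` mapping and the emitted schema shape are illustrative assumptions, not py2k's actual implementation):

```python
import datetime
from typing import get_type_hints

# Illustrative type mapping (assumed for this sketch; the real mapping
# lives inside py2k and covers many more types).
AVRO_TYPES = {
    str: "string",
    int: "long",
    datetime.date: {"type": "int", "logicalType": "date"},
}


class MyRecord:
    name: str
    age: int
    birthday: datetime.date


# Build an Avro-style record schema from the class annotations.
fields = [
    {"name": field_name, "type": AVRO_TYPES[hint]}
    for field_name, hint in get_type_hints(MyRecord).items()
]
schema = {"type": "record", "name": "MyRecord", "fields": fields}
print(schema)
```

This is why no hand-written schema is needed: the annotations on the model are the single source of truth.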
## KafkaWriter
The `KafkaWriter` is the object responsible for taking a list of `KafkaRecord`s and posting them to Kafka. Using the `KafkaWriter` only requires that you define a schema registry config and a producer config compatible with the confluent_kafka API.
Note

For additional configuration options, have a look at the librdkafka documentation and the Confluent Schema Registry documentation.
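For example, any key accepted by librdkafka can go into the producer config. A sketch of what a config with SASL authentication might look like (the security settings and placeholder values below are illustrative assumptions, not requirements):

```python
# Producer config: any librdkafka configuration key is accepted.
# Values here are placeholders; fill in your own cluster details.
producer_config = {
    'bootstrap.servers': 'myproducer.com:9092',
    # Optional security settings (see the librdkafka CONFIGURATION docs):
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<api-key>',
    'sasl.password': '<api-secret>',
}

# Schema registry config: at minimum the registry URL.
schema_registry_config = {
    'url': 'http://myschemaregistry.com:8081',
    # Optional basic auth for the registry:
    'basic.auth.user.info': '<user>:<password>',
}
```

Unsecured local development clusters typically only need `bootstrap.servers` and `url`.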
The `KafkaWriter` can be used as follows:

```python
from py2k.writer import KafkaWriter

writer = KafkaWriter(
    topic='dummy_topic',
    schema_registry_config={'url': 'http://myschemaregistry.com:8081'},
    producer_config={'bootstrap.servers': 'myproducer.com:9092'}
)
```
In practice, copy the following into a file called `test_py2k.py` and fill in your Kafka configuration:
```python
import datetime

from py2k.record import KafkaRecord
from py2k.writer import KafkaWriter


class MyRecord(KafkaRecord):
    name: str
    age: int
    birthday: datetime.date


record = MyRecord(**{'name': 'Dan',
                     'age': 27,
                     'birthday': datetime.date(1993, 9, 4)})

writer = KafkaWriter(
    topic='dummy_topic',
    schema_registry_config={'url': 'http://myschemaregistry.com:8081'},
    producer_config={'bootstrap.servers': 'myproducer.com:9092'}
)

writer.write([record])
```
Running this:

```shell
$ python test_py2k.py
100%|██████████| 1/1 [00:00<00:00, 7.69it/s]
```

The record has now been pushed onto Kafka.