🥑 A Kafka client and Avro (de)serializer library
To install, use your favourite dependency manager.
The package name is avrokado
.
npm i avrokado --save
yarn add avrokado --exact
For examples, please refer to the examples folder.
This will fetch the key
and value
schemas for a topicName
.
new SchemaRegistry (
endpoint: string,
topics: ReadonlyArray<string> | string,
version: number | 'latest' | 'all'
) => SchemaRegistry;
Where:
- endpoint: Endpoint for your Schema Registry;
- topics: Name of the topics (
Array
) or topic (String
) you want to retrieve the schemas for; - version: It can be either:
- A
number
, which will then force the function to only fetch that version; all
, which means it will fetchall
versions of the schemas;latest
, which will fetch only thelatest
schema versions.
- A
- schemas: Object containing the loaded schemas.
load
async load() => Promise<void>;
The load
method will load all the schemas selected to memory, and can be accessed through the schemas
field from the instanced class.
It is recommended to load the schemas BEFORE creating your Consumer or Producer.
This will create a consumer stream using node-rdkafka.
Please check their DOCUMENTATION since most of the options are from this library.
new AvroConsumer(
conf: Object,
topicConf: Object,
schemas: TopicsSchemas
) => AvroConsumer;
Where:
- consumerConfiguration:
librdkafka
's consumer-specific configuration; - defaultTopicConfiguration:
librdkafka
's default topic configuration; - streamOpts:
librdkafka
's read stream options; - schemas: An object with all
key
andvalue
schemas (return fromloadSchemas
).
Returns a AvroConsumer
, which extends from Readable
stream.
- stream: This is a
ConsumerStream
object fromnode-rdkafka
, which has another fieldconsumer
for theKafkaConsumer
itself (yes it's ugly).
Event name | Trigger/Description |
---|---|
avro |
Whenever a message is parsed with Avro |
ready |
When the Consumer Stream is created |
event.error |
Wraps ConsumerStream.consumer 's event.error event |
And any other event emitted by a ConsumerStream
from node-rdkafka
.
Specifically for avro
event emitted, it should be expected a AvroMessage
type, which contains:
Variable | Description |
---|---|
value |
The raw value buffer |
key |
The raw key buffer |
size |
Size in bytes of the raw message |
topic |
Name of the topic |
offset |
Offset in which the message is |
partition |
Partition from the topic |
timestamp |
When the message was retrieved |
valueSchemaId |
Schema ID for the value |
keySchemaId |
Schema ID for the key |
parsedValue |
Avro-deserialized value (from value) |
parsedKey |
Avro-deserialized key (from key) |
- To use the
KafkaConsumer
methods, for now you will need to doAvroConsumer.stream.consumer
.
This will create a producer using node-rdkafka.
Please check their DOCUMENTATION since most of the options are from this library.
new AvroProducer(
conf: Object,
topicConf: Object,
schemas: TopicsSchemas
) => AvroProducer;
Where:
- conf:
librdkafka
's producer-specific configuration; - topicConf?:
librdkafka
's default topic configuration; - schemas: An object with all
key
andvalue
schemas (return fromloadSchemas
).
Returns a AvroProducer
, which extends from Producer
.
connect
connect(
metadataOption: Object = {}
) => Promise<true | Error>;
The connect
method will connect to the Kafka broker and await
until a connection is successfully made or an error is thrown.
produce
produce(
topic: string,
partition?: number,
message?: unknown,
key?: unknown,
sendRaw?: boolean,
timestamp?: number,
opaque?: unknown
) => void;
The produce
method will produce a message to Kafka. If sendRaw
is set to true
, the message WILL NOT be avro encoded.
disconnect
disconnect(
timeout: number = 5000
) => Promise<true | Error>;
The disconnect
method will disconnect from the Kafka broker and await
until it is gracefully interrupted.
- Install
Docker
; - Install
docker-compose
; - Start up the images with
docker-compose up -d
and make sure zookeeper, kafka and schema-registry are all running; - Run
npm run test
oryarn test
.
- Improve in-code documentation.