An opinionated, highly specific Elixir wrapper around brod, the Erlang Kafka client. ☕
NOTE: Although we're using this in production at Spreedly, it is still under active development. The API may change and there may be serious bugs we've yet to encounter.
- Add `kaffe` to your list of dependencies in `mix.exs`:

  ```elixir
  def deps do
    [{:kaffe, git: "git@github.com:spreedly/kaffe.git", branch: "master"}]
  end
  ```
- Ensure `kaffe` is started with your application:

  ```elixir
  def application do
    [applications: [:logger, :kaffe]]
  end
  ```
- Configure a Kaffe Consumer and/or Producer
- Add a `handle_message/1` function to a local module (e.g. `MessageProcessor`). This function will be called with each Kafka message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

  The module's `handle_message/1` function must return `:ok` or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your `handle_message/1` function returns `:ok`.

  ```elixir
  defmodule MessageProcessor do
    def handle_message(%{key: key, value: value} = message) do
      IO.inspect message
      IO.puts "#{key}: #{value}"
      :ok # The handle_message function MUST return :ok
    end
  end
  ```

  An example message map:

  ```elixir
  %{
    attributes: 0,
    crc: 1914336469,
    key: "kafka message key",
    magic_byte: 0,
    offset: 41,
    partition: 17,
    topic: "some-kafka-topic",
    value: "the actual kafka message value is here"
  }
  ```
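  Because each message map carries `:topic` and `:partition`, a handler can pattern match on them to route messages. A minimal sketch (the topic name here is a placeholder, not part of Kaffe):

  ```elixir
  # A sketch only: route messages by topic using multiple function clauses.
  # "interesting-topic" is a placeholder for one of your configured topics.
  def handle_message(%{topic: "interesting-topic", key: key, value: value}) do
    IO.puts "interesting: #{key}: #{value}"
    :ok
  end

  def handle_message(_message), do: :ok # every clause still MUST return :ok
  ```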
- Configure your Kaffe Consumer in your mix config:

  ```elixir
  config :kaffe,
    consumer: [
      endpoints: [kafka: 9092], # that's [hostname: kafka_port]
      topics: ["interesting-topic"], # the topic(s) that will be consumed
      consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
      message_handler: MessageProcessor, # the module from Step 1 that will process messages

      # optional
      async_message_ack: false, # see "async message acknowledgement" below
      start_with_earliest_message: true # default false
    ]
  ```
  The `start_with_earliest_message` field controls where your consumer group begins the very first time it starts. Once offsets have been committed to Kafka they will supersede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.

  To configure a Kaffe Consumer for a Heroku Kafka compatible environment, including SSL, omit the `endpoints` and instead set `heroku_kafka_env: true`:

  ```elixir
  config :kaffe,
    consumer: [
      heroku_kafka_env: true,
      topics: ["interesting-topic"],
      consumer_group: "your-app-consumer-group",
      message_handler: MessageProcessor
    ]
  ```
  With that setting in place Kaffe will automatically pull the required info from the following ENV variables:

  - `KAFKA_URL`
  - `KAFKA_CLIENT_CERT`
  - `KAFKA_CLIENT_CERT_KEY`
  - `KAFKA_TRUSTED_CERT` (not used yet)
- Add `Kaffe.Consumer` as a worker in your supervision tree:

  ```elixir
  worker(Kaffe.Consumer, [])
  ```
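  For context, here is a minimal sketch of where that `worker/2` line lives in an application's supervision tree. The `YourApp` module name and supervisor options are placeholders, not part of Kaffe:

  ```elixir
  defmodule YourApp do
    use Application

    def start(_type, _args) do
      import Supervisor.Spec

      # Start Kaffe.Consumer under your app's top-level supervisor.
      children = [
        worker(Kaffe.Consumer, [])
      ]

      Supervisor.start_link(children, strategy: :one_for_one, name: YourApp.Supervisor)
    end
  end
  ```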
If you need asynchronous message consumption:
- Add a `handle_message/2` function to your processing module. This function will be called with the Consumer `pid` and the Kafka message. When your processing is complete you will need to call `Kaffe.Consumer.ack(pid, message)` to acknowledge the offset.

- Set `async` to true when you start the Kaffe.Consumer:

  ```elixir
  consumer_group = "demo-commitlog-consumer"
  topics = ["commitlog"]
  message_handler = MessageProcessor
  async = true

  worker(Kaffe.Consumer, [consumer_group, topics, message_handler, async])

  # … in your message handler module

  def handle_message(pid, message) do
    spawn_message_processing_worker(pid, message)
    :ok # MUST return :ok
  end

  # … somewhere in your system when the worker is finished processing

  Kaffe.Consumer.ack(pid, message)
  ```
NOTE: Asynchronous consumption means your system will no longer provide any backpressure to the Kaffe.Consumer. You will also need to add robust measures to your system to ensure that no messages are lost in processing. For example, if you spawn 5 workers to process a series of asynchronous messages from Kafka and 1 of them crashes without acknowledgement, it's possible and even likely that the message will be skipped entirely.

Kafka only tracks a single numeric offset per partition and consumer group, not individual messages. If a message fails but a later offset is committed, the failed message will not be sent again.

It's possible that your topic and system are entirely ok with losing some messages (e.g. frequent metrics that aren't individually important).
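The `spawn_message_processing_worker/2` call in the example above is a hypothetical helper, not part of Kaffe. One minimal sketch using an unsupervised `Task`, with `do_work/1` standing in for your actual processing:

```elixir
# A sketch only: process the message off the consumer's process and ack when done.
# For durability you would likely want a supervised Task or a monitored worker,
# since a crash here means the message is never acknowledged.
def spawn_message_processing_worker(pid, message) do
  Task.start(fn ->
    do_work(message) # hypothetical: your actual processing
    Kaffe.Consumer.ack(pid, message)
  end)
end
```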
`Kaffe.Producer` handles producing messages to Kafka and will automatically select the topic partition for each message, or it can be given a function to call to determine the partition per message.
- Configure your Kaffe Producer in your mix config:

  ```elixir
  config :kaffe,
    producer: [
      endpoints: [kafka: 9092], # [hostname: port]
      topics: ["kafka-topic"],

      # optional
      partition_strategy: :round_robin
    ]
  ```
  The `partition_strategy` setting can be one of:

  - `:md5`: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key
  - `:round_robin`: cycle through each partition starting from 0 at application start
  - `:random`: select a random partition for each message
  - function: a given function to call to determine the correct partition (see the sketch below)
  To configure a Kaffe Producer for a Heroku Kafka compatible environment, including SSL, omit the `endpoints` and instead set `heroku_kafka_env: true`:

  ```elixir
  config :kaffe,
    producer: [
      heroku_kafka_env: true,
      topics: ["kafka-topic"],

      # optional
      partition_strategy: :round_robin
    ]
  ```
  With that setting in place Kaffe will automatically pull the required info from the following ENV variables:

  - `KAFKA_URL`
  - `KAFKA_CLIENT_CERT`
  - `KAFKA_CLIENT_CERT_KEY`
  - `KAFKA_TRUSTED_CERT`
- Add `Kaffe.Producer` as a worker in your supervision tree:

  ```elixir
  worker(Kaffe.Producer, [])
  ```
Currently only synchronous message production is supported.
Once the `Kaffe.Producer` has started there are three ways to produce:
- `key`/`value` - The key/value will be produced to the first topic given to the producer when it was started. The partition will be selected with the chosen strategy or given function.

  ```elixir
  Kaffe.Producer.produce_sync("key", "value")
  ```
- `topic`/`key`/`value` - The key/value will be produced to the given topic.

  ```elixir
  Kaffe.Producer.produce_sync("whitelist", "key", "value")
  ```
- `topic`/`partition`/`key`/`value` - The key/value will be produced to the given topic/partition.

  ```elixir
  Kaffe.Producer.produce_sync("whitelist", 2, "key", "value")
  ```

  NOTE: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition.
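As a usage sketch: the return value shape below is an assumption mirroring the underlying brod client (`:ok` on success, `{:error, reason}` on failure); verify against the Kaffe source before relying on it:

```elixir
# A sketch only: handle the result of a synchronous produce.
case Kaffe.Producer.produce_sync("key", "value") do
  :ok ->
    :ok

  {:error, reason} ->
    # handle or retry as appropriate for your system
    IO.puts("produce failed: #{inspect(reason)}")
end
```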