%%% Copyright (c) 2014-2021 Klarna Bank AB (publ)
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%% http://www.apache.org/licenses/LICENSE-2.0
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%% @doc A `brod_producer' is a `gen_server' that is responsible for producing
%% messages to a given partition of a given topic.
%% See the overview
%% for some more information and examples.
-export([ start_link/4
, produce/3
, produce_cb/4
, produce_no_ack/3
, stop/1
, sync_produce_request/2
-export([ code_change/3
, handle_call/3
, handle_cast/2
, handle_info/2
, init/1
, terminate/2
-if(?OTP_RELEASE < 25).
-export([ do_send_fun/4
, do_no_ack/2
, do_bufcb/2
-export_type([ config/0 ]).
%% default number of messages in buffer before block callers
%% default number of message sets sent on wire before block waiting for acks
%% by default, send max 1 MB of data in one batch (message set)
-define(DEFAULT_MAX_BATCH_SIZE, 1048576).
%% by default, require acks from all ISR
%% by default, leader should wait 10 seconds for replicas to ack
-define(DEFAULT_ACK_TIMEOUT, 10000).
%% by default, brod_producer will sleep for 0.5 second before trying to send
%% buffered messages again upon receiving a error from kafka
%% by default, brod_producer will try to retry 3 times before crashing
%% by default, no compression
-define(DEFAULT_COMPRESSION, no_compression).
%% by default, messages never linger around in buffer
%% should be sent immediately when onwire-limit allows
%% by default, messages never linger around in buffer
-define(RETRY_MSG, retry).
-define(DELAYED_SEND_MSG(REF), {delayed_send, REF}).
-define(config(Key, Default), proplists:get_value(Key, Config, Default)).
-type milli_sec() :: non_neg_integer().
-type delay_send_ref() :: ?undef | {reference(), reference()}.
-type topic() :: brod:topic().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type config() :: proplists:proplist().
-type call_ref() :: brod:call_ref().
-type conn() :: kpro:connection().
{ client_pid :: pid()
, topic :: topic()
, partition :: partition()
, connection :: ?undef | conn()
, conn_mref :: ?undef | reference()
, buffer :: brod_producer_buffer:buf()
, retry_backoff_ms :: non_neg_integer()
, retry_tref :: ?undef | reference()
, delay_send_ref :: delay_send_ref()
, produce_req_vsn :: {default | resolved | configured,
-type state() :: #state{}.
%%%_* APIs =====================================================================
%% @doc Start (link) a partition producer.
%% Possible configs (passed as a proplist):
%% - `required_acks' (optional, default = -1):
%% How many acknowledgements the kafka broker should receive from the
%% clustered replicas before acking producer.
%% 0: the broker will not send any response
%% (this is the only case where the broker will not reply to a request)
%% 1: The leader will wait the data is written to the local log before
%% sending a response.
%% -1: If it is -1 the broker will block until the message is committed by
%% all in sync replicas before acking.
%% - `ack_timeout' (optional, default = 10000 ms):
%% Maximum time in milliseconds the broker can await the receipt of the
%% number of acknowledgements in `RequiredAcks'. The timeout is not an exact
%% limit on the request time for a few reasons: (1) it does not include
%% network latency, (2) the timer begins at the beginning of the processing
%% of this request so if many requests are queued due to broker overload
%% that wait time will not be included, (3) kafka leader will not terminate
%% a local write so if the local write time exceeds this timeout it will
%% not be respected.
%% - `partition_buffer_limit' (optional, default = 256):
%% How many requests (per-partition) can be buffered without blocking the
%% caller. The callers are released (by receiving the
%% 'brod_produce_req_buffered' reply) once the request is taken into buffer
%% and after the request has been put on wire, then the caller may expect
%% a reply 'brod_produce_req_acked' when the request is accepted by kafka.
%% - `partition_onwire_limit' (optional, default = 1):
%% How many message sets (per-partition) can be sent to kafka broker
%% asynchronously before receiving ACKs from broker.
%% NOTE: setting a number greater than 1 may cause messages being persisted
%% in an order different from the order they were produced.
%% - `max_batch_size' (in bytes, optional, default = 1M):
%% In case callers are producing faster than brokers can handle (or
%% congestion on wire), try to accumulate small requests into batches
%% as much as possible but not exceeding max_batch_size.
%% OBS: If compression is enabled, care should be taken when picking
%% the max batch size, because a compressed batch will be produced
%% as one message and this message might be larger than
%% 'max.message.bytes' in kafka config (or topic config)
%% - `max_retries' (optional, default = 3):
%% If `{max_retries, N}' is given, the producer retry produce request for
%% N times before crashing in case of failures like connection being
%% shutdown by remote or exceptions received in produce response from kafka.
%% The special value N = -1 means "retry indefinitely"
%% - `retry_backoff_ms' (optional, default = 500);
%% Time in milli-seconds to sleep before retry the failed produce request.
%% - `compression' (optional, default = `no_compression`):
%% `gzip' or `snappy' to enable compression
%% - `max_linger_ms' (optional, default = 0):
%% Messages are allowed to 'linger' in buffer for this amount of
%% milli-seconds before being sent.
%% Definition of 'linger': A message is in "linger" state when it is allowed
%% to be sent on-wire, but chosen not to (for better batching).
%% The default value is 0 for 2 reasons:
- Backward compatibility (for 2.x releases)
%% - Not to surprise `brod:produce_sync' callers
%% - `max_linger_count' (optional, default = 0):
%% At most this amount (count not size) of messages are allowed to "linger"
%% in buffer. Messages will be sent regardless of "linger" age when this
%% threshold is hit.
%% NOTE: It does not make sense to have this value set larger than
%% `partition_buffer_limit'
%% - `produce_req_vsn' (optional, default = undefined):
%% User determined produce API version to use, discard the API version range
%% received from kafka. This is to be used when a topic in newer version
%% kafka is configured to store older version message format.
%% e.g. When a topic in kafka 0.11 is configured to have message format
%% 0.10, sending message with headers would result in `unknown_server_error'
%% error code.
-spec start_link(pid(), topic(), partition(), config()) -> {ok, pid()}.
start_link(ClientPid, Topic, Partition, Config) ->
gen_server:start_link(?MODULE, {ClientPid, Topic, Partition, Config}, []).
%% @doc Produce a message to partition asynchronously.
%% The call is blocked until the request has been buffered in producer worker
%% The function returns a call reference of type `call_ref()' to the
%% caller so the caller can used it to expect (match) a
%% `#brod_produce_reply{result = brod_produce_req_acked}'
%% message after the produce request has been acked by kafka.
-spec produce(pid(), brod:key(), brod:value()) ->
{ok, call_ref()} | {error, any()}.
produce(Pid, Key, Value) ->
produce_cb(Pid, Key, Value, ?undef).
%% @doc Fire-n-forget, no ack, no back-pressure.
-spec produce_no_ack(pid(), brod:key(), brod:value()) -> ok.
produce_no_ack(Pid, Key, Value) ->
CallRef = #brod_call_ref{caller = ?undef},
AckCb = fun ?MODULE:do_no_ack/2,
Batch = brod_utils:make_batch_input(Key, Value),
Pid ! {produce, CallRef, Batch, AckCb},
%% @doc Async produce, evaluate callback if `AckCb' is a function
%% otherwise send `#brod_produce_reply{result = brod_produce_req_acked}'
%% message to caller after the produce request has been acked by kafka.
-spec produce_cb(pid(), brod:key(), brod:value(),
?undef | brod:produce_ack_cb()) ->
ok | {ok, call_ref()} | {error, any()}.
produce_cb(Pid, Key, Value, AckCb) ->
CallRef = #brod_call_ref{ caller = self()
, callee = Pid
, ref = Mref = erlang:monitor(process, Pid)
Batch = brod_utils:make_batch_input(Key, Value),
Pid ! {produce, CallRef, Batch, AckCb},
#brod_produce_reply{ call_ref = #brod_call_ref{ ref = Mref }
, result = ?buffered
} ->
erlang:demonitor(Mref, [flush]),
case AckCb of
?undef -> {ok, CallRef};
_ -> ok
{'DOWN', Mref, process, _Pid, Reason} ->
{error, {producer_down, Reason}}
%% @doc Block calling process until it receives an acked reply for the
%% `CallRef'.
%% The caller pid of this function must be the caller of
%% {@link produce/3} in which the call reference was created.
-spec sync_produce_request(call_ref(), timeout()) ->
{ok, offset()} | {error, Reason}
when Reason :: timeout | {producer_down, any()}.
sync_produce_request(CallRef, Timeout) ->
#brod_call_ref{ caller = Caller
, callee = Callee
, ref = Ref
} = CallRef,
Caller = self(), %% assert
Mref = erlang:monitor(process, Callee),
#brod_produce_reply{ call_ref = #brod_call_ref{ref = Ref}
, base_offset = Offset
, result = brod_produce_req_acked
} ->
erlang:demonitor(Mref, [flush]),
{ok, Offset};
{'DOWN', Mref, process, _Pid, Reason} ->
{error, {producer_down, Reason}}
Timeout ->
erlang:demonitor(Mref, [flush]),
{error, timeout}
%% @doc Stop the process
-spec stop(pid()) -> ok.
stop(Pid) -> ok = gen_server:call(Pid, stop).
%%%_* gen_server callbacks =====================================================
%% @private
init({ClientPid, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARTITION_BUFFER_LIMIT),
OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARTITION_ONWIRE_LIMIT),
MaxBatchSize = ?config(max_batch_size, ?DEFAULT_MAX_BATCH_SIZE),
MaxRetries = ?config(max_retries, ?DEFAULT_MAX_RETRIES),
RetryBackoffMs = ?config(retry_backoff_ms, ?DEFAULT_RETRY_BACKOFF_MS),
RequiredAcks = ?config(required_acks, ?DEFAULT_REQUIRED_ACKS),
AckTimeout = ?config(ack_timeout, ?DEFAULT_ACK_TIMEOUT),
Compression = ?config(compression, ?DEFAULT_COMPRESSION),
MaxLingerMs = ?config(max_linger_ms, ?DEFAULT_MAX_LINGER_MS),
MaxLingerCount = ?config(max_linger_count, ?DEFAULT_MAX_LINGER_COUNT),
SendFun = make_send_fun(Topic, Partition, RequiredAcks, AckTimeout, Compression),
Buffer = brod_producer_buffer:new(BufferLimit, OnWireLimit, MaxBatchSize,
MaxRetries, MaxLingerMs, MaxLingerCount,
DefaultVsn = brod_kafka_apis:default_version(produce),
ReqVersion = case ?config(produce_req_vsn, ?undef) of
?undef -> {default, DefaultVsn};
Vsn -> {configured, Vsn}
State = #state{ client_pid = ClientPid
, topic = Topic
, partition = Partition
, buffer = Buffer
, retry_backoff_ms = RetryBackoffMs
, connection = ?undef
, produce_req_vsn = ReqVersion
%% Register self() to client.
ok = brod_client:register_producer(ClientPid, Topic, Partition),
{ok, State}.
%% @private
#state{delay_send_ref = {_Tref, MsgRef}} = State0) ->
State1 = State0#state{delay_send_ref = ?undef},
{ok, State} = maybe_produce(State1),
{noreply, State};
handle_info(?DELAYED_SEND_MSG(_MsgRef), #state{} = State) ->
%% stale delay-send timer expiration, discard
{noreply, State};
handle_info(?RETRY_MSG, #state{} = State0) ->
State1 = State0#state{retry_tref = ?undef},
{ok, State2} = maybe_reinit_connection(State1),
%% For retry-interval deterministic, produce regardless of connection state.
%% In case it has failed to find a new connection in maybe_reinit_connection/1
%% the produce call should fail immediately with {error, no_leader_connection}
%% and a new retry should be scheduled (if not reached max_retries yet)
{ok, State} = maybe_produce(State2),
{noreply, State};
handle_info({'DOWN', _MonitorRef, process, Pid, Reason},
#state{connection = Pid, buffer = Buffer0} = State) ->
case brod_producer_buffer:is_empty(Buffer0) of
true ->
%% no connection restart in case of empty request buffer
{noreply, State#state{connection = ?undef, conn_mref = ?undef}};
false ->
%% put sent requests back to buffer immediately after connection down
%% to fail fast if retry is not allowed (reaching max_retries).
Buffer = brod_producer_buffer:nack_all(Buffer0, Reason),
{ok, NewState} = schedule_retry(State#state{buffer = Buffer}),
{noreply, NewState#state{connection = ?undef, conn_mref = ?undef}}
handle_info({produce, CallRef, Batch, AckCb}, #state{partition = Partition} = State) ->
BufCb = make_bufcb(CallRef, AckCb, Partition),
handle_produce(BufCb, Batch, State);
handle_info({msg, Pid, #kpro_rsp{ api = produce
, ref = Ref
, msg = Rsp
#state{ connection = Pid
, buffer = Buffer
} = State) ->
[TopicRsp] = kpro:find(responses, Rsp),
Topic = kpro:find(topic, TopicRsp),
[PartitionRsp] = kpro:find(partition_responses, TopicRsp),
Partition = kpro:find(partition, PartitionRsp),
ErrorCode = kpro:find(error_code, PartitionRsp),
Offset = kpro:find(base_offset, PartitionRsp),
Topic = State#state.topic, %% assert
Partition = State#state.partition, %% assert
{ok, NewState} =
case ?IS_ERROR(ErrorCode) of
true ->
_ = log_error_code(Topic, Partition, Offset, ErrorCode),
Error = {produce_response_error, Topic, Partition,
Offset, ErrorCode},
is_retriable(ErrorCode) orelse exit({not_retriable, Error}),
NewBuffer = brod_producer_buffer:nack(Buffer, Ref, Error),
schedule_retry(State#state{buffer = NewBuffer});
false ->
NewBuffer = brod_producer_buffer:ack(Buffer, Ref, Offset),
maybe_produce(State#state{buffer = NewBuffer})
{noreply, NewState};
handle_info(_Info, #state{} = State) ->
{noreply, State}.
%% @private
handle_call(stop, _From, #state{} = State) ->
{stop, normal, ok, State};
handle_call(Call, _From, #state{} = State) ->
{reply, {error, {unsupported_call, Call}}, State}.
%% @private
handle_cast(_Cast, #state{} = State) ->
{noreply, State}.
%% @private
code_change(_OldVsn, #state{} = State, _Extra) ->
{ok, State}.
%% @private
terminate(Reason, #state{client_pid = ClientPid
, topic = Topic
, partition = Partition
}) ->
case brod_utils:is_normal_reason(Reason) of
true ->
brod_client:deregister_producer(ClientPid, Topic, Partition);
false ->
%% @private
-if(?OTP_RELEASE < 25).
format_status(normal, [_PDict, State=#state{}]) ->
[{data, [{"State", State}]}];
format_status(terminate, [_PDict, State=#state{buffer = Buffer}]) ->
%% Do not format the buffer attribute when process terminates abnormally and logs an error
%% but allow it when is a sys:get_status/1.2
State#state{buffer = brod_producer_buffer:empty_buffers(Buffer)}.
format_status(#{reason := normal} = Status) ->
format_status(#{reason := terminate, state := #state{buffer = Buffer} = State} = Status) ->
%% Do not format the buffer attribute when process terminates abnormally and logs an error
%% but allow it when is a sys:get_status/1.2
Status#{state => State#state{buffer = brod_producer_buffer:empty_buffers(Buffer)}}.
%%%_* Internal Functions =======================================================
make_send_fun(Topic, Partition, RequiredAcks, AckTimeout, Compression) ->
ExtraArg = {Topic, Partition, RequiredAcks, AckTimeout, Compression},
{fun ?MODULE:do_send_fun/4, ExtraArg}.
%% @private
do_send_fun(ExtraArg, Conn, BatchInput, Vsn) ->
{Topic, Partition, RequiredAcks, AckTimeout, Compression} = ExtraArg,
ProduceRequest =
brod_kafka_request:produce(Vsn, Topic, Partition, BatchInput,
RequiredAcks, AckTimeout, Compression),
case send(Conn, ProduceRequest) of
ok when ProduceRequest#kpro_req.no_ack ->
ok ->
{ok, ProduceRequest#kpro_req.ref};
{error, Reason} ->
{error, Reason}
%% @private
do_no_ack(_Partition, _BaseOffset) -> ok.
-spec log_error_code(topic(), partition(), offset(), brod:error_code()) -> _.
log_error_code(Topic, Partition, Offset, ErrorCode) ->
?BROD_LOG_ERROR("Produce error ~s-~B Offset: ~B Error: ~p",
[Topic, Partition, Offset, ErrorCode]).
make_bufcb(CallRef, AckCb, Partition) ->
{fun ?MODULE:do_bufcb/2, _ExtraArg = {CallRef, AckCb, Partition}}.
%% @private
do_bufcb({CallRef, AckCb, Partition}, Arg) ->
#brod_call_ref{caller = Pid} = CallRef,
case Arg of
?buffered when is_pid(Pid) ->
Reply = #brod_produce_reply{ call_ref = CallRef
, result = ?buffered
erlang:send(Pid, Reply);
?buffered ->
%% caller requires no ack
{?acked, BaseOffset} when AckCb =:= ?undef ->
Reply = #brod_produce_reply{ call_ref = CallRef
, base_offset = BaseOffset
, result = ?acked
erlang:send(Pid, Reply);
{?acked, BaseOffset} when is_function(AckCb, 2) ->
AckCb(Partition, BaseOffset)
handle_produce(BufCb, Batch,
#state{retry_tref = Ref} = State) when is_reference(Ref) ->
%% pending on retry, add to buffer regardless of connection state
do_handle_produce(BufCb, Batch, State);
handle_produce(BufCb, Batch,
#state{connection = Pid} = State) when is_pid(Pid) ->
%% Connection is alive, add to buffer, and try send produce request
do_handle_produce(BufCb, Batch, State);
handle_produce(BufCb, Batch, #state{} = State) ->
%% this is the first request after fresh start/restart or connection death
{ok, NewState} = maybe_reinit_connection(State),
do_handle_produce(BufCb, Batch, NewState).
do_handle_produce(BufCb, Batch, #state{buffer = Buffer} = State) ->
NewBuffer = brod_producer_buffer:add(Buffer, BufCb, Batch),
State1 = State#state{buffer = NewBuffer},
{ok, NewState} = maybe_produce(State1),
{noreply, NewState}.
-spec maybe_reinit_connection(state()) -> {ok, state()}.
maybe_reinit_connection(#state{ client_pid = ClientPid
, connection = OldConnection
, conn_mref = OldConnMref
, topic = Topic
, partition = Partition
, buffer = Buffer0
, produce_req_vsn = ReqVersion
} = State) ->
%% Lookup, or maybe (re-)establish a connection to partition leader
case brod_client:get_leader_connection(ClientPid, Topic, Partition) of
{ok, OldConnection} ->
%% Still the old connection
{ok, State};
{ok, Connection} ->
ok = maybe_demonitor(OldConnMref),
ConnMref = erlang:monitor(process, Connection),
%% Make sure the sent but not acked ones are put back to buffer
Buffer = brod_producer_buffer:nack_all(Buffer0, new_leader),
{ok, State#state{ connection = Connection
, conn_mref = ConnMref
, buffer = Buffer
, produce_req_vsn = req_vsn(Connection, ReqVersion)
{error, Reason} ->
ok = maybe_demonitor(OldConnMref),
%% Make sure the sent but not acked ones are put back to buffer
Buffer = brod_producer_buffer:nack_all(Buffer0, no_leader_connection),
?BROD_LOG_WARNING("Failed to (re)init connection, reason:\n~p",
{ok, State#state{ connection = ?undef
, conn_mref = ?undef
, buffer = Buffer
maybe_produce(#state{retry_tref = Ref} = State) when is_reference(Ref) ->
%% pending on retry after failure
{ok, State};
maybe_produce(#state{ buffer = Buffer0
, connection = Connection
, delay_send_ref = DelaySendRef0
, produce_req_vsn = {_, Vsn}
} = State) ->
_ = cancel_delay_send_timer(DelaySendRef0),
case brod_producer_buffer:maybe_send(Buffer0, Connection, Vsn) of
{ok, Buffer} ->
%% One or more produce requests are sent;
%% Or no more message left to send;
%% Or it has hit `partition_onwire_limit', pending on reply from kafka
{ok, State#state{buffer = Buffer}};
{{delay, Timeout}, Buffer} ->
%% Keep the messages in buffer for better batching
DelaySendRef = start_delay_send_timer(Timeout),
NewState = State#state{ buffer = Buffer
, delay_send_ref = DelaySendRef
{ok, NewState};
{retry, Buffer} ->
%% Failed to send, e.g. due to connection error, retry later
schedule_retry(State#state{buffer = Buffer})
%% Resolve produce API version to use.
%% If api version is configured by user, always use configured version,
%% Otherwise if we have a connection to partition leader
%% pick the highest version supported by kafka.
%% If connection is down, keep using the version previously used.
req_vsn(_, {configured, Vsn}) ->
{configured, Vsn};
req_vsn(Conn, _NotConfigured) when is_pid(Conn) ->
{resolved, brod_kafka_apis:pick_version(Conn, produce)}.
%% Start delay send timer.
-spec start_delay_send_timer(milli_sec()) -> delay_send_ref().
start_delay_send_timer(Timeout) ->
MsgRef = make_ref(),
TRef = erlang:send_after(Timeout, self(), ?DELAYED_SEND_MSG(MsgRef)),
{TRef, MsgRef}.
%% Ensure delay send timer is canceled.
%% But not flushing the possibly already sent (stale) message
%% Stale message should be discarded in handle_info
-spec cancel_delay_send_timer(delay_send_ref()) -> _.
cancel_delay_send_timer(?undef) -> ok;
cancel_delay_send_timer({Tref, _Msg}) -> _ = erlang:cancel_timer(Tref).
maybe_demonitor(?undef) ->
maybe_demonitor(Mref) ->
true = erlang:demonitor(Mref, [flush]),
schedule_retry(#state{ retry_tref = ?undef
, retry_backoff_ms = Timeout
} = State) ->
TRef = erlang:send_after(Timeout, self(), ?RETRY_MSG),
{ok, State#state{retry_tref = TRef}};
schedule_retry(State) ->
%% retry timer has been already activated
{ok, State}.
is_retriable(EC) when EC =:= ?unknown_topic_or_partition;
EC =:= ?leader_not_available;
EC =:= ?not_leader_for_partition;
EC =:= ?request_timed_out;
EC =:= ?not_enough_replicas;
EC =:= ?not_enough_replicas_after_append ->
is_retriable(_) ->
-spec send(?undef | pid(), kpro:req()) -> ok | {error, any()}.
send(?undef, _KafkaReq) -> {error, no_leader_connection};
send(Connection, KafkaReq) -> kpro:request_async(Connection, KafkaReq).
