Skip to content

Commit

Permalink
Avoid possible use-after-free when updating last fetched offset
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed May 18, 2016
1 parent a5e7be8 commit 8fcea73
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3456,21 +3456,25 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
if (rd_kafka_q_len(&tmp_opq) > 0) {
/* Update partitions fetch offset based on
* last message's offest. */
int64_t last_offset = -1;
rd_kafka_op_t *rko =
rd_kafka_q_last(&tmp_opq,
RD_KAFKA_OP_FETCH,
0 /* no error ops */);

if (rko)
last_offset = rko->rko_offset;

if (rd_kafka_q_concat(&rktp->rktp_fetchq,
&tmp_opq) == -1) {
/* rktp fetchq disabled, probably
* shutting down. Drop messages. */
rd_kafka_q_purge0(&tmp_opq,
0/*no-lock*/);
} else {
if (rko)
if (last_offset != -1)
rktp->rktp_offsets.fetch_offset =
rko->rko_offset + 1;
last_offset + 1;
rd_atomic64_add(&rktp->rktp_c.msgs,
rd_kafka_q_len(&tmp_opq));
}
Expand Down

0 comments on commit 8fcea73

Please sign in to comment.