Skip to content

Commit

Permalink
Merge branch 'master' into confluent-debian
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed May 23, 2016
2 parents e49bc8a + 2213fb2 commit e1f7ff5
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 34 deletions.
4 changes: 2 additions & 2 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ ssl.crl.location | * | |
sasl.mechanisms | * | GSSAPI, PLAIN | GSSAPI | SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN. **NOTE**: Despite the name only one mechanism must be configured. <br>*Type: string*
sasl.kerberos.service.name | * | | kafka | Kerberos principal name that Kafka runs as. <br>*Type: string*
sasl.kerberos.principal | * | | kafkaclient | This client's Kerberos principal name. <br>*Type: string*
sasl.kerberos.kinit.cmd | * | | kinit | Kerberos kinit command path. <br>*Type: string*
sasl.kerberos.keytab | * | | | Path to Kerberos keytab file. Uses system default if not set. <br>*Type: string*
sasl.kerberos.kinit.cmd | * | | kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal} | Full kerberos kinit command string, %{config.prop.name} is replaced by corresponding config object value, %{broker.name} returns the broker's hostname. <br>*Type: string*
sasl.kerberos.keytab | * | | | Path to Kerberos keytab file. Uses system default if not set.**NOTE**: This is not automatically used but must be added to the template in sasl.kerberos.kinit.cmd as ` ... -t %{sasl.kerberos.keytab}`. <br>*Type: string*
sasl.kerberos.min.time.before.relogin | * | 1 .. 86400000 | 60000 | Minimum time in milliseconds between key refresh attempts. <br>*Type: integer*
sasl.username | * | | | SASL username for use with the PLAIN mechanism <br>*Type: string*
sasl.password | * | | | SASL password for use with the PLAIN mechanism <br>*Type: string*
Expand Down
2 changes: 1 addition & 1 deletion src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ namespace RdKafka {
* @remark This value should only be used during compile time,
* for runtime checks of version use RdKafka::version()
*/
#define RD_KAFKA_VERSION 0x00090100
#define RD_KAFKA_VERSION 0x000901ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_assignor.c rdkafka_range_assignor.c \
rdkafka_roundrobin_assignor.c rdkafka_feature.c \
rdcrc32.c rdaddr.c rdrand.c rdlist.c tinycthread.c \
rdlog.c \
rdlog.c rdstring.c \
$(SRCS_y)

HDRS= rdkafka.h
Expand Down
8 changes: 8 additions & 0 deletions src/rd.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ static RD_INLINE RD_UNUSED char *rd_strndup(const char *s, size_t len) {
}


char *rd_string_render (const char *template,
char *errstr, size_t errstr_size,
ssize_t (*callback) (const char *key,
char *buf, size_t size,
void *opaque),
void *opaque);



/*
* Portability
Expand Down
13 changes: 13 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,19 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,
return NULL;
}

#if WITH_SASL
if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
/* Validate SASL config */
if (rd_kafka_sasl_conf_validate(rk, errstr, errstr_size) == -1) {
rd_kafka_destroy_internal(rk);
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
EINVAL);
return NULL;
}
}
#endif

#if WITH_SSL
if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ typedef SSIZE_T ssize_t;
* @remark This value should only be used during compile time,
* for runtime checks of version use rd_kafka_version()
*/
#define RD_KAFKA_VERSION 0x00090100
#define RD_KAFKA_VERSION 0x000901ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down
12 changes: 9 additions & 3 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,17 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
.sdef = "kafkaclient" },
{ _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR,
_RK(sasl.kinit_cmd),
"Kerberos kinit command path.",
.sdef = "kinit" },
"Full kerberos kinit command string, %{config.prop.name} is replaced "
"by corresponding config object value, %{broker.name} returns the "
"broker's hostname.",
.sdef = "kinit -S \"%{sasl.kerberos.service.name}/%{broker.name}\" "
"-k -t \"%{sasl.kerberos.keytab}\" %{sasl.kerberos.principal}" },
{ _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR,
_RK(sasl.keytab),
"Path to Kerberos keytab file. Uses system default if not set." },
"Path to Kerberos keytab file. Uses system default if not set."
"**NOTE**: This is not automatically used but must be added to the "
"template in sasl.kerberos.kinit.cmd as "
"` ... -t %{sasl.kerberos.keytab}`." },
{ _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
_RK(sasl.relogin_min_time),
"Minimum time in milliseconds between key refresh attempts.",
Expand Down
124 changes: 98 additions & 26 deletions src/rdkafka_sasl.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include <sasl/sasl.h>

static mtx_t rd_kafka_sasl_kinit_lock;


/**
* Send auth message with framing.
Expand Down Expand Up @@ -210,6 +212,42 @@ int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,



static ssize_t render_callback (const char *key, char *buf,
size_t size, void *opaque) {
rd_kafka_broker_t *rkb = opaque;

if (!strcmp(key, "broker.name")) {
char *val, *t;
size_t len;
rd_kafka_broker_lock(rkb);
rd_strdupa(&val, rkb->rkb_nodename);
rd_kafka_broker_unlock(rkb);

/* Just the broker name, no port */
if ((t = strchr(val, ':')))
len = (size_t)(t-val);
else
len = strlen(val);

if (buf)
memcpy(buf, val, RD_MIN(len, size));

return len;

} else {
rd_kafka_conf_res_t res;
size_t destsize = size;

/* Try config lookup. */
res = rd_kafka_conf_get(&rkb->rkb_rk->rk_conf, key,
buf, &destsize);
if (res != RD_KAFKA_CONF_OK)
return -1;

/* Dont include \0 in returned size */
return (destsize > 0 ? destsize-1 : destsize);
}
}


/**
Expand All @@ -222,56 +260,55 @@ int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
static int rd_kafka_sasl_kinit_refresh (rd_kafka_broker_t *rkb) {
rd_kafka_t *rk = rkb->rkb_rk;
int r;
char cmd[512];
char keytab[512];
char *hostname, *t;
char *cmd;
char errstr[128];

if (!rk->rk_conf.sasl.kinit_cmd || !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
if (!rk->rk_conf.sasl.kinit_cmd ||
!strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
return 0; /* kinit not configured */

/* Build kinit refresh command line for this broker. */
rd_kafka_broker_lock(rkb);
rd_strdupa(&hostname, rkb->rkb_nodename);
rd_kafka_broker_unlock(rkb);

if ((t = strchr(hostname, ':')))
*t = '\0'; /* remove ":port" */

if (rk->rk_conf.sasl.keytab)
rd_snprintf(keytab, sizeof(keytab),
"-k -t \"%s\"",
rk->rk_conf.sasl.keytab);
else /* default path */
rd_snprintf(keytab, sizeof(keytab), "-k -i");

rd_snprintf(cmd, sizeof(cmd), "%s -S \"%s/%s\" %s %s",
rk->rk_conf.sasl.kinit_cmd,
rk->rk_conf.sasl.service_name, hostname,
keytab,
rk->rk_conf.sasl.principal);
/* Build kinit refresh command line using string rendering and config */
cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
errstr, sizeof(errstr),
render_callback, rkb);
if (!cmd) {
rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
"Failed to construct kinit command "
"from sasl.kerberos.kinit.cmd template: %s",
errstr);
return -1;
}

/* Execute kinit */
rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH",
"Refreshing SASL keys with command: %s", cmd);

mtx_lock(&rd_kafka_sasl_kinit_lock);
r = system(cmd);
mtx_unlock(&rd_kafka_sasl_kinit_lock);

if (r == -1) {
rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
"SASL key refresh failed: Failed to execute %s",
cmd);
rd_free(cmd);
return -1;
} else if (WIFSIGNALED(r)) {
rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
"SASL key refresh failed: %s: received signal %d",
cmd, WTERMSIG(r));
rd_free(cmd);
return -1;
} else if (WIFEXITED(r) && WEXITSTATUS(r) != 0) {
rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
"SASL key refresh failed: %s: exited with code %d",
cmd, WEXITSTATUS(r));
rd_free(cmd);
return -1;
}

rd_free(cmd);

rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "SASL key refreshed");
return 0;
}
Expand Down Expand Up @@ -579,23 +616,58 @@ void rd_kafka_broker_sasl_init (rd_kafka_broker_t *rkb) {
rd_kafka_sasl_kinit_refresh_tmr_cb, rkb);
}



int rd_kafka_sasl_conf_validate (rd_kafka_t *rk,
char *errstr, size_t errstr_size) {
rd_kafka_broker_t rkb;
char *cmd;
char tmperr[128];

if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
return 0;

memset(&rkb, 0, sizeof(rkb));
strcpy(rkb.rkb_nodename, "ATestBroker:9092");
rkb.rkb_rk = rk;
mtx_init(&rkb.rkb_lock, mtx_plain);

cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
tmperr, sizeof(tmperr),
render_callback, &rkb);

mtx_destroy(&rkb.rkb_lock);

if (!cmd) {
rd_snprintf(errstr, errstr_size,
"Invalid sasl.kerberos.kinit.cmd value: %s",
tmperr);
return -1;
}

rd_free(cmd);
return 0;
}


/**
* Global SASL termination.
* NOTE: Should not be called since the application may be using SASL too.
*/
void rd_kafka_sasl_global_term (void) {
sasl_done();
mtx_destroy(&rd_kafka_sasl_kinit_lock);
}




/**
* Global SASL init, called once per runtime.
*/
int rd_kafka_sasl_global_init (void) {
int r;

mtx_init(&rd_kafka_sasl_kinit_lock, mtx_plain);

r = sasl_client_init(NULL);
if (r != SASL_OK) {
fprintf(stderr, "librdkafka: sasl_client_init() failed: %s\n",
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_sasl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ void rd_kafka_broker_sasl_init (rd_kafka_broker_t *rkb);

int rd_kafka_sasl_global_init (void);

int rd_kafka_sasl_conf_validate (rd_kafka_t *rk,
char *errstr, size_t errstr_size);
Loading

0 comments on commit e1f7ff5

Please sign in to comment.