2022-09-26: This project has moved to a top-level repository at https://github.com/googleapis/java-pubsub-group-kafka-connector/ for future releases of the Pub/Sub Kafka connector. See #325 for the announcement.
The CloudPubSubConnector is a connector to be used with Kafka Connect to publish messages from Kafka to Google Cloud Pub/Sub or Pub/Sub Lite and vice versa.
CloudPubSubSinkConnector provides a sink connector to copy messages from Kafka to Google Cloud Pub/Sub. CloudPubSubSourceConnector provides a source connector to copy messages from Google Cloud Pub/Sub to Kafka. PubSubLiteSinkConnector provides a sink connector to copy messages from Kafka to Pub/Sub Lite. PubSubLiteSourceConnector provides a source connector to copy messages from Pub/Sub Lite to Kafka.
- Regardless of whether you are running on Google Cloud Platform or not, you need to create a project and a service account key that allows you access to the Cloud Pub/Sub APIs and default quotas.
- Create a project on Google Cloud Platform. By default, this project will have multiple service accounts associated with it (see "IAM & Admin" within the GCP console). Within this section, find the tab for "Service Accounts". Create a new service account and make sure to select "Furnish a new private key". Doing this will create the service account and download a private key file to your local machine.
- Go to the "IAM" tab, find the service account you just created, and click on the dropdown menu named "Role(s)". Under the "Pub/Sub" submenu, select "Pub/Sub Admin". Finally, the key file that was downloaded to your machine needs to be placed on the machine running the framework. An environment variable named GOOGLE_APPLICATION_CREDENTIALS must point to this file. (Tip: export this environment variable as part of your shell startup file.)
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key/file
You can download copy_tool.py, a single-file Python script which downloads, sets up, and runs the Kafka connector in a single-machine configuration. This script requires:
- python >= 3.5
- requests installed
- JAVA_HOME configured properly
- GOOGLE_APPLICATION_CREDENTIALS set
- A properties file with connector.class and other properties set
It can be invoked on macOS/Linux with:
python3 path/to/copy_tool.py --bootstrap_servers=MY_KAFKA_SERVER,OTHER_SERVER --connector_properties_file=path/to/connector.properties
or on Windows with:
python3 path\to\copy_tool.py --bootstrap_servers=MY_KAFKA_SERVER,OTHER_SERVER --connector_properties_file=path\to\connector.properties
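For reference, the properties file passed via --connector_properties_file might look like the minimal sketch below for the Cloud Pub/Sub sink connector. The connector.class value and all names shown are illustrative assumptions; check the sample files under configs/ for the exact values to use.
# Hypothetical minimal connector.properties (illustrative values)
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
# Kafka topic to read from, and Pub/Sub topic/project to publish to (placeholders)
topics=my-kafka-topic
cps.topic=my-pubsub-topic
cps.project=my-gcp-project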
A pre-built uber-jar is available for download with the latest release.
You can also build the connector from head, as described below.
- Copy the pubsub-kafka-connector.jar to the place where you will run your Kafka connector.
- Create a configuration file for the Pub/Sub connector and copy it to the place where you will run Kafka Connect. The configuration should set up the proper Kafka topics, Pub/Sub topic, and Pub/Sub project. For Pub/Sub Lite, this should also set the correct location (Google Cloud zone). Sample configuration files for the source and sink connectors are provided at configs/.
- Create an appropriate configuration for your Kafka Connect instance. More information on the configuration for Kafka Connect can be found in the Kafka Users Guide.
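As a rough sketch, a standalone Kafka Connect worker configuration for running this connector could look like the example below; the converter choice, file paths, and server address are illustrative assumptions rather than required values.
# Hypothetical connect-standalone.properties (illustrative values)
# Kafka brokers the Connect worker talks to
bootstrap.servers=localhost:9092
# ByteArrayConverter passes record keys/values through without re-serialization
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Where standalone mode stores source connector offsets
offset.storage.file.filename=/tmp/connect.offsets
# Directory containing pubsub-kafka-connector.jar
plugin.path=/path/to/connector/jars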
- If running the Kafka connector behind a proxy, you need to export the KAFKA_OPTS variable with options for connecting around the proxy. You can export this variable as part of a shell script to make it easier. Here is an example:
export KAFKA_OPTS="-Dhttp.proxyHost=<host> -Dhttp.proxyPort=<port> -Dhttps.proxyHost=<host> -Dhttps.proxyPort=<port>"
In addition to the configs supplied by the Kafka Connect API, the Cloud Pub/Sub connectors support the following configs. For the CloudPubSubSourceConnector:
Config | Value Range | Default | Description |
---|---|---|---|
cps.subscription | String | REQUIRED (No default) | The name of the subscription to Cloud Pub/Sub, e.g. "sub" for subscription "/projects/bar/subscriptions/sub". |
cps.project | String | REQUIRED (No default) | The project containing the topic from which to pull messages, e.g. "bar" from above. |
cps.endpoint | String | "pubsub.googleapis.com:443" | The Cloud Pub/Sub endpoint to use. |
kafka.topic | String | REQUIRED (No default) | The topic in Kafka which will receive messages that were pulled from Cloud Pub/Sub. |
cps.maxBatchSize | Integer | 100 | The maximum number of messages to batch per pull request to Cloud Pub/Sub. |
cps.makeOrderingKeyAttribute | Boolean | false | When true, copy the ordering key into the set of attributes in the Kafka message. |
kafka.key.attribute | String | null | The Cloud Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. |
kafka.partition.count | Integer | 1 | The number of partitions for the Kafka topic to which messages will be published. NOTE: this parameter is ignored if the partition scheme is "kafka_partitioner". |
kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner, ordering_key | round_robin | The scheme for assigning a message to a partition in Kafka. The "round_robin" scheme assigns partitions in a round-robin fashion, while the "hash_key" and "hash_value" schemes find the partition by hashing the message key and message value respectively. The "kafka_partitioner" scheme delegates partitioning logic to the Kafka producer, which by default detects the number of partitions automatically and performs either murmur-hash-based partition mapping or round robin, depending on whether a message key is provided. The "ordering_key" scheme uses the hash code of a message's ordering key; if no ordering key is present, it falls back to "round_robin". |
gcp.credentials.file.path | String | Optional | The path to a file that stores GCP credentials. |
gcp.credentials.json | String | Optional | A GCP credentials JSON blob. |
kafka.record.headers | Boolean | false | When true, use Kafka record headers to store Pub/Sub message attributes. |
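For example, a CloudPubSubSourceConnector properties file using the settings above might look like this sketch; the connector.class value should be checked against the samples in configs/, and all names are placeholders.
name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=my-kafka-topic
cps.project=my-gcp-project
cps.subscription=my-pubsub-subscription
# Optional: store Pub/Sub message attributes in Kafka record headers
kafka.record.headers=true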
For the CloudPubSubSinkConnector:
Config | Value Range | Default | Description |
---|---|---|---|
cps.topic | String | REQUIRED (No default) | The topic in Cloud Pub/Sub to publish to, e.g. "foo" for topic "/projects/bar/topics/foo". |
cps.project | String | REQUIRED (No default) | The project in Cloud Pub/Sub containing the topic, e.g. "bar" from above. |
cps.endpoint | String | "pubsub.googleapis.com:443" | The Cloud Pub/Sub endpoint to use. |
maxBufferSize | Integer | 100 | The maximum number of messages that can be buffered for a topic partition before they are published to Cloud Pub/Sub. |
maxBufferBytes | Long | 10000000 | The maximum number of bytes that can be buffered for a topic partition before the messages are published to Cloud Pub/Sub. |
maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Cloud Pub/Sub. |
maxRequestTimeoutMs | Integer | 10000 | The timeout for individual publish requests to Cloud Pub/Sub. |
maxTotalTimeoutMs | Integer | 60000 | The total timeout for a call to publish (including retries) to Cloud Pub/Sub. |
gcp.credentials.file.path | String | Optional | The path to a file that stores GCP credentials. |
gcp.credentials.json | String | Optional | A GCP credentials JSON blob. |
metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Cloud Pub/Sub. |
headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Cloud Pub/Sub. |
orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", use a message's key as the ordering key. When set to "partition", convert the partition number to a String and use that as the ordering key. Note that "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
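Putting these settings together, a CloudPubSubSinkConnector properties file might look like the sketch below; the connector.class value should be checked against the samples in configs/, and all names are placeholders.
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
topics=my-kafka-topic
cps.project=my-gcp-project
cps.topic=my-pubsub-topic
# Optional: attach Kafka metadata and headers as Pub/Sub attributes
metadata.publish=true
headers.publish=true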
In addition to the configs supplied by the Kafka Connect API, the Pub/Sub Lite connectors support the following configs. For the PubSubLiteSourceConnector:
Config | Value Range | Default | Description |
---|---|---|---|
pubsublite.subscription | String | REQUIRED (No default) | The name of the subscription to Pub/Sub Lite, e.g. "sub" for subscription "/projects/bar/locations/europe-south7-q/subscriptions/sub". |
pubsublite.project | String | REQUIRED (No default) | The project in Pub/Sub Lite containing the subscription, e.g. "bar" from above. |
pubsublite.location | String | REQUIRED (No default) | The location in Pub/Sub Lite containing the subscription, e.g. "europe-south7-q" from above. |
kafka.topic | String | REQUIRED (No default) | The topic in Kafka which will receive messages that were pulled from Pub/Sub Lite. |
pubsublite.partition_flow_control.messages | Long | Long.MAX_VALUE | The maximum number of outstanding messages per Pub/Sub Lite partition. |
pubsublite.partition_flow_control.bytes | Long | 20,000,000 | The maximum number of outstanding bytes per Pub/Sub Lite partition. |
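For example, a PubSubLiteSourceConnector properties file might look like this sketch. The connector.class value is an assumption to verify against the samples in configs/; the project, location, and subscription names reuse the examples above.
name=PubSubLiteSourceConnector
connector.class=com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
tasks.max=10
kafka.topic=my-kafka-topic
pubsublite.project=bar
pubsublite.location=europe-south7-q
pubsublite.subscription=sub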
For the PubSubLiteSinkConnector:
Config | Value Range | Default | Description |
---|---|---|---|
pubsublite.topic | String | REQUIRED (No default) | The topic in Pub/Sub Lite to publish to, e.g. "foo" for topic "/projects/bar/locations/europe-south7-q/topics/foo". |
pubsublite.project | String | REQUIRED (No default) | The project in Pub/Sub Lite containing the topic, e.g. "bar" from above. |
pubsublite.location | String | REQUIRED (No default) | The location in Pub/Sub Lite containing the topic, e.g. "europe-south7-q" from above. |
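Similarly, a PubSubLiteSinkConnector properties file might look like the following sketch; the connector.class value is an assumption to verify against the samples in configs/, and the names reuse the examples above.
name=PubSubLiteSinkConnector
connector.class=com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
tasks.max=10
topics=my-kafka-topic
pubsublite.project=bar
pubsublite.location=europe-south7-q
pubsublite.topic=foo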
A Pub/Sub message has two main parts: the message body and attributes. The message body is a ByteString object that translates well to and from the byte[] bodies of Kafka messages. For this reason, we recommend using a converter that produces primitive data types (i.e. integer, float, string, or bytes schema) where possible to prevent deserializing and reserializing the same message body.
Additionally, a Pub/Sub message cannot exceed 10 MB, so please check your broker's message.max.bytes configuration to prevent possible errors.
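For instance, to keep records within that limit you can cap the broker-level maximum message size; the value below is only an illustration of staying at or under the 10 MB Pub/Sub limit.
# Kafka broker setting (server.properties), illustrative value
message.max.bytes=10000000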
The sink connector handles the conversion in the following way:
- For integer, float, string, and bytes schemas, the bytes of the Kafka message's value are passed directly into the Pubsub message body.
- For map and struct types, the values are stored in attributes. Pub/Sub only supports string-to-string mappings in attributes. To make the connector as versatile as possible, the toString() method is called on whatever object is passed in as the key or value for a map and the value for a struct.
- Additionally, we allow a particular field or key to be placed in the Pub/Sub message body. To do so, set the messageBodyName configuration to the struct field or map key. This value is stored as a ByteString, and any integer, byte, float, or array type is included in the message body as if it were the sole value.
- For arrays, we only support primitive array types due to potential collisions of field names or keys of a struct/map array. The connector handles arrays in a fairly predictable fashion: each value is concatenated into a ByteString object.
- In all cases, the Kafka key value is stored as a string in the Pub/Sub message's attributes, currently under the attribute name "key".
IMPORTANT NOTICE: There are three limitations to keep in mind when using Pub/Sub message attributes, as stated in its documentation:
- "Attributes per message: 100"
- "Attribute key size: 256 bytes"
- "Attribute value size: 1024 bytes"
If you enable copying of Kafka headers as Pub/Sub message attributes (it is disabled by default), the connector will copy only those headers that meet these limitations and will skip those that do not.
The source connector takes a similar approach in handling the conversion from a Pubsub message into a SourceRecord with a relevant Schema.
- The connector searches for the given kafka.key.attribute in the attributes of the Pubsub message. If found, this will be used as the Kafka key with a string schema type. Otherwise, it will be set to null.
- If the Pub/Sub message doesn't have any other attributes, the message body is stored as a byte[] for the Kafka message's value.
- However, if there are attributes beyond the Kafka key, the value is assigned a struct schema. Each key in the Pub/Sub message's attributes map becomes a field name, with the values set accordingly with string schemas. In this case, the Pub/Sub message body is stored in the field named "message" and has a bytes schema type.
- In these cases, to carry forward the structure of data stored in attributes, we recommend using a converter that can represent a struct schema type in a useful way, e.g. JsonConverter.
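For instance, the worker or connector configuration could select Kafka's JsonConverter for record values; the schemas.enable flag is a standard JsonConverter option, and whether to enable it depends on your downstream consumers.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true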
Pub/Sub Lite's messages have the following structure:
class Message {
ByteString key;
ByteString data;
ListMultimap<String, ByteString> attributes;
Optional<Timestamp> eventTime;
}
This maps quite closely to the SinkRecord class, except for serialization. The table below shows how each field in SinkRecord will be mapped to the underlying message:
SinkRecord | Message |
---|---|
key{Schema} | key |
value{Schema} | data |
headers | attributes |
topic | attributes["x-goog-pubsublite-source-kafka-topic"] |
kafkaPartition | attributes["x-goog-pubsublite-source-kafka-partition"] |
kafkaOffset | attributes["x-goog-pubsublite-source-kafka-offset"] |
timestamp | eventTime |
timestampType | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
When a key, value or header value with a schema is encoded as a ByteString, the following logic will be used:
- null schemas are treated as Schema.STRING_SCHEMA
- Top level BYTES payloads are unmodified.
- Top level STRING payloads are encoded using copyFromUtf8.
- Top level Integral payloads are converted using copyFromUtf8(Long.toString(x.longValue()))
- Top level Floating point payloads are converted using copyFromUtf8(Double.toString(x.doubleValue()))
- All other payloads are encoded into a protobuf Value, then converted to a ByteString.
- Nested STRING fields are encoded into a protobuf Value.
- Nested BYTES fields are encoded to a protobuf Value holding the base64 encoded bytes.
- Nested Numeric fields are encoded as a double into a protobuf Value.
- Maps with Array, Map, or Struct keys are not supported.
- BYTES keys in maps are base64 encoded.
- Integral keys are converted using Long.toString(x.longValue())
- Floating point keys are converted using Double.toString(x.doubleValue())
The source connector will perform a one-to-one mapping from SequencedMessage fields to their SourceRecord counterparts. In addition, empty message.key fields will be converted to null and assigned round-robin to Kafka partitions. Messages with identical, non-empty keys will be routed to the same Kafka partition.
SequencedMessage | SourceRecord field | SourceRecord schema |
---|---|---|
message.key | key | BYTES |
message.data | value | BYTES |
message.attributes | headers | BYTES |
sourcePartition["topic"] | String field in map | |
sourcePartition["partition"] | Integer field in map | |
cursor.offset | sourceOffset["offset"] | Long field in map |
message.event_time | timestamp | long milliseconds since unix epoch if present |
publish_time | timestamp | long milliseconds since unix epoch if no event_time exists |
These instructions assume you are using Maven.
- If you want to build the connector from head, clone the repository, ensuring to do so recursively to pick up submodules:
git clone --recursive https://github.com/GoogleCloudPlatform/pubsub
If you wish to build from a released version of the connector, download it from the Releases section in GitHub.
- Unzip the source code if downloaded from the release version.
- Go into the kafka-connector directory in the cloned repo or downloaded release.
- Make the jar that contains the connector:
mvn package
The resulting jar is at target/pubsub-kafka-connector.jar.