Update the README with Azure and Kafka SSL information

thovoll authored and rtyler committed Apr 4, 2023 (commit 85c06cf)

```
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test

RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
--allowed_latency 60 \
--app_id web_requests \
--transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
'meta.kafka.offset: kafka.offset' \
'meta.kafka.partition: kafka.partition' \
'meta.kafka.topic: kafka.topic' \
--auto_offset_reset earliest
```

Notes:

* The `AWS_*` environment variables configure S3 access and are required by the delta-rs library.
** In the example above, `AWS_ENDPOINT_URL` points at localstack (see the sketch after this list).
* The Kafka broker is assumed to be at `localhost:9092`; use `-k` to override.
* To clean up data from previous local runs, execute `./bin/clean-example-data.sh`. You'll need to do this if you destroy your Kafka container between runs, since the Delta log directory will otherwise be out of sync with the Kafka offsets.
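
For reference, a complete set of exports for a local run might look like the sketch below; the endpoint value assumes localstack's default edge port (4566).

```
export AWS_ENDPOINT_URL=http://localhost:4566
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
```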

==== Kafka SSL

If your Kafka topics are secured with SSL client certificates, you can supply these secrets as environment variables.

For the certificate chain, include the PEM content in an environment variable named `KAFKA_DELTA_INGEST_CERT`.
For the certificate's private key, include the PEM content in an environment variable named `KAFKA_DELTA_INGEST_KEY`.

These will be set as the `ssl.certificate.pem` and `ssl.key.pem` Kafka settings, respectively.

Make sure to also provide the additional option:

```
-K security.protocol=SSL
```

when invoking the CLI command.
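
Putting this together, a minimal sketch of an SSL-secured local run (the PEM file names are placeholders):

```
export KAFKA_DELTA_INGEST_CERT="$(cat client-cert.pem)"
export KAFKA_DELTA_INGEST_KEY="$(cat client-key.pem)"

cargo run ingest web_requests ./tests/data/web_requests \
  --app_id web_requests \
  -K security.protocol=SSL
```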

==== Example Data

A tarball containing 100K line-delimited JSON messages is included in `tests/json/web_requests-100K.json.tar.gz`. Running `./bin/extract-example-json.sh` will unpack this to the expected location.
NOTE: URLs sampled for the test data are sourced from Wikipedia's list of most popular websites.
* Show some parquet data (using link:https://pypi.org/project/parquet-tools/[parquet-tools])
** `parquet-tools show tests/data/web_requests/date=2021-03-24/<some file written by your example>`

=== Using Azure Event Hubs

Azure Event Hubs (pricing tier "Standard" or higher) exposes a Kafka-compatible endpoint that can be used with kafka-delta-ingest.

Azure Event Hubs doesn't have a local emulator, so an actual Azure Event Hubs resource is required. As a result, there's no need for the docker-compose application described above.

More info:

* https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-migration-guide
* https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide
* https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations#librdkafka-configuration-properties
* https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md#librdkafka-configuration-properties
* https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka
* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
* https://github.com/edenhill/librdkafka/issues/3109


==== Starting Worker Processes

1. link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create[Create] an Azure Event Hubs Namespace and within it, an Event Hub (which corresponds to a Kafka topic).

2. Set these environment variables; they are required by the delta-rs library:
* `AZURE_STORAGE_ACCOUNT_NAME` (just the storage account name, not the FQDN)
* `AZURE_STORAGE_ACCOUNT_KEY` (just the key, not the connection string)
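
For example, in a local shell (the values below are placeholders):

```
export AZURE_STORAGE_ACCOUNT_NAME=mystorageaccount
export AZURE_STORAGE_ACCOUNT_KEY='<storage-account-key>'
```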

3. Create the `_delta_log` directory in the `web_requests` directory in Azure Storage and upload the link:https://github.com/delta-io/kafka-delta-ingest/blob/main/tests/data/web_requests/_delta_log/00000000000000000000.json[first Delta transaction containing the schema] to this directory.
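
One way to do this is with the Azure CLI; a sketch assuming `az` is installed and the target filesystem (container) already exists:

```
az storage fs directory create \
  --account-name {AZURE_STORAGE_ACCOUNT_NAME} \
  --account-key {AZURE_STORAGE_ACCOUNT_KEY} \
  --file-system {FILESYSTEM_NAME} \
  --name web_requests/_delta_log

az storage fs file upload \
  --account-name {AZURE_STORAGE_ACCOUNT_NAME} \
  --account-key {AZURE_STORAGE_ACCOUNT_KEY} \
  --file-system {FILESYSTEM_NAME} \
  --path web_requests/_delta_log/00000000000000000000.json \
  --source 00000000000000000000.json
```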

4. In the docker command below, replace the following placeholders with your values:
* `AZURE_STORAGE_ACCOUNT_NAME` (just the storage account name, not the FQDN)
* `AZURE_STORAGE_ACCOUNT_KEY` (just the key, not the connection string)
* `EVENTHUBS_NAMESPACE_NAME` (just the namespace name, not the FQDN)
* `EVENTHUBS_KEY_NAME`
* `EVENTHUBS_KEY`

5. Build the Docker image:

```
docker build -t kdi:0.1 . -f Dockerfile-Debian
```

Notes:

* If this takes a long time, make sure that Docker has enough memory.

6. Execute this `docker run` command to start kafka-delta-ingest:

```
docker run -it --network=host ^
-e RUST_LOG="debug" ^
-e SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ^
-e AZURE_STORAGE_ACCOUNT_NAME={AZURE_STORAGE_ACCOUNT_NAME} ^
-e "AZURE_STORAGE_ACCOUNT_KEY={AZURE_STORAGE_ACCOUNT_KEY}" ^
kdi:0.1 ^
ingest web_requests adls2://{AZURE_STORAGE_ACCOUNT_NAME}/{FILESYSTEM_NAME}/web_requests ^
--allowed_latency 5 ^
--kafka {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 ^
--Kafka security.protocol=SASL_SSL ^
--Kafka sasl.mechanism=PLAIN ^
--Kafka sasl.username=$ConnectionString ^
--Kafka sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY} ^
--Kafka socket.keepalive.enable=true ^
--Kafka metadata.max.age.ms=180000 ^
--Kafka heartbeat.interval.ms=3000 ^
--Kafka session.timeout.ms=30000 ^
--Kafka debug=broker,security,protocol ^
--app_id web_requests ^
--transform "date: substr(meta.producer.timestamp, `0`, `10`)" ^
--transform "meta.kafka.offset: kafka.offset" ^
--transform "meta.kafka.partition: kafka.partition" ^
--transform "meta.kafka.topic: kafka.topic" ^
--auto_offset_reset earliest
```

Notes:

* In the docker command:
** The `sasl.username` is the literal string `$ConnectionString` and not a placeholder.
** The following `--Kafka` arguments are taken from link:https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations#librdkafka-configuration-properties[here]:
*** `socket.keepalive.enable=true`
*** `metadata.max.age.ms=180000`
*** `heartbeat.interval.ms=3000`
*** `session.timeout.ms=30000`

==== Sending data to Event Hubs

On Windows, link:https://github.com/paolosalvatori/ServiceBusExplorer[Service Bus Explorer] can be used to send data to Event Hubs.

The following payload should be sent for the web_requests Delta table:

```json
{
  "status": 200,
  "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
  "method": "GET",
  "meta": {
    "producer": {
      "timestamp": "2021-03-24T15:06:17.321710+00:00"
    }
  },
  "uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
  "url": "http://www.example.com"
}
```
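
Alternatively, the payload can be produced from a bash shell with kcat; this sketch reuses the SASL settings from the consumer command in the next section (replace the placeholders as described there):

```
echo '{"status": 200, "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458", "method": "GET", "meta": {"producer": {"timestamp": "2021-03-24T15:06:17.321710+00:00"}}, "uuid": "831c6afa-375c-4988-b248-096f9ed101f8", "url": "http://www.example.com"}' \
  | docker run -i --network=host edenhill/kcat:1.7.1 \
    -P -b {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 -t web_requests \
    -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN \
    -X 'sasl.username=$ConnectionString' \
    -X 'sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY}'
```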

==== Verifying data from Event Hubs using kcat

kcat can be run on Windows via Docker using the command below, which prints the last message (`-o -1`).

Make sure to first replace the following placeholders:

* `EVENTHUBS_NAMESPACE_NAME` (just the namespace name, not the FQDN)
* `EVENTHUBS_KEY_NAME`
* `EVENTHUBS_KEY`

```
docker run -it --network=host edenhill/kcat:1.7.1 ^
  -C -o -1 ^
  -b {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 ^
  -t web_requests ^
  -X security.protocol=SASL_SSL ^
  -X sasl.mechanism=PLAIN ^
  -X sasl.username=$ConnectionString ^
  -X sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY} ^
  -X socket.keepalive.enable=true ^
  -X metadata.max.age.ms=180000 ^
  -X heartbeat.interval.ms=3000 ^
  -X session.timeout.ms=30000
```

Notes:

* The following configuration settings in the command above are taken from link:https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations#librdkafka-configuration-properties[here]: `-X socket.keepalive.enable=true -X metadata.max.age.ms=180000 -X heartbeat.interval.ms=3000 -X session.timeout.ms=30000`

The DynamoDB lock table used when writing to S3 can be created as follows (a sketch; the key schema follows the delta-rs DynamoDB lock documentation linked below):

```
aws dynamodb create-table --table-name delta_rs_lock_table \
    --attribute-definitions AttributeName=key,AttributeType=S \
    --key-schema AttributeName=key,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=10
```

For more information, see link:https://github.com/delta-io/delta-rs/tree/dbc2994c5fddfd39fc31a8f9202df74788f59a01/dynamodb_lock[DynamoDB lock].

==== Verifying data in Azure Storage

Use the Azure Portal to browse the file system:

* Data files: `web_requests/date=2021-03-24`
* Delta log files: `web_requests/_delta_log`
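
The same paths can also be listed from the command line; a sketch using the Azure CLI (assumed to be installed):

```
az storage fs file list \
  --account-name {AZURE_STORAGE_ACCOUNT_NAME} \
  --account-key {AZURE_STORAGE_ACCOUNT_KEY} \
  --file-system {FILESYSTEM_NAME} \
  --path web_requests/_delta_log \
  --output table
```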

== Developing

