Deploys and configures a test/validation environment for the Debezium (PostgreSQL®) connector with Apache Kafka service integration
- Specifically, this project deploys and configures:
- Apache Kafka service
- Apache Kafka Connect service
- PostgreSQL® service
- Not suitable for environments higher than test/dev
- This project's Terraform does not use a remote, encrypted state backend with state locking, etc.
- Terraform v0.14.x+ installed
- aiven-client (latest version) installed and configured for use with an Aiven authentication token
- Your Aiven CLI config files should look like:
cat ~/.config/aiven/aiven-client.json
{ "default_project": "test-debezium" }
cat ~/.config/aiven/aiven-credentials.json | sed 's/{//g; s/}//g'
{ "auth_token": "YOUR_AIVEN_SERVICE_TOKEN_HERE", "user_email": "firstname.lastname@company.org" }
- Note: for more information about token use, see aiven-client#authenticate-logins-and-tokens
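Before running Terraform, it can help to confirm that both config files exist and contain the keys shown above. The following is a minimal sketch, not part of this project: the function name `check_aiven_config` is hypothetical, and treating `default_project`, `auth_token` and `user_email` as required is an assumption based only on the sample files in this README.

```python
import json
from pathlib import Path

# Keys expected in each file, taken from the sample files in this README.
# Treating all of them as required is an assumption.
EXPECTED_KEYS = {
    "aiven-client.json": ["default_project"],
    "aiven-credentials.json": ["auth_token", "user_email"],
}


def check_aiven_config(config_dir):
    """Return a list of human-readable problems found in the config files."""
    problems = []
    for filename, keys in EXPECTED_KEYS.items():
        path = Path(config_dir) / filename
        try:
            data = json.loads(path.read_text())
        except FileNotFoundError:
            problems.append(f"{filename}: file not found")
            continue
        except json.JSONDecodeError as exc:
            problems.append(f"{filename}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(data, dict):
            problems.append(f"{filename}: expected a JSON object")
            continue
        for key in keys:
            if not data.get(key):
                problems.append(f"{filename}: missing or empty {key!r}")
    return problems


if __name__ == "__main__":
    found = check_aiven_config(Path.home() / ".config" / "aiven")
    for problem in found:
        print(problem)
    if not found:
        print("aiven-client config looks complete")
```

An empty result only means the files are well-formed; it does not prove that the token itself is valid.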
- Now update the avn_api_token variable in the /terraform/.secrets.tfvars file with your Aiven token (the one you configured in the step above). You can rename the sample file and then add your token to it:
cd ./terraform && mv .secrets.tfvars.example .secrets.tfvars
1. Update terraform/.auto.tfvars with your desired variable values
- Make sure to use a valid project_id
cat terraform/.auto.tfvars
# Postgres
avn_pg_svc_project_id = "test-demo"
avn_pg_svc_cloud = "google-us-central1"
avn_pg_svc_plan = "business-4"
avn_pg_svc_name = "test-pg-debezium-gcp"
avn_pg_svc_window_dow = "friday"
avn_pg_svc_window_time = "12:00:00"
avn_pg_svc_version = "12"
# Kafka
avn_kafka_svc_version = "2.7"
avn_kafka_svc_project_id = "test-demo"
avn_kafka_svc_cloud = "google-us-central1"
avn_kafka_svc_plan = "business-4"
avn_kafka_connect_svc_plan = "startup-4"
avn_kafka_svc_name = "test-kafka-debezium-gcp"
avn_kafka_connector_svc_name = "test-kafka-connector-debezium-gcp"
2. Run the Terraform wrapper script to deploy and configure all required resources.
./bin/deploy-terraform-infra.sh
3. Start the replication slot lag monitor script in one terminal:
python bin/python_scripts/replication_slot_monitor.py --verbose --sleep 10
- Note that in the event of a database failover, the script will continue to retry the connection:
ERROR replication_slot_monitor: Failed connecting to the PG server. Retrying after 5 seconds...
- There is a helper Python script that writes data to a test PostgreSQL table and verifies that the changes are captured by Debezium. It does this by consuming from the Kafka topic to which the connector writes the CDC events, then checking whether the ids of the consumed records match the ids of the records that were inserted into the source database table.
- The test database table is called test and it is hardcoded in bin/config-debezium-pg-kafka.sh and bin/python_scripts/debezium_pg_producer.py.
Consuming from the Kafka topic and inserting data into the test table are done in separate Python threads.
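The two-thread design and the id comparison can be illustrated with a small self-contained sketch. This is not the code from debezium_pg_producer.py: a `queue.Queue` stands in for both the database and the Kafka topic, and `run_demo` and `summarize` are hypothetical names. Only the two summary messages are taken from this README.

```python
import queue
import threading


def run_demo(iterations=5):
    """Insert ids in one thread and capture them in another."""
    topic = queue.Queue()  # stand-in for the Kafka topic (assumption)
    inserted_ids = set()   # written only by the insert thread
    captured_ids = set()   # written only by the consumer thread

    def insert_rows():
        for row_id in range(1, iterations + 1):
            inserted_ids.add(row_id)  # stand-in for an INSERT into the test table
            topic.put(row_id)         # stand-in for Debezium emitting a CDC event
        topic.put(None)               # sentinel: tells the consumer to stop

    def consume_events():
        while True:
            row_id = topic.get()
            if row_id is None:
                break
            captured_ids.add(row_id)

    threads = [
        threading.Thread(target=insert_rows),
        threading.Thread(target=consume_events),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return inserted_ids, captured_ids


def summarize(inserted_ids, captured_ids):
    """Compare the two id sets and return the matching summary message."""
    missing = inserted_ids - captured_ids
    if missing:
        return "Kafka Consumer: Some inserts were not captured by debezium"
    return "Kafka Consumer: All inserts were captured by debezium"


if __name__ == "__main__":
    inserted, captured = run_demo()
    print(summarize(inserted, captured))
```

Comparing sets of ids rather than counts means that a duplicated event does not hide a missing one.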
4. Start the PG producer script in another terminal:
python bin/python_scripts/debezium_pg_producer.py --verbose --sleep 3
Notes
- The script accepts the following optional arguments:
--table TABLE The table into which we write test data. Default is "test" table.
--sleep SLEEP Delay between inserts. Default 1 second. Used to control data flow
--iterations ITERATIONS How many inserts before closing program. Defaults to 10000 inserts.
--drop-table Drop the test table before inserting new data into it. Default false.
--check-debezium-slot Only insert if the Debezium slot is active, to guarantee no data loss if PG fails over. Default false.
--verbose Sets the log level to DEBUG. Default log level is WARN
- When a database failover event happens and the script cannot connect to the database to insert data, you will see this log entry:
ERROR debezium_pg_producer: Postgres data insert: Failed connecting to the PG server. Retrying after 5 seconds...
After 10-15 seconds the insert thread should resume. The Kafka consumer thread, however, might fail to get new records (it will log INFO debezium_pg_producer: Kafka Consumer exited) when the Debezium connector fails to resume.
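The retry behaviour behind the log entry above could look roughly like the following. This is a minimal sketch, not the code from the scripts: `connect_with_retry` and the `connect` callable are hypothetical, and catching `ConnectionError` is an assumption (the real scripts presumably catch their database driver's own exception). The 5-second delay is taken from the log message.

```python
import time


def connect_with_retry(connect, retry_delay=5.0, max_attempts=None, sleep=time.sleep):
    """Call connect() until it succeeds, waiting retry_delay seconds between tries.

    max_attempts=None retries forever, which matches a script that keeps
    retrying for the whole duration of a failover.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except ConnectionError:
            if max_attempts is not None and attempt >= max_attempts:
                raise  # give up and surface the last error
            print(
                "Failed connecting to the PG server. "
                f"Retrying after {retry_delay:g} seconds..."
            )
            sleep(retry_delay)
```

The `sleep` parameter exists only so that the delay can be replaced in tests; in normal use the default `time.sleep` applies.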
To end the script, press Ctrl + C once. You will then get a log message stating whether Debezium captured all the changes made since the script started or missed some records:
Kafka Consumer: Some inserts were not captured by debezium
or
Kafka Consumer: All inserts were captured by debezium
During testing we noticed that the replication slot can be inactive for ~15-20 minutes, which means that the connector isn't capturing any changes. Interestingly, the connector appears to run fine. It only fails at the time of the PG database failover, with an error message like this:
ERROR Producer failure
org.postgresql.util.PSQLException: Database connection failed when writing to copy
or
ERROR Could not execute heartbeat action
After that, the connector's status shows as running and no more error messages are logged, but the replication slot from which it consumes is still inactive.
We couldn't find a sure way to reproduce this consistently, as it seems to occur randomly during failover events.
- To get more detailed log output, we can change the logging level of the running Debezium connector to TRACE using this command:
./bin/set_debezium_connector_logging_level.sh io.debezium.connector.postgresql trace
- To get the current logging levels of all loggers in our Kafka Connect cluster, run the above script again with no arguments:
./bin/set_debezium_connector_logging_level.sh
- In addition to our Python test/validation scripts, here are a few SQL examples:
- We should see 't' (true) under the 'active' column for each slot:
SELECT * from pg_replication_slots;
- Show/monitor how much lag we have behind the slots:
SELECT redo_lsn, slot_name,restart_lsn, round((redo_lsn-restart_lsn) / 1024 / 1024 / 1024, 2) AS GB_behind FROM pg_control_checkpoint(), pg_replication_slots;
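The arithmetic in the lag query can be reproduced outside the database. A PostgreSQL LSN is printed as two hexadecimal numbers separated by a slash, representing the high and low 32 bits of a 64-bit byte position, so the difference between two LSNs is a byte count. The sketch below shows that conversion; the function names `lsn_to_bytes` and `gb_behind` are hypothetical and are not part of this project's scripts.

```python
def lsn_to_bytes(lsn):
    """Convert a textual LSN such as '16/B374D848' to an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def gb_behind(redo_lsn, restart_lsn):
    """Distance between two LSNs in GiB, rounded to 2 decimals like the SQL query."""
    lag_bytes = lsn_to_bytes(redo_lsn) - lsn_to_bytes(restart_lsn)
    return round(lag_bytes / 1024 ** 3, 2)


if __name__ == "__main__":
    # 0x40000000 bytes is exactly 1 GiB.
    print(gb_behind("1/40000000", "1/0"))  # 1.0
```

Python rounds floats slightly differently from PostgreSQL's numeric round(), so the last digit may occasionally differ from the SQL result.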
- Clean up / destroy all Terraform infrastructure deployed via our wrapper script in step 2:
./bin/DESTROY-terraform-infra.sh
echo "ensure all resources terminated:"
./bin/show-all-terraform-infra.sh
Kafka consumer in debezium_pg_producer.py not resuming after PG failover
Fails with:
Exception ignored in: <function ConsumerCoordinator.__del__ at 0x7f64090de3a0>
...
RuntimeError: cannot join current thread
The workaround for now is to re-run the debezium_pg_producer.py script.
- Continue with PG data scripts and validate data flow through Apache Kafka via Debezium.
- Possibly automate the triggering of maintenance and/or failover events by scaling up/down.
- Use environment variables to specify the region and project name for use in config-debezium-pg-kafka (right now hardcoded in .auto.tfvars).
Apache Kafka, Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. Debezium, PostgreSQL and Terraform are trademarks and property of their respective owners. All product and service names used in this website are for identification purposes only and do not imply endorsement.