Commit

Update design doc. Remove offset handling doc.
xianwill committed Jun 11, 2021
1 parent 00f3b34 commit 8f844c7
Showing 3 changed files with 11 additions and 114 deletions.
8 changes: 4 additions & 4 deletions README.adoc
@@ -40,11 +40,11 @@ RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
'meta.kafka.topic: kafka.topic'
```

To clean data from previous local runs, execute `./bin/clean-example-data.sh`. You'll need to do this if you destroy your Kafka container between runs.
To clean data from previous local runs, execute `./bin/clean-example-data.sh`. You'll need to do this if you destroy your Kafka container between runs since your delta log directory will be out of sync with Kafka offsets.

==== Example Data

A tarball containing 100K line-delimited JSON messages is included in `tests/json/web_requests-100K.json.tar.gz`. `./bin/extract-example-json.sh` will unpack this to the expected location.
A tarball containing 100K line-delimited JSON messages is included in `tests/json/web_requests-100K.json.tar.gz`. Running `./bin/extract-example-json.sh` will unpack this to the expected location.

===== Pretty-printed example from the file

@@ -74,7 +74,7 @@ NOTE: URLs sampled for the test data are sourced from Wikipedia's list of most p
* Show some parquet data (using [parquet-tools](https://pypi.org/project/parquet-tools/))
** `parquet-tools show tests/data/web_requests/date=2021-03-24/<some file written by your example>`

== Tests
=== Developing

Make sure the docker-compose test_stack is running then execute `cargo test`.
Make sure the docker-compose test_stack is running, and execute `cargo test` to run unit and integration tests.

49 changes: 7 additions & 42 deletions doc/DESIGN.md
@@ -11,19 +11,6 @@ Only JSON Kafka message formats will be supported in the initial implementation.

This document uses the term "job" to represent the full set of running kafka-delta-ingest resources required to sync a single Kafka topic to a Delta table. The term "process" is used to represent a single running resource that is part of a job. The relationship from topic-to-process is 1:M (to accommodate scalability and efficient resource allocation for both low-volume and high-volume topics), so a topic may have one or more processes handling it, but a single process will only handle a single topic. The relationship from job-to-process is 1:M, and the relationship from job-to-topic is 1:1.


### External Components

* Kafka
* Kafka topics shall be the primary data source of jobs
* Simple Storage Service (S3)
* S3 shall be the primary data sink of jobs - specifically delta table locations
* DynamoDb
* DynamoDb shall be used in a few different ways:
* As the backend for distributed locks used by processes for concurrency control
* As a write-ahead-log (WAL) for Kafka partition offsets where processes will manage the Kafka partition offsets associated with delta log entries.
* As a guard for S3 file writes to satisfy "atomic rename" semantics provided by local file system apis (implemented within delta-rs)

### Process Parameters

Basic required process properties (i.e. parameters with no default) must include:
@@ -52,7 +39,7 @@ _Figure 1_

![Jobs and Processes](./img/kafka-delta-ingest-jobs-and-processes.png)

Each kafka-delta-ingest job is fully distributed with no coordinator process. Processes will coordinate on various tasks through optimistic concurrency retries or distributed locking.
Each kafka-delta-ingest job is fully distributed with no coordinator process. Processes will coordinate on delta writes using optimistic concurrency.
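A minimal, self-contained sketch of what such an optimistic-concurrency retry loop might look like follows; `try_commit`, `latest_version`, `Action`, and `CommitError` are illustrative stand-ins rather than the actual delta-rs API.

```rust
// Sketch of an optimistic-concurrency commit loop. All names below are
// hypothetical placeholders for the delta-rs commit machinery.

#[derive(Debug)]
enum CommitError {
    VersionConflict,
    Other(String),
}

struct Action; // placeholder for a delta log action (add, txn, ...)

fn try_commit(version: i64, _actions: &[Action]) -> Result<i64, CommitError> {
    // In a real writer this would attempt to create _delta_log/<version>.json;
    // a conflict means another process committed that version first.
    Ok(version)
}

fn latest_version() -> i64 {
    0 // placeholder: read the current table version from the delta log
}

fn commit_with_retries(mut version: i64, actions: &[Action]) -> Result<i64, CommitError> {
    loop {
        match try_commit(version + 1, actions) {
            Ok(committed) => return Ok(committed),
            // Another process won the race for this version: refresh and retry.
            Err(CommitError::VersionConflict) => version = latest_version(),
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let version = commit_with_retries(0, &[Action]).expect("commit failed");
    println!("committed delta version {version}");
}
```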

---

@@ -98,45 +85,23 @@ This section lists the crate repositories that are most important to the total j
* [arrow](https://github.com/apache/arrow/tree/master/rust/arrow)
* [parquet](https://github.com/apache/arrow/tree/master/rust/parquet)
* [rdkafka](https://github.com/fede1024/rust-rdkafka)
* [rusoto](https://github.com/rusoto/rusoto)
* [dipstick](https://github.com/fralalonde/dipstick)
* [tokio](https://github.com/tokio-rs/tokio)


### End-to-end Transactions / Kafka Offset Handling

This section describes how end-to-end transactions that account for Kafka partition offsets should be committed in a way that:

1. prevents duplication from re-processing of already written messages and
2. still allows existing tools like Burrow to be used for monitoring consumer lag.

On startup and after rebalance, each kafka-delta-ingest process must check its assigned partitions. Each process must maintain an internal map of assigned partitions and current offsets. Kafka’s group management protocol guarantees that individual consumers within a group will be assigned a mutually exclusive set of partitions.

Upon startup or partition assignment (in case of rebalance), to identify the last offset written to Delta Lake for each assigned partition, the kafka-delta-ingest process must locate the last txn action in the delta log and then query the partition offset write-ahead-log (WAL) to locate the partition offsets where the write-ahead-log version matches the txn.version attribute of the delta log.

Prior to writing a delta transaction log entry, each process must first record the offsets it intends to commit to the partition offset WAL with a started state. Since each process only handles a subset of partitions, it must first read the last completed transaction state record from the WAL, and merge its assigned partitions with the global partition offset map therein. After the first partition offsets WAL write, the process must then commit the delta transaction log entry, and then update the partition offset WAL record to completed.
This section describes how end-to-end transactions that account for Kafka partition offsets should be committed in a way that prevents duplication from re-processing of already written messages.

When preparing to write a transaction, each process must first acquire a distributed lock before writing the next transaction state record to the partition offset WAL. After committing or failing the associated delta transaction, the process must release the lock so other processes can resume use of the partition offset WAL. The full details of the distributed lock design are not fully fleshed out by this document, but may take inspiration from the dynamo-db-lock-client java implementation. Rust crate implementations of distributed locking may be considered as well rather than implementing a home grown solution (e.g. see [dynalock](https://crates.io/crates/dynalock)).
On startup and after rebalance, each kafka-delta-ingest process must check its assigned partitions and re-seek the consumer when necessary. Each process must maintain an internal map of assigned partitions and current offsets. Kafka’s group management protocol guarantees that individual consumers within a group will be assigned a mutually exclusive set of partitions at any given time.
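A minimal sketch of rebuilding that internal map from the consumer's current assignment is shown below, using the rdkafka consumer API; offsets start out unknown and are filled in from the delta log in the next step, and error handling is elided.

```rust
use std::collections::HashMap;

use rdkafka::consumer::{BaseConsumer, Consumer};

// Build a map of assigned partitions with as-yet-unknown offsets.
fn assigned_partition_map(consumer: &BaseConsumer) -> HashMap<i32, Option<i64>> {
    let mut offsets: HashMap<i32, Option<i64>> = HashMap::new();
    // `assignment()` reflects the partitions the group coordinator handed to
    // this consumer; the group protocol keeps the set exclusive within the group.
    if let Ok(assignment) = consumer.assignment() {
        for elem in assignment.elements() {
            // Offsets are recovered from the delta log txn actions afterwards.
            offsets.insert(elem.partition(), None);
        }
    }
    offsets
}
```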

To support using existing open-source tools for monitoring Kafka consumer lag, after completing the distributed transaction against the partition offset WAL and delta log, the process must commit its offsets to Kafka. A failure to commit Kafka offsets after the transaction completes is not protected by the transaction flow. This is tolerable since
Upon startup or partition assignment (in case of rebalance), the kafka-delta-ingest process must identify the last offset written to Delta Lake for each of its assigned partitions by locating the last txn action for that partition in the delta log, and must re-seek the consumer based on that action's txn.version attribute.
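A rough sketch of that re-seek, assuming a `last_written_offsets` map has already been recovered from the per-partition txn actions; the map and the resume-one-past-the-last-offset convention are assumptions, while the seek call is the standard rdkafka consumer API.

```rust
use std::collections::HashMap;
use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::Offset;

// Re-seek each assigned partition based on the offsets recovered from Delta.
fn seek_to_last_written(
    consumer: &BaseConsumer,
    topic: &str,
    last_written_offsets: &HashMap<i32, i64>, // partition -> last offset in the delta log
) -> KafkaResult<()> {
    for (&partition, &offset) in last_written_offsets {
        // Assumes txn.version stores the last offset already written, so the
        // consumer resumes one past it to avoid re-processing that message.
        consumer.seek(topic, partition, Offset::Offset(offset + 1), Duration::from_secs(5))?;
    }
    Ok(())
}
```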

1. the WAL is used as the source of truth for topic partition offsets and processes will seek to those offsets instead of relying on the `__consumer_offsets` topic
2. this type of failure should occur rarely (only on system failure) so consumer lag metrics will only be out-of-sync until the next successful write for each partition. Essentially, the value stored in `__consumer_offsets` is a monitoring metric only and the WAL is the ultimate source of truth.
When performing a write, each process must commit the last offset of each partition within a txn action contained in the delta log entry.
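The per-partition txn action might look roughly like the following; the field names mirror the Delta protocol's txn action, but the struct and the app_id naming scheme shown here are stand-ins, not the delta-rs types.

```rust
// Illustrative shape of the per-partition txn action written with each delta
// log entry; not the actual delta-rs type.
struct TxnAction {
    app_id: String,    // hypothetical scheme: one app id per topic/partition pair
    version: i64,      // last Kafka offset written for that partition
    last_updated: i64, // write timestamp in epoch milliseconds
}

fn txn_for_partition(topic: &str, partition: i32, last_offset: i64, now_ms: i64) -> TxnAction {
    TxnAction {
        app_id: format!("kafka-delta-ingest-{}-{}", topic, partition),
        version: last_offset,
        last_updated: now_ms,
    }
}
```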

Upon distributed lock timeout, processes will need to compete to acquire the lock and resolve the current state. If the txn version listed in the partition offset WAL has already been committed to Delta, the correct resolution is to complete the partition offset WAL entry. If a Delta transaction is not committed, the correct resolution is to mark the partition offset WAL entry as failed and forget about the written Delta files.

Figure 2 shows an interaction diagram between kafka-delta-ingest and external system resources around the transaction flow. Each green numbered circle represents the order of the interaction within the flow.
Figure 2 shows a workflow diagram of these steps.

_Figure 2_

![System Interaction](./img/kafka-delta-ingest-interaction.png)

Figure 3 shows a candidate logical diagram of the partition offset WAL table schema in pseudo-Chen notation.

_Figure 3_

![Transaction State](./img/kafka-delta-ingest-transaction-state.png)

The distributed lock schema is not pictured in this design document, but may take inspiration from the [dynamo-db-lock-client](https://github.com/awslabs/amazon-dynamodb-lock-client) Java implementation.

![Process Workflow](./img/kafka-delta-ingest-workflow.png)

68 changes: 0 additions & 68 deletions doc/OFFSET_HANDLING.adoc

This file was deleted.
