
PDP 28 (Cross Routing Key Ordering)

Derek Moore edited this page Jul 9, 2021 · 1 revision

Motivation

Today, when a reader is consuming data from multiple segments, there is no way to maintain even approximate order. Events are pulled from segments and handed to the reading application based on which segment has more data in memory. This can cause some parts of the keyspace to get very far ahead of others. Unless those segments merge down into a common segment, no event will ever cause them to move closer to one another. So the longer a reader group runs, the more out of order its events become. While this does not violate any guarantee we provide, it is a significant inconvenience for many applications.

Goal

To achieve best-effort ordering across routing keys.

Non-Goals

This proposal is narrow in focus and is not attempting to solve some related problems:

  • Put hard bounds on how out of order events can be.
  • Provide a solution for event time.
  • Provide a solution for watermarking in the Flink connector.

These are important in their own right, and may be relevant to the discussion of adoption, but are out of scope for the proposal itself.

Method

A time index is stored on the server, and its values are passed to the client. The client uses these values to order events across segments and to provide weak fairness between them.

API changes

None.

Wire protocol changes

In the SegmentRead reply from the server to the client, we would add a field containing a number that indicates how the returned data should be ordered relative to data from other segments. The reply would also carry a token marking where the read left off.

In StreamSegmentInfo, we would add a field containing the ordering value that data would receive if it were appended to the segment now. In ReadSegment, we would add a field carrying the token returned by the previous SegmentRead.

Compatibility

This change would be backwards compatible: if the ordering field is missing, the client can infer a value of 0. The feature would then have no effect, and behavior would be the same as if it did not exist. If the token is missing, the server does not support the new feature, and the client simply omits the token from its ReadSegment call.
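A minimal sketch of the client-side fallback, assuming (this is not specified above) that the ordering value and token are encoded as two optional trailing big-endian longs on SegmentRead, and that a token of -1 means "absent". `SegmentReadExtension` and `NO_TOKEN` are hypothetical names, not Pravega's actual wire classes.

```java
import java.nio.ByteBuffer;

/** Hypothetical decoded form of the new trailing SegmentRead fields. */
final class SegmentReadExtension {
    // Assumed sentinel meaning "the server did not send a token".
    static final long NO_TOKEN = -1L;

    final long orderingValue;
    final long indexToken;

    SegmentReadExtension(long orderingValue, long indexToken) {
        this.orderingValue = orderingValue;
        this.indexToken = indexToken;
    }

    /** True if the server sent a token, i.e. it supports the feature. */
    boolean serverSupportsOrdering() {
        return indexToken != NO_TOKEN;
    }

    /**
     * Reads the optional trailing fields from a buffer positioned just after the
     * existing SegmentRead payload. An older server sends neither field, so we
     * fall back to an ordering value of 0 and no token.
     */
    static SegmentReadExtension decode(ByteBuffer remaining) {
        if (remaining.remaining() < 2 * Long.BYTES) {
            return new SegmentReadExtension(0L, NO_TOKEN);
        }
        long ordering = remaining.getLong();
        long token = remaining.getLong();
        return new SegmentReadExtension(ordering, token);
    }
}
```

A client that sees `serverSupportsOrdering()` return false would leave the token out of its next ReadSegment request.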

Client changes

Changes in the client on the writer side

None.

Changes in the client on the reader side

The reader keeps a buffer for each segment it is reading from and, on each readNextEvent() call, uses a component called Orderer to select which buffer to read from. Currently the only criterion Orderer takes into account is the amount of data in memory.

SegmentInputStreamImpl on the client would track the ordering information returned from the server and expose the current value through its interface, so that Orderer can check whether one segment is ahead of or behind another. A reasonable initial implementation would be for Orderer to select the segment reporting the lowest value.
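One possible shape for the lowest-value policy is sketched below. `OrderedSegment` and `LowestValueOrderer` are hypothetical stand-ins for SegmentInputStreamImpl and Orderer. Two choices here are assumptions of the sketch: ties on the ordering value (for example, every segment reporting 0 from an older server) fall back to today's "most data in memory" rule, and segments with nothing buffered are skipped rather than waited on.

```java
import java.util.List;

/** Hypothetical view of a segment buffer as seen by the orderer. */
interface OrderedSegment {
    /** Ordering value of the next buffered data, as reported by the server. */
    long getOrderingValue();

    /** Bytes currently buffered in memory for this segment. */
    int bytesInBuffer();
}

/** Sketch of an orderer that prefers the segment with the lowest ordering value. */
final class LowestValueOrderer {
    /** Returns the segment to read from next, or null if none has buffered data. */
    static <T extends OrderedSegment> T nextSegment(List<T> segments) {
        T best = null;
        for (T s : segments) {
            if (s.bytesInBuffer() <= 0) {
                continue; // nothing to hand to the application from this segment yet
            }
            boolean lower = best == null || s.getOrderingValue() < best.getOrderingValue();
            boolean tieWithMoreData = best != null
                    && s.getOrderingValue() == best.getOrderingValue()
                    && s.bytesInBuffer() > best.bytesInBuffer();
            if (lower || tieWithMoreData) {
                best = s;
            }
        }
        return best;
    }
}
```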

Additionally, AsyncSegmentInputStreamImpl would hold in memory the token it received from the last SegmentRead and pass it in the next ReadSegment request.
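The token hand-off amounts to one piece of per-segment state. This sketch assumes replies may be handled on a different thread from the one building requests (hence `AtomicLong`), that -1 means "no token", and that a seek to a new offset should discard the stale token; `IndexTokenTracker` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper holding the index token between SegmentRead and ReadSegment. */
final class IndexTokenTracker {
    // Assumed sentinel: no token yet, or the server does not support the feature.
    static final long NO_TOKEN = -1L;

    private final AtomicLong lastToken = new AtomicLong(NO_TOKEN);

    /** Called when a SegmentRead reply arrives; an absent token is recorded as NO_TOKEN. */
    void onSegmentRead(long tokenFromReply) {
        lastToken.set(tokenFromReply);
    }

    /** Token to place in the next ReadSegment request. */
    long tokenForNextRequest() {
        return lastToken.get();
    }

    /** After a seek, the previous token no longer matches the read position. */
    void reset() {
        lastToken.set(NO_TOKEN);
    }
}
```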

In EventStreamReaderImpl, the getLag() function can return data derived from the time index. This would fulfill Issue 191. ReaderGroupStateManager already uses getLag() to decide how to rebalance segments, and anchoring it to the time index would make it much more accurate.
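A sketch of how lag could be expressed in clock units. It assumes the reader compares the StreamSegmentInfo value (what newly appended data would receive) with the ordering value of the data it last read, and that a reader's overall lag is the maximum over its segments; `TimeIndexLag` is hypothetical.

```java
/** Hypothetical lag computation based on time index values. */
final class TimeIndexLag {
    /**
     * Lag for one segment, in clock units.
     *
     * @param currentValue  value from StreamSegmentInfo: what data appended now would receive
     * @param lastReadValue ordering value of the data this reader most recently consumed
     */
    static long segmentLag(long currentValue, long lastReadValue) {
        // Clamp at zero in case the values come from hosts whose clocks are only
        // approximately in sync.
        return Math.max(0L, currentValue - lastReadValue);
    }

    /** Reader lag taken as the worst lag over the segments it owns (an assumption). */
    static long readerLag(long[] currentValues, long[] lastReadValues) {
        if (currentValues.length != lastReadValues.length) {
            throw new IllegalArgumentException("arrays must be the same length");
        }
        long worst = 0L;
        for (int i = 0; i < currentValues.length; i++) {
            worst = Math.max(worst, segmentLag(currentValues[i], lastReadValues[i]));
        }
        return worst;
    }
}
```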

Server side changes

In every StreamSegmentAppendOperation (on non-transaction segments) and every MergeTransactionOperation written to Tier-1 we would add an additional long. This value would be obtained from a monotonic clock class.

The clock would be shared across segments and would read approximately the same value across hosts. It could be derived from a wall clock with monotonicity enforced, or pulled from a central source and tracked locally using nanoTime for deltas.
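The wall-clock option can be sketched as a clock that never returns a value smaller than one it has already returned. This assumes millisecond units and one instance shared by all segments on a host; `MonotonicClock` is a hypothetical class, and the injectable `LongSupplier` exists only so the behavior can be tested.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Sketch of a host-wide clock that follows the wall clock but never goes backwards. */
final class MonotonicClock {
    private final LongSupplier wallClockMillis;
    private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

    MonotonicClock(LongSupplier wallClockMillis) {
        this.wallClockMillis = wallClockMillis;
    }

    MonotonicClock() {
        this(System::currentTimeMillis);
    }

    /**
     * Returns max(wall clock, last returned value). If the wall clock steps back
     * (for example after an NTP correction), the previous value is repeated until
     * the wall clock catches up.
     */
    long next() {
        long wall = wallClockMillis.getAsLong();
        return last.updateAndGet(prev -> Math.max(prev, wall));
    }
}
```

The value returned by `next()` would be the extra long stored in each StreamSegmentAppendOperation and MergeTransactionOperation.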

In OperationProcessor, we would update a new piece of per-segment metadata on each append. This would consist of (offset, clock) pairs, using the clock value that was written to Tier-1 for the last DataFrame of the append. The ReadIndex can store this in RocksDB, since it is not constant in size. We can use the concatenate operator to keep the stored content concise.
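A sketch of one possible encoding, assuming each entry is a fixed 16-byte (offset, clock) pair whose offset is the segment length after the append. With fixed-width entries, appending to the index is pure byte concatenation, which is why a concatenating merge in RocksDB fits; the `append` method below only mimics that merge in memory. `TimeIndexEncoding` and `NOT_INDEXED` are hypothetical, and `clockFor` shows how the time for data at a given offset could be looked up.

```java
import java.nio.ByteBuffer;

/** Sketch of the per-segment offset-to-time index stored as concatenated fixed-width entries. */
final class TimeIndexEncoding {
    static final int ENTRY_BYTES = 2 * Long.BYTES;
    /** Hypothetical result for an offset the index does not cover yet. */
    static final long NOT_INDEXED = -1L;

    /** Encodes one (offset, clock) pair; offset is the segment length after the append. */
    static byte[] encodeEntry(long endOffset, long clock) {
        return ByteBuffer.allocate(ENTRY_BYTES).putLong(endOffset).putLong(clock).array();
    }

    /** In-memory stand-in for a concatenating merge of a new entry onto the stored value. */
    static byte[] append(byte[] index, byte[] entry) {
        byte[] out = new byte[index.length + entry.length];
        System.arraycopy(index, 0, out, 0, index.length);
        System.arraycopy(entry, 0, out, index.length, entry.length);
        return out;
    }

    static int entryCount(byte[] index) {
        return index.length / ENTRY_BYTES;
    }

    static long endOffsetAt(byte[] index, int i) {
        return ByteBuffer.wrap(index).getLong(i * ENTRY_BYTES);
    }

    static long clockAt(byte[] index, int i) {
        return ByteBuffer.wrap(index).getLong(i * ENTRY_BYTES + Long.BYTES);
    }

    /** Clock value of the append containing readOffset, found by binary search. */
    static long clockFor(byte[] index, long readOffset) {
        int lo = 0;
        int hi = entryCount(index) - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (endOffsetAt(index, mid) > readOffset) {
                found = mid; // candidate; keep looking for an earlier entry
                hi = mid - 1;
            } else {
                lo = mid + 1;
            }
        }
        return found < 0 ? NOT_INDEXED : clockAt(index, found);
    }
}
```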

This data constitutes an event-offset-to-time index. The ReadIndex can use it to look up the time for an event being read. When a ReadSegment request arrives with a token, that token can be interpreted as an offset into this index data. The index entries for the relevant range can then be returned, along with a token indicating where the next read should start.
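The token exchange could look like the following, assuming the token is simply the position of an entry in the index and that entries are held in memory as a list sorted by end offset. `TimeIndexReader`, `Entry`, and `Result` are hypothetical names. Each call returns every entry overlapping the requested byte range and the token for a read that starts where this one ended.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of serving time index entries for a ReadSegment request that carries a token. */
final class TimeIndexReader {
    /** Hypothetical index entry: segment length after an append, and its clock value. */
    static final class Entry {
        final long endOffset;
        final long clock;

        Entry(long endOffset, long clock) {
            this.endOffset = endOffset;
            this.clock = clock;
        }
    }

    /** Entries to return with the SegmentRead, plus the token for the next read. */
    static final class Result {
        final List<Entry> entries;
        final int nextToken;

        Result(List<Entry> entries, int nextToken) {
            this.entries = entries;
            this.nextToken = nextToken;
        }
    }

    private final List<Entry> index; // sorted by endOffset

    TimeIndexReader(List<Entry> index) {
        this.index = index;
    }

    /** Returns the entries overlapping [readOffset, readOffset + length). */
    Result read(int token, long readOffset, int length) {
        long readEnd = readOffset + length;
        int n = index.size();

        // Start at the token; skip any entry that ends at or before the read start.
        int start = token;
        while (start < n && index.get(start).endOffset <= readOffset) {
            start++;
        }

        // Collect entries until one reaches the end of the read.
        List<Entry> out = new ArrayList<>();
        for (int i = start; i < n; i++) {
            Entry e = index.get(i);
            out.add(e);
            if (e.endOffset >= readEnd) {
                break;
            }
        }

        // The next read begins at readEnd: point at the first entry not fully consumed.
        int next = start;
        while (next < n && index.get(next).endOffset <= readEnd) {
            next++;
        }
        return new Result(out, next);
    }
}
```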

When StorageWriter moves data to Tier-2, the corresponding time index should also be moved there and the ReadIndex updated. The index can be written as its own file. Once it has been written to Tier-2, it can be evicted from memory just like the segment data.

If a read arrives at PravegaRequestProcessor from a new client, it will not have a token. If the requested offset is 0, the token can be assumed to be zero. Otherwise, we can locate the correct position in the index with a binary search or, better yet, a Newtonian (interpolation) search: if the segment is 10MB long and we are asked to read at 8.5MB, then instead of guessing 50% of the way between the start and end of the index, we pick a point 85% of the way through.

Locating an initial token does not require a new API on the store: we can simply define a token constant meaning that the client has no token and the value needs to be located.
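The search described above is an interpolation search. The sketch assumes the index is available as a sorted array of end offsets (sorted because segments are append-only, with every end offset positive) and that the "no token" constant is -1; `InitialTokenLocator` and `LOCATE_TOKEN` are hypothetical. The guess is clamped so every step makes progress even when offsets are unevenly spaced.

```java
/** Sketch of locating an initial index token for a client that did not send one. */
final class InitialTokenLocator {
    /** Hypothetical constant a client sends when it has no token. */
    static final int LOCATE_TOKEN = -1;

    /** Uses the client's token if present, otherwise searches for one. */
    static int resolve(int token, long[] endOffsets, long readOffset) {
        return token == LOCATE_TOKEN ? locate(endOffsets, readOffset) : token;
    }

    /**
     * Interpolation search: returns the first entry whose end offset is greater than
     * readOffset (the entry describing data at readOffset), or endOffsets.length if
     * that data is not indexed yet.
     */
    static int locate(long[] endOffsets, long readOffset) {
        int n = endOffsets.length;
        if (readOffset == 0) {
            return 0; // offset 0: the token is simply zero
        }
        if (n == 0 || endOffsets[n - 1] <= readOffset) {
            return n;
        }
        int lo = 0;
        int hi = n - 1; // invariant: answer in [lo, hi] and endOffsets[hi] > readOffset
        while (endOffsets[lo] <= readOffset) {
            // Here endOffsets[lo] <= readOffset < endOffsets[hi], so lo < hi.
            double fraction = (double) (readOffset - endOffsets[lo])
                    / (double) (endOffsets[hi] - endOffsets[lo]);
            int mid = lo + (int) (fraction * (hi - lo));
            mid = Math.min(mid, hi - 1); // guard against rounding up to hi
            if (endOffsets[mid] > readOffset) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo;
    }
}
```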

Transactions

We need not track index data on transaction segments. If we tracked it there and merged it, the merged values, recorded before the merge, would violate the monotonicity of the parent's index. Instead, we should use a single point for the whole transaction at the time of merge, so the MergeTransactionOperation would need to contain a value from the clock.
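A sketch of the single-point approach, assuming index entries are (end offset, clock) pairs; `MergeIndexing` and its methods are hypothetical. The transaction contributes exactly one entry, stamped with the clock at merge time, so the parent's index stays ordered by clock even if the transaction's events were written long before the merge.

```java
import java.util.List;
import java.util.function.LongSupplier;

/** Sketch of recording a transaction merge as a single time index entry. */
final class MergeIndexing {
    /** Hypothetical index entry: segment length after a write, and its clock value. */
    static final class Entry {
        final long endOffset;
        final long clock;

        Entry(long endOffset, long clock) {
            this.endOffset = endOffset;
            this.clock = clock;
        }
    }

    /** Appends one entry covering the entire merged transaction. */
    static Entry onMerge(List<Entry> parentIndex, long parentLength,
                         long transactionLength, LongSupplier clock) {
        Entry merged = new Entry(parentLength + transactionLength, clock.getAsLong());
        parentIndex.add(merged);
        return merged;
    }

    /** True if end offsets strictly increase and clock values never decrease. */
    static boolean isMonotonic(List<Entry> index) {
        for (int i = 1; i < index.size(); i++) {
            Entry prev = index.get(i - 1);
            Entry cur = index.get(i);
            if (cur.endOffset <= prev.endOffset || cur.clock < prev.clock) {
                return false;
            }
        }
        return true;
    }
}
```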
