Skip to content

PDP 36 (Connection Pooling)

Derek Moore edited this page Jul 9, 2021 · 1 revision

Introduction:

A Stream in Pravega is split into set of shards or partitions generally referred to as Segments. When an event is written to the Stream by the Pravega Client it is written into one of these segments based on the event routing key. These Segments which are a part of SegmentContainers are managed by the different Segment Store Service instances.

At present the Pravega EventStreamWriter creates new connections to different segment stores for every Segment it is writing to. A new connection to a segment store is created even when multiple segments are owned by the same segment store. In the case of EventStreamReader, every Segment being read by the Pravega client maps to a new connection. RevisionedStreamClient, which enables to read and write to a Stream with strong consistency also creates new connection to a segment store for every Segment it is reading from and a new connection for every Segment it is writing to.

The number of connections created increases if the user is writing and reading from multiple Streams.

The goal of connection pooling is to ensure a common pool of connections between the client process and the segment stores, which does not require a linear growth of the number of connections with the number of segments.

Requirements:

The requirement can be broken down into the following

  • WireCommand Changes to enable connection pooling during Append path.
  • WireCommand Changes to enable connection pooling during Segment Reads.
  • WireCommand changes to enable connection pooling for the RevisionedStreamClient.
  • Connection pool for Netty channels (io.netty.channel.Channel) and enable the ability to match 1:1 replies that a request generated.

API Changes:

No changes to the API due to connection pooling. By default connection pooling is enabled for writers too. Connection pooling for writers can be disabled by using the following option inside the EventWriterConfig

EventWriterConfig writerConfig = EventWriterConfig.builder().enableConnectionPooling(false).build();

Design:

Connection pool for Netty channels:

Netty channel (io.netty.channel.Channel) provides a way to interact with a network socket or a component which is capable of I/O operations such as read, write, connect and bind. As a part of connection pooling we need to ensure that

  1. The number of channels/connections between a particular client process and a given segment store is configurable.

  2. Given a pool of connections we should have the ability to match 1:1 the reply that a request generated. This implies that the response WireCommands.SegmentRead received for a WireCommands.ReadSegment request sent by a Segment Reader should be sent to the same reader. Similarly, in the case of Append request sent by a Segment writer the response WireCommands.DataAppended should be sent to the same segment writer. Since writers batch multiple Appends into an WireCommands.AppendBlock we should ensure that connections from different Segment writers are shared based on the load.

  3. Close of Segment Writer or Segment Reader does not imply that the underlying connection should be closed.

Every Segment Writer or Segment Reader connects to the underlying io.netty.channel.Channel using io.pravega.client.netty.impl.ClientConnection which is obtained by passing the URI of the segment store and the registering its io.pravega.shared.protocol.netty.ReplyProcessor . The ability to match 1:1 replies that a request generated can be implemented by using a request id field which is returned by the segment store with every reply. Based on the requestId field, the client invokes the correct ReplyProcessor of the client (Segment Writer/Reader) sharing the same underlying connection. The requestId is present in most of the WireCommands.

The requestId which is of type long composed of a flowId (int) and a requestSequenceNumber (int). The flowId is used to represent the communication between Segment clients and the SegmentStore which uses an underlying connection pool. This flowId is unique per connection pool and a Flow is always tied to a specific network connection. The requestSequenceNumber is used to represent the requestSequence for a given Flow.

WireCommand Changes to enable connection pooling during Segment Reads:

There can be multiple WireCommandType#SEGMENT_READ sent on the same connection/channel by the segment store. This implies that we need to ensure that the client invokes the correct io.pravega.shared.protocol.netty.ReplyProcessor. The lookup of the right reply processor can be based on

  1. The segment being read from and the offset at which the segment is being read from. i.e. io.pravega.shared.protocol.netty.WireCommands.SegmentRead#segment and the io.pravega.shared.protocol.netty.WireCommands.SegmentRead#offset
  2. requestId can be added to the existing WireCommands.ReadSegment which would correspond to a particular Segment Reader. WireCommands.SegmentRead which is responded by the Segmentstore when there is data would also include a new requestId.

Option (2) is the preferred solution, since it enables a correct multiplexing of the channel. This would also imply that all other responses returned SegmentStore would use the requestId passed by during the WireCommands.ReadSegment command. The responses to WireCommands.ReadSegment can be

  • WireCommands.WrongHost
  • WireCommands.SegmentIsTruncated
  • WireCommands.SegmentIsSealed
  • WireCommands.AuthTokenCheckFailed

All the above commands already have the requestId field. The requestId field now will be populated with the requestId passed as part of WireCommands.ReadSegment.

Note: The current code uses segment offset as requestId and this would be changed as part of connection pooling implementation. Also, WireCommands.SegmentIsSealed needs to be modified to have a new field offset to track the offset at which SegmentSealed was observed.

WireCommand Changes to enable connection pooling during Append path:

The current behavior for the Append path is as follows:

During the append, the writer-id and the event number are passed to the Wirecommand Append. The io.pravega.client.netty.impl.AppendBatchSizeTrackerImpl decides if the individual Append should be directly sent to the segment store or multiple Appends should be batched together in a AppendBlock and sent to the SegmentStore. The current implementation of the CommandEncoder already handles multiple appends from different writers, i.e., in case there is an append operation happening for a given segment and on the same connection we have an append operation to a different segment, then it closes the AppendBlock for the previous segment and starts a new AppendBlock for appends to the next Segment

With connection pooling for Append path the following aspects need to be taken care of:

  1. Multiple Appends to multiple segments on the same connection would lead to AppendBlock for one segment being closed and newer AppendBlock being created the next writer. This would lead to pre-mature ending of AppendBlocks. The implementation needs to implement heuristics to choose a connection in the pool which is least busy.

  2. In the current implementation the event number is an increasing sequence. The same event number is used as requestId when the segment store responds to Appends with WireCommands.SegmentIsSealed, WireCommands.NoSuchSegment.

  3. In the case the client application creates multiple writers, all writing to the same stream, we need a way to ensure the WireCommands.SegmentIsSealed and WireCommands.NoSuchSegment invoke the ReplyProcessor of the correct segment writers.

To satisfy the requirements during the Append path the idea here is to ensure every segment writer uses a unique requestId. This implies WireCommands.AppendBlock, WireCommands.AppendBlockEnd and WireCommands.DataAppended needs to be modified to have a requestId.

The same requestId will be used by the segment store when it responds to the append request using WireCommands.DataAppended. The same requestId will be used when the SSS responds with other commands like WireCommands.SegmentIsSealed, WireCommands.NoSuchSegment, WireCommands.WrongHost, WireCommands.AppendSetup and WireCommands.AuthTokenCheckFailed. This requestId will be used to invoke the ReplyProcessor of the correct Segment writer.

WireCommand changes to enable connection pooling for the RevisionedStreamClient:

The current RevisionedStreamClient creates 3 connections

  1. A connection using Raw client.
  2. A segment read connection.
  3. A segment write connection.

The io.pravega.client.netty.impl.RawClient already handles multiple types of WireCommands based on a request id. The following commands WireCommands.GetSegmentAttribute, WireCommands.GetStreamSegmentInfo, WireCommands.ConditionalAppend, WireCommands.UpdateSegmentAttribute, WireCommands.TruncateSegment and WireCommands.SealSegment are sent using the RawClient. At present RawClient is created for a specific segment and a separate connection is created for the same.

The same requestId based mechanism, with increasing requestSequenceNumbers can be used to invoke the ResponseProcessor of the RawClient since the RawClient also uses a requestId to track pending requests.

Discarded Approaches

To manage the multiple underlying connections Netty4 already has multiple io.netty.channel.pool.ChannelPool implementations.

  1. io.netty.channel.pool.SimpleChannelPool : This creates a new connection if there is no channel in the pool. There is no limit on the number of connections that can be created.
  2. io.netty.channel.pool.FixedChannelPool : This is an implementation of ChannelPool which enforces a maximum limit on the number of connections that can be created.

These options are not flexible for Pravega's requirements. Also, ChannelPool has been removed from Netty 5.(https://github.com/netty/netty/pull/8681).

References:

Pull requests to implement connection pooling https://github.com/pravega/pravega/pull/3952 & https://github.com/pravega/pravega/pull/3622

Pull request to ensure connection pooling is configurable. https://github.com/pravega/pravega/pull/3952

Clone this wiki locally