PDP 36 (Connection Pooling)
A Stream in Pravega is split into a set of shards or partitions, generally referred to as Segments. When an event is written to the Stream by the Pravega Client, it is written into one of these Segments based on the event routing key. These Segments, which are part of `SegmentContainer`s, are managed by the different Segment Store Service instances.
At present the Pravega `EventStreamWriter` creates new connections to different segment stores for every Segment it is writing to. A new connection to a segment store is created even when multiple Segments are owned by the same segment store. In the case of `EventStreamReader`, every Segment being read by the Pravega client maps to a new connection. `RevisionedStreamClient`, which enables reading from and writing to a Stream with strong consistency, also creates a new connection to a segment store for every Segment it is reading from and a new connection for every Segment it is writing to.
The number of connections created increases further if the user is writing to and reading from multiple Streams.
The goal of connection pooling is to provide a common pool of connections between the client process and the segment stores, so that the number of connections does not grow linearly with the number of segments.
The requirement can be broken down into the following:
- WireCommand Changes to enable connection pooling during Append path.
- WireCommand Changes to enable connection pooling during Segment Reads.
- WireCommand changes to enable connection pooling for the RevisionedStreamClient.
- Connection pool for Netty channels (`io.netty.channel.Channel`) and the ability to match replies 1:1 with the requests that generated them.
There are no API changes due to connection pooling.
By default, connection pooling is enabled for writers too. Connection pooling for writers can be disabled by using the following option in the `EventWriterConfig`:
```java
EventWriterConfig writerConfig = EventWriterConfig.builder().enableConnectionPooling(false).build();
```
A Netty channel (`io.netty.channel.Channel`) provides a way to interact with a network socket or a component which is capable of I/O operations such as read, write, connect and bind. As part of connection pooling we need to ensure that:
- The number of channels/connections between a particular client process and a given segment store is configurable (see the configuration sketch after this list).
- Given a pool of connections, we should have the ability to match replies 1:1 with the requests that generated them. This implies that the response `WireCommands.SegmentRead` received for a `WireCommands.ReadSegment` request sent by a Segment Reader should be sent to the same reader. Similarly, in the case of an `Append` request sent by a Segment Writer, the response `WireCommands.DataAppended` should be sent to the same Segment Writer. Since writers batch multiple `Append`s into a `WireCommands.AppendBlock`, we should ensure that connections from different Segment Writers are shared based on the load.
- Closing a Segment Writer or Segment Reader does not imply that the underlying connection should be closed.
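As an illustration of the configurability requirement above, the pool size could be exposed as a client-level setting. The `maxConnectionsPerSegmentStore` option below is an assumed configuration surface used only for this sketch; it is not prescribed by this PDP.

```java
// Assumed client-level knob (illustrative only): cap the number of channels the
// client process opens to any single segment store.
ClientConfig clientConfig = ClientConfig.builder()
        .maxConnectionsPerSegmentStore(5)
        .build();
```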
Every Segment Writer or Segment Reader connects to the underlying `io.netty.channel.Channel` using an `io.pravega.client.netty.impl.ClientConnection`, which is obtained by passing the URI of the segment store and registering its `io.pravega.shared.protocol.netty.ReplyProcessor`. The ability to match replies 1:1 with the requests that generated them can be implemented by using a request id field which is returned by the segment store with every reply. Based on the `requestId` field, the client invokes the correct `ReplyProcessor` of the client (Segment Writer/Reader) sharing the same underlying connection. The `requestId` is present in most of the `WireCommands`.
The `requestId`, which is of type `long`, is composed of a `flowId` (int) and a `requestSequenceNumber` (int). The `flowId` is used to represent the communication between Segment clients and the SegmentStore over an underlying connection pool. This `flowId` is unique per connection pool, and a `Flow` is always tied to a specific network connection. The `requestSequenceNumber` is used to represent the request sequence for a given `Flow`.
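A minimal sketch of how such a `requestId` could be composed is shown below, assuming the `flowId` occupies the upper 32 bits and the `requestSequenceNumber` the lower 32 bits; the class name `FlowSketch` is hypothetical and only illustrates the idea behind the `Flow` abstraction.

```java
// Sketch: packing a flowId and a requestSequenceNumber into a single long requestId.
// The exact bit layout is an illustrative assumption (flowId in the high 32 bits).
public final class FlowSketch {
    private final int flowId;                 // unique per connection pool
    private int requestSequenceNumber = 0;    // incremented per request on this flow

    public FlowSketch(int flowId) {
        this.flowId = flowId;
    }

    /** Build the 64-bit requestId sent with a WireCommand. */
    public long asRequestId() {
        return ((long) flowId << 32) | (requestSequenceNumber & 0xFFFFFFFFL);
    }

    /** Next request on the same flow: bump only the sequence number. */
    public long nextRequestId() {
        requestSequenceNumber++;
        return asRequestId();
    }

    /** Recover the flowId from a requestId returned by the segment store. */
    public static int flowIdOf(long requestId) {
        return (int) (requestId >> 32);
    }
}
```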
There can be multiple `WireCommandType#SEGMENT_READ` replies sent on the same connection/channel by the segment store. This implies that we need to ensure that the client invokes the correct `io.pravega.shared.protocol.netty.ReplyProcessor`. The lookup of the right reply processor can be based on:

1. The segment being read from and the offset at which the segment is being read, i.e. `io.pravega.shared.protocol.netty.WireCommands.SegmentRead#segment` and `io.pravega.shared.protocol.netty.WireCommands.SegmentRead#offset`.
2. A `requestId` can be added to the existing `WireCommands.ReadSegment`, which would correspond to a particular Segment Reader. `WireCommands.SegmentRead`, which is returned by the SegmentStore when there is data, would also include the new `requestId`.
Option (2) is the preferred solution, since it enables a correct multiplexing of the channel. This would also imply that all other responses returned by the SegmentStore would use the `requestId` passed in during the `WireCommands.ReadSegment` command. The responses to `WireCommands.ReadSegment` can be:

- `WireCommands.WrongHost`
- `WireCommands.SegmentIsTruncated`
- `WireCommands.SegmentIsSealed`
- `WireCommands.AuthTokenCheckFailed`

All the above commands already have the `requestId` field. The `requestId` field will now be populated with the `requestId` passed as part of `WireCommands.ReadSegment`.
Note: The current code uses the segment offset as the `requestId`; this would be changed as part of the connection pooling implementation. Also, `WireCommands.SegmentIsSealed` needs to be modified to have a new field `offset` to track the offset at which the SegmentSealed was observed.
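To make the dispatch concrete, the sketch below shows one way a shared channel could route replies to the right reader or writer: extract the flow from the echoed `requestId` and invoke the `ReplyProcessor` registered for that flow. The `FlowDispatcherSketch` class and its nested stand-in interfaces are hypothetical simplifications of `io.pravega.shared.protocol.netty.Reply`/`ReplyProcessor`, not the actual client internals.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: routes replies arriving on a shared channel to the
// ReplyProcessor of the Segment Reader/Writer that owns the flow.
public class FlowDispatcherSketch {

    /** Stand-in for a wire reply carrying the requestId echoed by the segment store. */
    public interface Reply {
        long getRequestId();
    }

    /** Stand-in for the per-client reply processor. */
    public interface ReplyProcessor {
        void process(Reply reply);
    }

    // flowId -> ReplyProcessor of the Segment Reader/Writer that owns the flow.
    private final Map<Integer, ReplyProcessor> flowIdToProcessor = new ConcurrentHashMap<>();

    public void registerFlow(int flowId, ReplyProcessor processor) {
        flowIdToProcessor.put(flowId, processor);
    }

    /** Closing a reader/writer only removes its flow; the shared channel stays open. */
    public void deregisterFlow(int flowId) {
        flowIdToProcessor.remove(flowId);
    }

    /** Invoked for every reply received on the shared channel. */
    public void dispatch(Reply reply) {
        int flowId = (int) (reply.getRequestId() >> 32); // flowId assumed in the high 32 bits
        ReplyProcessor processor = flowIdToProcessor.get(flowId);
        if (processor != null) {
            processor.process(reply);
        }
    }
}
```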
The current behavior for the Append path is as follows:
During an append, the writer id and the event number are passed to the WireCommand `Append`. The `io.pravega.client.netty.impl.AppendBatchSizeTrackerImpl` decides whether an individual `Append` should be sent directly to the segment store or multiple `Append`s should be batched together into an `AppendBlock` and sent to the SegmentStore. The current implementation of the `CommandEncoder` already handles multiple appends from different writers, i.e., if an append operation is in progress for a given segment and, on the same connection, an append operation arrives for a different segment, it closes the `AppendBlock` for the previous segment and starts a new `AppendBlock` for appends to the next segment.
With connection pooling for the Append path, the following aspects need to be taken care of:

- Multiple appends to multiple segments on the same connection would lead to the `AppendBlock` for one segment being closed and a new `AppendBlock` being created for the next writer. This would lead to premature ending of `AppendBlock`s. The implementation needs heuristics to choose the connection in the pool which is least busy (see the sketch after this list).
- In the current implementation the event number is an increasing sequence. The same event number is used as the `requestId` when the segment store responds to appends with `WireCommands.SegmentIsSealed` or `WireCommands.NoSuchSegment`.
- In case the client application creates multiple writers, all writing to the same stream, we need a way to ensure that `WireCommands.SegmentIsSealed` and `WireCommands.NoSuchSegment` invoke the `ReplyProcessor` of the correct Segment Writer.
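A minimal sketch of the least-busy heuristic mentioned in the first item above: the pool tracks how many flows currently share each channel to a given segment store and picks the channel with the fewest, opening a new channel only while the configured cap has not been reached. All class and field names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-endpoint pool with a "least busy" selection heuristic.
public class PooledConnectionSelectorSketch {

    /** Stand-in for a shared channel to one segment store. */
    static final class PooledChannel {
        int activeFlows = 0;  // number of Segment Writers/Readers sharing this channel
    }

    private final int maxConnectionsPerEndpoint;  // configurable, assumed >= 1
    private final Map<String, List<PooledChannel>> channelsByEndpoint = new HashMap<>();

    PooledConnectionSelectorSketch(int maxConnectionsPerEndpoint) {
        this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint;
    }

    /** Pick the least loaded channel, growing the pool only until the cap is reached. */
    synchronized PooledChannel acquire(String segmentStoreUri) {
        List<PooledChannel> channels =
                channelsByEndpoint.computeIfAbsent(segmentStoreUri, uri -> new ArrayList<>());
        PooledChannel chosen;
        if (channels.size() < maxConnectionsPerEndpoint) {
            chosen = new PooledChannel();          // still below the cap: open a new channel
            channels.add(chosen);
        } else {
            chosen = channels.stream()             // at the cap: share the least busy channel
                    .min(Comparator.comparingInt((PooledChannel c) -> c.activeFlows))
                    .get();
        }
        chosen.activeFlows++;
        return chosen;
    }

    /** Closing a writer/reader releases its flow; the underlying channel stays open. */
    synchronized void release(PooledChannel channel) {
        channel.activeFlows--;
    }
}
```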
To satisfy the requirements during the Append path, the idea here is to ensure every Segment Writer uses a unique `requestId`. This implies that `WireCommands.AppendBlock`, `WireCommands.AppendBlockEnd` and `WireCommands.DataAppended` need to be modified to carry a `requestId`.
The same `requestId` will be used by the segment store when it responds to the append request with `WireCommands.DataAppended`. The same `requestId` will also be used when the segment store responds with other commands like `WireCommands.SegmentIsSealed`, `WireCommands.NoSuchSegment`, `WireCommands.WrongHost`, `WireCommands.AppendSetup` and `WireCommands.AuthTokenCheckFailed`. This `requestId` will be used to invoke the `ReplyProcessor` of the correct Segment Writer.
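One way to picture "a unique `requestId` per Segment Writer": each writer is assigned its own flow when it sets up appends on a shared channel, and every append-related command it sends (and every `WireCommands.DataAppended`, `WireCommands.SegmentIsSealed`, etc. the store returns) carries a requestId built from that flow. The allocator below is a hypothetical sketch that reuses the bit layout assumed earlier.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: give every Segment Writer its own flow on the shared channel,
// so the requestId on AppendBlock/AppendBlockEnd/DataAppended identifies the writer.
public class WriterFlowAllocatorSketch {
    private static final AtomicInteger FLOW_ID_GENERATOR = new AtomicInteger();

    /** Allocate the initial requestId for a new Segment Writer (its first request on the flow). */
    public static long newWriterRequestId() {
        int flowId = FLOW_ID_GENERATOR.incrementAndGet(); // unique per writer within this pool
        int requestSequenceNumber = 0;                    // first request on this flow
        return ((long) flowId << 32) | (requestSequenceNumber & 0xFFFFFFFFL);
    }
}
```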
The current `RevisionedStreamClient` creates 3 connections:
- A connection using the RawClient.
- A segment read connection.
- A segment write connection.
The `io.pravega.client.netty.impl.RawClient` already handles multiple types of WireCommands based on a request id. The commands `WireCommands.GetSegmentAttribute`, `WireCommands.GetStreamSegmentInfo`, `WireCommands.ConditionalAppend`, `WireCommands.UpdateSegmentAttribute`, `WireCommands.TruncateSegment` and `WireCommands.SealSegment` are sent using the RawClient. At present the RawClient is created for a specific segment and a separate connection is created for it.
The same `requestId` based mechanism, with increasing `requestSequenceNumber`s, can be used to invoke the `ResponseProcessor` of the RawClient, since the RawClient already uses a `requestId` to track pending requests.
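The pending-request bookkeeping that the RawClient relies on can be pictured as follows: every outgoing command registers a future keyed by its `requestId`, and the reply completes exactly that future. The generic `PendingRequestTrackerSketch` below is a simplified stand-in, not the actual `RawClient` implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for RawClient-style pending-request bookkeeping on one flow.
public class PendingRequestTrackerSketch<REPLY> {
    private final Map<Long, CompletableFuture<REPLY>> pending = new ConcurrentHashMap<>();
    private final int flowId;
    private long nextSequenceNumber = 0;

    public PendingRequestTrackerSketch(int flowId) {
        this.flowId = flowId;
    }

    /** Register a new outgoing request and return the requestId to put on the wire command. */
    public synchronized long register(CompletableFuture<REPLY> replyFuture) {
        long requestId = ((long) flowId << 32) | (nextSequenceNumber++ & 0xFFFFFFFFL);
        pending.put(requestId, replyFuture);
        return requestId;
    }

    /** Complete the future of the request this reply answers. */
    public void onReply(long requestId, REPLY reply) {
        CompletableFuture<REPLY> future = pending.remove(requestId);
        if (future != null) {
            future.complete(reply);
        }
    }
}
```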
To manage the multiple underlying connections, Netty 4 already provides several `io.netty.channel.pool.ChannelPool` implementations:

- `io.netty.channel.pool.SimpleChannelPool`: creates a new connection if there is no channel in the pool. There is no limit on the number of connections that can be created.
- `io.netty.channel.pool.FixedChannelPool`: an implementation of `ChannelPool` which enforces a maximum limit on the number of connections that can be created.

These options are not flexible enough for Pravega's requirements. Also, `ChannelPool` has been removed in Netty 5 (https://github.com/netty/netty/pull/8681).
Pull requests to implement connection pooling: https://github.com/pravega/pravega/pull/3952 and https://github.com/pravega/pravega/pull/3622.
Pull request to ensure connection pooling is configurable: https://github.com/pravega/pravega/pull/3952.