Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alts: Queue ALTS handshakes once limit is reached rather than dropping. #6884

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
alts: Queue ALTS handshakes once limit is reached rather than dropping.
  • Loading branch information
matthewstevenson88 committed Dec 19, 2023
commit 658ffaac496e427e28fa599e4b6fb255c9044628
10 changes: 4 additions & 6 deletions credentials/alts/internal/handshaker/handshaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
// control number of concurrent created (but not closed) handshakes.
clientHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes))
serverHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes))
// errDropped occurs when maxPendingHandshakes is reached.
errDropped = errors.New("maximum number of concurrent ALTS handshakes is reached")
// errOutOfBound occurs when the handshake service returns a consumed
// bytes value larger than the buffer that was passed to it originally.
errOutOfBound = errors.New("handshaker service consumed bytes value is out-of-bound")
Expand Down Expand Up @@ -156,8 +154,8 @@
// ClientHandshake starts and completes a client ALTS handshake for GCP. Once
// done, ClientHandshake returns a secure connection.
func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) {
if !clientHandshakes.TryAcquire(1) {
return nil, nil, errDropped
if err := clientHandshakes.Acquire(ctx, 1); err != nil {
return nil, nil, err

Check warning on line 158 in credentials/alts/internal/handshaker/handshaker.go

View check run for this annotation

Codecov / codecov/patch

credentials/alts/internal/handshaker/handshaker.go#L158

Added line #L158 was not covered by tests
Copy link
Contributor

@cesarghali cesarghali Dec 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please clarify how returning the actual error help queuing instead of dropping? Is there another PR after this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current code, we call TryAcquire on the semaphore. If the semaphore is capped out, then this immediately returns false, so we immediately return errDropped to the user.

When we replace the TryAcquire call with Acquire instead, we will block on trying to acquire the semaphore until ctx times out. Internally to the semaphore, there is a queue of "acquire attempts", so calling Acquire effectively adds the current goroutine to this queue.

As explained in b/312467484, we want to do this so that we have uniform behavior among the 3 languages.

Does that make sense / help clarify?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification. This makes sense. Can you please add a comment that Acquire blocks until it can acquire? Thank you!

}
defer clientHandshakes.Release(1)

Expand Down Expand Up @@ -209,8 +207,8 @@
// ServerHandshake starts and completes a server ALTS handshake for GCP. Once
// done, ServerHandshake returns a secure connection.
func (h *altsHandshaker) ServerHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) {
if !serverHandshakes.TryAcquire(1) {
return nil, nil, errDropped
if err := serverHandshakes.Acquire(ctx, 1); err != nil {
return nil, nil, err

Check warning on line 211 in credentials/alts/internal/handshaker/handshaker.go

View check run for this annotation

Codecov / codecov/patch

credentials/alts/internal/handshaker/handshaker.go#L211

Added line #L211 was not covered by tests
}
defer serverHandshakes.Release(1)

Expand Down
12 changes: 6 additions & 6 deletions credentials/alts/internal/handshaker/handshaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ func (s) TestClientHandshake(t *testing.T) {
}()
}

// Ensure all errors are expected.
// Ensure that there are no errors.
for i := 0; i < testCase.numberOfHandshakes; i++ {
if err := <-errc; err != nil && err != errDropped {
t.Errorf("ClientHandshake() = _, %v, want _, <nil> or %v", err, errDropped)
if err := <-errc; err != nil {
t.Errorf("ClientHandshake() = _, %v, want _, <nil>", err)
}
}

Expand Down Expand Up @@ -250,10 +250,10 @@ func (s) TestServerHandshake(t *testing.T) {
}()
}

// Ensure all errors are expected.
// Ensure that there are no errors.
for i := 0; i < testCase.numberOfHandshakes; i++ {
if err := <-errc; err != nil && err != errDropped {
t.Errorf("ServerHandshake() = _, %v, want _, <nil> or %v", err, errDropped)
if err := <-errc; err != nil {
t.Errorf("ServerHandshake() = _, %v, want _, <nil>", err)
}
}

Expand Down
Loading