Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventEngine] WindowsEndpoint #31735

Merged
merged 21 commits into from
Nov 28, 2022
Merged

[EventEngine] WindowsEndpoint #31735

merged 21 commits into from
Nov 28, 2022

Conversation

drfloob
Copy link
Member

@drfloob drfloob commented Nov 22, 2022

Initial implementation, tested basic communication.

This also modifies the EventEngine TCP socket utils which were forked from iomgr for the PosixEventEngine, and makes them once again usable on other platforms.

Future work:

  • I expect that once the client is roughed out, the needs for two-stage shutdown and ref counting will be more obvious (and testable).

Automated fix for refs/heads/win-ee-endpoint
}

// TODO(hork): implement normalization
absl::StatusOr<std::string> ResolvedAddressToString(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I count two implementations of this in the code base now, making this a third.

Given this is relatively complex and bug-prone code to write & maintain, I'd like us to have a plan to get down to one. I'd strongly prefer that we never exceed two -- can this implementation be merged with another to achieve that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will reunify the PosixEventEngine code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done, but because of it, I think we'll need a cherrypick.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: this does not require a cherrypick, I did a dry run and found no uses (which makes sense)

char* wsa_message = gpr_format_message(wsa_error);
std::string message;
if (!custom_message.empty()) {
std::string message = absl::StrCat(wsa_message, ": ", custom_message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the std::string here - we don't want to declare a new variable and then immediately throw it away

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, thanks.

case WSAECONNRESET:
case WSAECONNABORTED:
case WSA_OPERATION_ABORTED:
return StatusCreate(absl::StatusCode::kAborted, message, location, {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree we should be careful with status codes here - this could constitute an api change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatting with @veblush, we could try to make this change in iomgr and see what the end2end tests say about it (hoping they catch what we care about). It's unclear if a low-level windows or posix socket error ever bubbles up as the final grpc_status_code. The careful thing to do is keep status quo, so I'll do that for now, and experiment separately. Until the status migration is complete (e.g., int payloads are gone) and the EventEngine migration is done, I'm not sure how we could be truly confident that this change is safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

sockaddr addr;
int addr_len = sizeof(addr);
if (getsockname(socket_->socket(), &addr, &addr_len) < 0) {
GPR_ASSERT(false &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer gpr_log, abort() -- this isn't an assert

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. There are 84 other uses of GPR_ASSERT(false && "Reason") in the codebase at the moment, it's a pattern/smell that can be cleaned up if you feel so inclined.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#31757 starts this work

GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this);
// Prepare the WSABUF struct
WSABUF wsa_buffers[kMaxWSABUFCount];
// TODO(hork): use read hint instead of the default?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we should not return from a read until the read hint is achieved (it's labelled min_progress_size in iomgr and that seems a better name)

if we do so, the calling code will assuredly call read again and not process the incoming memory until that limit is achieved anyway

whether that's important now or not I have no opinion, but ultimately we'll want to post at least min_progress_size bytes for an async read to avoid spurious wakeups from the kernel (we could post more, that would be fine)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. min_progress_size is currently unused in the Windows iomgr implementation, but we can improve upon that here. I've added the override.

public:
explicit BaseEventClosure(WindowsEndpoint* endpoint);
virtual void Run() = 0;
void SetCallback(absl::AnyInvocable<void(absl::Status)> cb) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always call SetCallback then SetSliceBuffer - suggest combining these into a single Init/Prime/Prepare call

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primed.

}
}
GPR_ASSERT(data->Count() <= UINT_MAX);
WSABUF local_buffers[kMaxWSABUFCount];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest absl::InlinedVector<WSABUF, kMaxWSABUFCount> here and eschewing the complex hand allocation/deallocation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will evaluate, thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

buffers = allocated;
}
for (int i = 0; i < data->Count(); i++) {
auto slice = data->RefSlice(i);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meta: we need a way of getting this data out without referencing the contained slice - doing so is incredibly inefficient

Copy link
Member Author

@drfloob drfloob Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deleted a comment here along the lines of // DO NOT SUBMIT: implement PeekSlice, but did not want to complicate this PR with a public API change. I can do the public SliceBuffer change now in a separate PR.

There are also ways we could add the functionality internally as a non-member helper function around the c slice buffer, but I think it's probably best in the public API.

if (status == 0) {
if (bytes_sent == data->Length()) {
// Write completed, exiting early
executor_->Run(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting question I don't know the answer to: we do a synchronous write for efficiency here, but then need an extra allocation to call the callback.... does this render the synchronous path moot?

Copy link
Member Author

@drfloob drfloob Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're talking about allocations due to the size of the lambda capture? Even if so, this still saves a few hops through IOCP callbacks, so not quite moot, but it's not as good as the ExecCtx version we're replacing. We could instead capture this and add ref counting to the endpoint itself to minimize the size of the lambda.

GPR_ASSERT(write_info->bytes_transferred() == buffer_->Length());
}
endpoint_->executor_->Run(
[cb = std::move(cb), status = std::move(status)]() mutable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The read callback is made inline without going through the executor again... is there a reason this one also cannot?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. That was an oversight, they should both use the executor. But since IOCP is running its callbacks in an executor anyhow, they can both be called inline here.

@@ -22,6 +22,11 @@
namespace grpc_event_engine {
namespace experimental {

sockaddr_in GetLoopbackAddress();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pedant: prefer GetSomeIpv4LoopbackAddress -- rfc5735 has loopback as being all of 127.0.0.0/8 for ipv4

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a better name. Done.

Copy link
Member Author

@drfloob drfloob left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review. I responded with a couple of questions, fixed most suggestions, and have one or two left to do.

buffers = allocated;
}
for (int i = 0; i < data->Count(); i++) {
auto slice = data->RefSlice(i);
Copy link
Member Author

@drfloob drfloob Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deleted a comment here along the lines of // DO NOT SUBMIT: implement PeekSlice, but did not want to complicate this PR with a public API change. I can do the public SliceBuffer change now in a separate PR.

There are also ways we could add the functionality internally as a non-member helper function around the c slice buffer, but I think it's probably best in the public API.

sockaddr addr;
int addr_len = sizeof(addr);
if (getsockname(socket_->socket(), &addr, &addr_len) < 0) {
GPR_ASSERT(false &&
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. There are 84 other uses of GPR_ASSERT(false && "Reason") in the codebase at the moment, it's a pattern/smell that can be cleaned up if you feel so inclined.

char* wsa_message = gpr_format_message(wsa_error);
std::string message;
if (!custom_message.empty()) {
std::string message = absl::StrCat(wsa_message, ": ", custom_message);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, thanks.

if (status == 0) {
if (bytes_sent == data->Length()) {
// Write completed, exiting early
executor_->Run(
Copy link
Member Author

@drfloob drfloob Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're talking about allocations due to the size of the lambda capture? Even if so, this still saves a few hops through IOCP callbacks, so not quite moot, but it's not as good as the ExecCtx version we're replacing. We could instead capture this and add ref counting to the endpoint itself to minimize the size of the lambda.

GPR_ASSERT(write_info->bytes_transferred() == buffer_->Length());
}
endpoint_->executor_->Run(
[cb = std::move(cb), status = std::move(status)]() mutable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. That was an oversight, they should both use the executor. But since IOCP is running its callbacks in an executor anyhow, they can both be called inline here.

@@ -22,6 +22,11 @@
namespace grpc_event_engine {
namespace experimental {

sockaddr_in GetLoopbackAddress();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a better name. Done.

case WSAECONNRESET:
case WSAECONNABORTED:
case WSA_OPERATION_ABORTED:
return StatusCreate(absl::StatusCode::kAborted, message, location, {});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatting with @veblush, we could try to make this change in iomgr and see what the end2end tests say about it (hoping they catch what we care about). It's unclear if a low-level windows or posix socket error ever bubbles up as the final grpc_status_code. The careful thing to do is keep status quo, so I'll do that for now, and experiment separately. Until the status migration is complete (e.g., int payloads are gone) and the EventEngine migration is done, I'm not sure how we could be truly confident that this change is safe.

case WSAECONNRESET:
case WSAECONNABORTED:
case WSA_OPERATION_ABORTED:
return StatusCreate(absl::StatusCode::kAborted, message, location, {});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
GPR_ASSERT(data->Count() <= UINT_MAX);
WSABUF local_buffers[kMaxWSABUFCount];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will evaluate, thanks.

GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this);
// Prepare the WSABUF struct
WSABUF wsa_buffers[kMaxWSABUFCount];
// TODO(hork): use read hint instead of the default?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. min_progress_size is currently unused in the Windows iomgr implementation, but we can improve upon that here. I've added the override.

auto slice = data->RefSlice(i);
GPR_ASSERT(slice.size() <= ULONG_MAX);
buffers[i].len = slice.size();
buffers[i].buf = (char*)slice.begin();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: reinterpret_cast -- avoid c-style casts

@drfloob drfloob merged commit 557e558 into grpc:master Nov 28, 2022
@copybara-service copybara-service bot added the imported Specifies if the PR has been imported to the internal repository label Nov 29, 2022
wanlin31 pushed a commit that referenced this pull request May 18, 2023
* [EventEngine] WindowsEndpoint

Initial sketch, all tests passing

* Port fix from #28432

* GPR_WINDOWS guard

* use MemoryAllocator::MakeReservation for allocated buffers

* better logging (respect slice length)

* Automated change: Fix sanity tests

* improvements

* Automated change: Fix sanity tests

* InlinedVector<WSABUF, kMaxWSABUFCount>

* initial attempt at socket util reunification

* posix fixes + local run of sanitize.sh

* posix socket includes

* fix

* Automated change: Fix sanity tests

* remove unused include (breaks windows)

* remove stale comment

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bloat/none imported Specifies if the PR has been imported to the internal repository lang/c++ lang/core lang/Python per-call-memory/neutral per-channel-memory/neutral release notes: no Indicates if PR should not be in release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants