-
Notifications
You must be signed in to change notification settings - Fork 10.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[EventEngine] WindowsEndpoint #31735
Conversation
Initial sketch, all tests passing
Automated fix for refs/heads/win-ee-endpoint
} | ||
|
||
// TODO(hork): implement normalization | ||
absl::StatusOr<std::string> ResolvedAddressToString( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I count two implementations of this in the code base now, making this a third.
Given this is relatively complex and bug-prone code to write & maintain, I'd like us to have a plan to get down to one. I'd strongly prefer that we never exceed two -- can this implementation be merged with another to achieve that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will reunify the PosixEventEngine code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done, but because of it, I think we'll need a cherrypick.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correction: this does not require a cherrypick, I did a dry run and found no uses (which makes sense)
char* wsa_message = gpr_format_message(wsa_error); | ||
std::string message; | ||
if (!custom_message.empty()) { | ||
std::string message = absl::StrCat(wsa_message, ": ", custom_message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the std::string
here - we don't want to declare a new variable and then immediately throw it away
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, thanks.
case WSAECONNRESET: | ||
case WSAECONNABORTED: | ||
case WSA_OPERATION_ABORTED: | ||
return StatusCreate(absl::StatusCode::kAborted, message, location, {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree we should be careful with status codes here - this could constitute an api change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chatting with @veblush, we could try to make this change in iomgr and see what the end2end tests say about it (hoping they catch what we care about). It's unclear if a low-level windows or posix socket error ever bubbles up as the final grpc_status_code
. The careful thing to do is keep status quo, so I'll do that for now, and experiment separately. Until the status migration is complete (e.g., int payloads are gone) and the EventEngine migration is done, I'm not sure how we could be truly confident that this change is safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
sockaddr addr; | ||
int addr_len = sizeof(addr); | ||
if (getsockname(socket_->socket(), &addr, &addr_len) < 0) { | ||
GPR_ASSERT(false && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer gpr_log, abort() -- this isn't an assert
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. There are 84 other uses of GPR_ASSERT(false && "Reason")
in the codebase at the moment, it's a pattern/smell that can be cleaned up if you feel so inclined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#31757 starts this work
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this); | ||
// Prepare the WSABUF struct | ||
WSABUF wsa_buffers[kMaxWSABUFCount]; | ||
// TODO(hork): use read hint instead of the default? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally we should not return from a read until the read hint is achieved (it's labelled min_progress_size
in iomgr and that seems a better name)
if we do so, the calling code will assuredly call read again and not process the incoming memory until that limit is achieved anyway
whether that's important now or not I have no opinion, but ultimately we'll want to post at least min_progress_size bytes for an async read to avoid spurious wakeups from the kernel (we could post more, that would be fine)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. min_progress_size
is currently unused in the Windows iomgr implementation, but we can improve upon that here. I've added the override.
public: | ||
explicit BaseEventClosure(WindowsEndpoint* endpoint); | ||
virtual void Run() = 0; | ||
void SetCallback(absl::AnyInvocable<void(absl::Status)> cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always call SetCallback
then SetSliceBuffer
- suggest combining these into a single Init
/Prime
/Prepare
call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Primed.
} | ||
} | ||
GPR_ASSERT(data->Count() <= UINT_MAX); | ||
WSABUF local_buffers[kMaxWSABUFCount]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest absl::InlinedVector<WSABUF, kMaxWSABUFCount>
here and eschewing the complex hand allocation/deallocation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will evaluate, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
buffers = allocated; | ||
} | ||
for (int i = 0; i < data->Count(); i++) { | ||
auto slice = data->RefSlice(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
meta: we need a way of getting this data out without referencing the contained slice - doing so is incredibly inefficient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I deleted a comment here along the lines of // DO NOT SUBMIT: implement PeekSlice
, but did not want to complicate this PR with a public API change. I can do the public SliceBuffer change now in a separate PR.
There are also ways we could add the functionality internally as a non-member helper function around the c slice buffer, but I think it's probably best in the public API.
if (status == 0) { | ||
if (bytes_sent == data->Length()) { | ||
// Write completed, exiting early | ||
executor_->Run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting question I don't know the answer to: we do a synchronous write for efficiency here, but then need an extra allocation to call the callback.... does this render the synchronous path moot?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're talking about allocations due to the size of the lambda capture? Even if so, this still saves a few hops through IOCP callbacks, so not quite moot, but it's not as good as the ExecCtx version we're replacing. We could instead capture this
and add ref counting to the endpoint itself to minimize the size of the lambda.
GPR_ASSERT(write_info->bytes_transferred() == buffer_->Length()); | ||
} | ||
endpoint_->executor_->Run( | ||
[cb = std::move(cb), status = std::move(status)]() mutable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The read callback is made inline without going through the executor again... is there a reason this one also cannot?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. That was an oversight, they should both use the executor. But since IOCP is running its callbacks in an executor anyhow, they can both be called inline here.
@@ -22,6 +22,11 @@ | |||
namespace grpc_event_engine { | |||
namespace experimental { | |||
|
|||
sockaddr_in GetLoopbackAddress(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pedant: prefer GetSomeIpv4LoopbackAddress
-- rfc5735 has loopback as being all of 127.0.0.0/8
for ipv4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a better name. Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review. I responded with a couple of questions, fixed most suggestions, and have one or two left to do.
buffers = allocated; | ||
} | ||
for (int i = 0; i < data->Count(); i++) { | ||
auto slice = data->RefSlice(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I deleted a comment here along the lines of // DO NOT SUBMIT: implement PeekSlice
, but did not want to complicate this PR with a public API change. I can do the public SliceBuffer change now in a separate PR.
There are also ways we could add the functionality internally as a non-member helper function around the c slice buffer, but I think it's probably best in the public API.
sockaddr addr; | ||
int addr_len = sizeof(addr); | ||
if (getsockname(socket_->socket(), &addr, &addr_len) < 0) { | ||
GPR_ASSERT(false && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. There are 84 other uses of GPR_ASSERT(false && "Reason")
in the codebase at the moment, it's a pattern/smell that can be cleaned up if you feel so inclined.
char* wsa_message = gpr_format_message(wsa_error); | ||
std::string message; | ||
if (!custom_message.empty()) { | ||
std::string message = absl::StrCat(wsa_message, ": ", custom_message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, thanks.
if (status == 0) { | ||
if (bytes_sent == data->Length()) { | ||
// Write completed, exiting early | ||
executor_->Run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're talking about allocations due to the size of the lambda capture? Even if so, this still saves a few hops through IOCP callbacks, so not quite moot, but it's not as good as the ExecCtx version we're replacing. We could instead capture this
and add ref counting to the endpoint itself to minimize the size of the lambda.
GPR_ASSERT(write_info->bytes_transferred() == buffer_->Length()); | ||
} | ||
endpoint_->executor_->Run( | ||
[cb = std::move(cb), status = std::move(status)]() mutable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. That was an oversight, they should both use the executor. But since IOCP is running its callbacks in an executor anyhow, they can both be called inline here.
@@ -22,6 +22,11 @@ | |||
namespace grpc_event_engine { | |||
namespace experimental { | |||
|
|||
sockaddr_in GetLoopbackAddress(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a better name. Done.
case WSAECONNRESET: | ||
case WSAECONNABORTED: | ||
case WSA_OPERATION_ABORTED: | ||
return StatusCreate(absl::StatusCode::kAborted, message, location, {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chatting with @veblush, we could try to make this change in iomgr and see what the end2end tests say about it (hoping they catch what we care about). It's unclear if a low-level windows or posix socket error ever bubbles up as the final grpc_status_code
. The careful thing to do is keep status quo, so I'll do that for now, and experiment separately. Until the status migration is complete (e.g., int payloads are gone) and the EventEngine migration is done, I'm not sure how we could be truly confident that this change is safe.
case WSAECONNRESET: | ||
case WSAECONNABORTED: | ||
case WSA_OPERATION_ABORTED: | ||
return StatusCreate(absl::StatusCode::kAborted, message, location, {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
} | ||
GPR_ASSERT(data->Count() <= UINT_MAX); | ||
WSABUF local_buffers[kMaxWSABUFCount]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will evaluate, thanks.
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this); | ||
// Prepare the WSABUF struct | ||
WSABUF wsa_buffers[kMaxWSABUFCount]; | ||
// TODO(hork): use read hint instead of the default? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. min_progress_size
is currently unused in the Windows iomgr implementation, but we can improve upon that here. I've added the override.
Automated fix for refs/heads/win-ee-endpoint
auto slice = data->RefSlice(i); | ||
GPR_ASSERT(slice.size() <= ULONG_MAX); | ||
buffers[i].len = slice.size(); | ||
buffers[i].buf = (char*)slice.begin(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: reinterpret_cast
-- avoid c-style casts
Automated fix for refs/heads/win-ee-endpoint
* [EventEngine] WindowsEndpoint Initial sketch, all tests passing * Port fix from #28432 * GPR_WINDOWS guard * use MemoryAllocator::MakeReservation for allocated buffers * better logging (respect slice length) * Automated change: Fix sanity tests * improvements * Automated change: Fix sanity tests * InlinedVector<WSABUF, kMaxWSABUFCount> * initial attempt at socket util reunification * posix fixes + local run of sanitize.sh * posix socket includes * fix * Automated change: Fix sanity tests * remove unused include (breaks windows) * remove stale comment Co-authored-by: drfloob <drfloob@users.noreply.github.com>
Initial implementation, tested basic communication.
This also modifies the EventEngine TCP socket utils which were forked from iomgr for the PosixEventEngine, and makes them once again usable on other platforms.
Future work: