Skip to content

Commit

Permalink
Make transport generic over its bag-of-bytes type (#2155)
Browse files Browse the repository at this point in the history
Motivation:

The transport protocols deal in request/response part types. The
bag-of-bytes message type used in `[UInt8]`. This means that transports
might have to copy to and from the bag-of-bytes they use which is
inefficient.

Modifications:

- Add a `GRPCContiguousBytes` protocol defining a basic bag-of-bytes
type.
- Make the transport protocols have an associated `Bytes` type which
conforms to `GRPCContiguousBytes`.
- Propagate this requirement throughout the codebase; this affects the
generated code.
- Update the code generator to generate the appropriate code.
- Update tests

Result:

- Transports can use a bag-of-bytes type of their choosing.
  • Loading branch information
glbrntt authored Jan 17, 2025
1 parent eb7ed6f commit 93d0536
Show file tree
Hide file tree
Showing 43 changed files with 360 additions and 219 deletions.
25 changes: 20 additions & 5 deletions Sources/GRPCCodeGen/Internal/StructuredSwift+Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,10 @@ extension FunctionDescription {

extension StructDescription {
/// ```
/// struct <Name>: <ClientProtocol> {
/// private let client: GRPCCore.GRPCClient
/// struct <Name><Transport>: <ClientProtocol> where Transport: GRPCCore.ClientTransport {
/// private let client: GRPCCore.GRPCClient<Transport>
///
/// init(wrapping client: GRPCCore.GRPCClient) {
/// init(wrapping client: GRPCCore.GRPCClient<Transport>) {
/// self.client = client
/// }
///
Expand All @@ -665,9 +665,18 @@ extension StructDescription {
StructDescription(
accessModifier: accessLevel,
name: name,
generics: [.member("Transport")],
conformances: [clientProtocol],
whereClause: WhereClause(
requirements: [.conformance("Transport", "GRPCCore.ClientTransport")]
),
members: [
.variable(accessModifier: .private, kind: .let, left: "client", type: .grpcClient),
.variable(
accessModifier: .private,
kind: .let,
left: "client",
type: .grpcClient(genericOver: "Transport")
),
.commentable(
.preFormatted(
"""
Expand All @@ -681,7 +690,13 @@ extension StructDescription {
accessModifier: accessLevel,
kind: .initializer,
parameters: [
ParameterDescription(label: "wrapping", name: "client", type: .grpcClient)
ParameterDescription(
label: "wrapping",
name: "client",
type: .grpcClient(
genericOver: "Transport"
)
)
],
whereClause: nil,
body: [
Expand Down
8 changes: 7 additions & 1 deletion Sources/GRPCCodeGen/Internal/StructuredSwift+Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,20 @@ extension FunctionDescription {
return FunctionDescription(
accessModifier: accessLevel,
kind: .function(name: "registerMethods"),
generics: [.member("Transport")],
parameters: [
ParameterDescription(
label: "with",
name: "router",
type: .rpcRouter,
type: .rpcRouter(genericOver: "Transport"),
`inout`: true
)
],
whereClause: WhereClause(
requirements: [
.conformance("Transport", "GRPCCore.ServerTransport")
]
),
body: methods.map { method in
.functionCall(
.registerWithRouter(
Expand Down
11 changes: 9 additions & 2 deletions Sources/GRPCCodeGen/Internal/StructuredSwift+Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ extension ExistingTypeDescription {
}

package static let serverContext: Self = .grpcCore("ServerContext")
package static let rpcRouter: Self = .grpcCore("RPCRouter")

package static func rpcRouter(genericOver type: String) -> Self {
.generic(wrapper: .grpcCore("RPCRouter"), wrapped: .member(type))
}

package static let serviceDescriptor: Self = .grpcCore("ServiceDescriptor")
package static let methodDescriptor: Self = .grpcCore("MethodDescriptor")

Expand All @@ -80,5 +84,8 @@ extension ExistingTypeDescription {

package static let callOptions: Self = .grpcCore("CallOptions")
package static let metadata: Self = .grpcCore("Metadata")
package static let grpcClient: Self = .grpcCore("GRPCClient")

package static func grpcClient(genericOver transport: String) -> Self {
.generic(wrapper: .grpcCore("GRPCClient"), wrapped: [.member(transport)])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,12 @@ extension ClientRPCExecutor.RetryExecutor {
}

@inlinable
func executeAttempt<R: Sendable>(
func executeAttempt<R: Sendable, Bytes: GRPCContiguousBytes>(
context: ClientContext,
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>,
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>,
metadata: Metadata,
retryStream: BroadcastAsyncSequence<Input>,
method: MethodDescriptor,
Expand Down
7 changes: 5 additions & 2 deletions Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,18 @@ extension ClientRPCExecutor {
/// - stream: The stream to excecute the RPC on.
/// - Returns: The deserialized response.
@inlinable // would be private
static func _execute<Input: Sendable, Output: Sendable>(
static func _execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
in group: inout TaskGroup<Void>,
context: ClientContext,
request: StreamingClientRequest<Input>,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
interceptors: [any ClientInterceptor],
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>
) async -> StreamingClientResponse<Output> {

if interceptors.isEmpty {
Expand Down
27 changes: 17 additions & 10 deletions Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ internal enum ClientStreamExecutor {
/// - stream: The stream to excecute the RPC on.
/// - Returns: A streamed response.
@inlinable
static func execute<Input: Sendable, Output: Sendable>(
static func execute<Input: Sendable, Output: Sendable, Bytes: GRPCContiguousBytes>(
in group: inout TaskGroup<Void>,
request: StreamingClientRequest<Input>,
context: ClientContext,
attempt: Int,
serializer: some MessageSerializer<Input>,
deserializer: some MessageDeserializer<Output>,
stream: RPCStream<ClientTransport.Inbound, ClientTransport.Outbound>
stream: RPCStream<
RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>,
RPCWriter<RPCRequestPart<Bytes>>.Closable
>
) async -> StreamingClientResponse<Output> {
// Let the server know this is a retry.
var metadata = request.metadata
Expand Down Expand Up @@ -83,8 +86,8 @@ internal enum ClientStreamExecutor {
}

@inlinable // would be private
static func _processRequest<Outbound>(
on stream: some ClosableRPCWriterProtocol<RPCRequestPart>,
static func _processRequest<Outbound, Bytes: GRPCContiguousBytes>(
on stream: some ClosableRPCWriterProtocol<RPCRequestPart<Bytes>>,
request: StreamingClientRequest<Outbound>,
serializer: some MessageSerializer<Outbound>
) async {
Expand All @@ -104,16 +107,19 @@ internal enum ClientStreamExecutor {
}

@usableFromInline
enum OnFirstResponsePart: Sendable {
case metadata(Metadata, UnsafeTransfer<ClientTransport.Inbound.AsyncIterator>)
enum OnFirstResponsePart<Bytes: GRPCContiguousBytes>: Sendable {
case metadata(
Metadata,
UnsafeTransfer<RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>.AsyncIterator>
)
case status(Status, Metadata)
case failed(RPCError)
}

@inlinable // would be private
static func _waitForFirstResponsePart(
on stream: ClientTransport.Inbound
) async -> OnFirstResponsePart {
static func _waitForFirstResponsePart<Bytes: GRPCContiguousBytes>(
on stream: RPCAsyncSequence<RPCResponsePart<Bytes>, any Error>
) async -> OnFirstResponsePart<Bytes> {
var iterator = stream.makeAsyncIterator()
let result = await Result<OnFirstResponsePart, any Error> {
switch try await iterator.next() {
Expand Down Expand Up @@ -165,7 +171,8 @@ internal enum ClientStreamExecutor {

@usableFromInline
struct RawBodyPartToMessageSequence<
Base: AsyncSequence<RPCResponsePart, Failure>,
Base: AsyncSequence<RPCResponsePart<Bytes>, Failure>,
Bytes: GRPCContiguousBytes,
Message: Sendable,
Deserializer: MessageDeserializer<Message>,
Failure: Error
Expand Down
36 changes: 18 additions & 18 deletions Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ struct ServerRPCExecutor {
/// interceptors will be called in the order of the array.
/// - handler: A handler which turns the request into a response.
@inlinable
static func execute<Input, Output>(
static func execute<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>,
RPCWriter<RPCResponsePart<Bytes>>.Closable
>,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
Expand Down Expand Up @@ -66,11 +66,11 @@ struct ServerRPCExecutor {
}

@inlinable
static func _execute<Input, Output>(
static func _execute<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -106,12 +106,12 @@ struct ServerRPCExecutor {
}

@inlinable
static func _processRPCWithTimeout<Input, Output>(
static func _processRPCWithTimeout<Input, Output, Bytes: GRPCContiguousBytes>(
timeout: Duration,
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -147,11 +147,11 @@ struct ServerRPCExecutor {
}

@inlinable
static func _processRPC<Input, Output>(
static func _processRPC<Input, Output, Bytes: GRPCContiguousBytes>(
context: ServerContext,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart<Bytes>>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
Expand Down Expand Up @@ -235,12 +235,12 @@ struct ServerRPCExecutor {
}

@inlinable
static func _waitForFirstRequestPart(
inbound: RPCAsyncSequence<RPCRequestPart, any Error>
) async -> OnFirstRequestPart {
static func _waitForFirstRequestPart<Bytes: GRPCContiguousBytes>(
inbound: RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>
) async -> OnFirstRequestPart<Bytes> {
var iterator = inbound.makeAsyncIterator()
let part = await Result { try await iterator.next() }
let onFirstRequestPart: OnFirstRequestPart
let onFirstRequestPart: OnFirstRequestPart<Bytes>

switch part {
case .success(.metadata(let metadata)):
Expand Down Expand Up @@ -275,10 +275,10 @@ struct ServerRPCExecutor {
}

@usableFromInline
enum OnFirstRequestPart {
enum OnFirstRequestPart<Bytes: GRPCContiguousBytes> {
case process(
Metadata,
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart, any Error>.AsyncIterator>
UnsafeTransfer<RPCAsyncSequence<RPCRequestPart<Bytes>, any Error>.AsyncIterator>
)
case reject(RPCError)
}
Expand Down
14 changes: 7 additions & 7 deletions Sources/GRPCCore/Call/Server/RPCRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@
/// 1. Remove individual methods by calling ``removeHandler(forMethod:)``, or
/// 2. Implement ``RegistrableRPCService/registerMethods(with:)`` to register only the methods you
/// want to be served.
public struct RPCRouter: Sendable {
public struct RPCRouter<Transport: ServerTransport>: Sendable {
@usableFromInline
struct RPCHandler: Sendable {
@usableFromInline
let _fn:
@Sendable (
_ stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
_ context: ServerContext,
_ interceptors: [any ServerInterceptor]
Expand Down Expand Up @@ -73,8 +73,8 @@ public struct RPCRouter: Sendable {
@inlinable
func handle(
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
context: ServerContext,
interceptors: [any ServerInterceptor]
Expand Down Expand Up @@ -171,8 +171,8 @@ public struct RPCRouter: Sendable {
extension RPCRouter {
internal func handle(
stream: RPCStream<
RPCAsyncSequence<RPCRequestPart, any Error>,
RPCWriter<RPCResponsePart>.Closable
RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
>,
context: ServerContext
) async {
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCCore/Call/Server/RegistrableRPCService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public protocol RegistrableRPCService: Sendable {
/// Registers methods to server with the provided ``RPCRouter``.
///
/// - Parameter router: The router to register methods with.
func registerMethods(with router: inout RPCRouter)
func registerMethods<Transport: ServerTransport>(with router: inout RPCRouter<Transport>)
}
4 changes: 2 additions & 2 deletions Sources/GRPCCore/Coding/Coding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public protocol MessageSerializer<Message>: Sendable {
///
/// - Parameter message: The message to serialize.
/// - Returns: The serialized bytes of a message.
func serialize(_ message: Message) throws -> [UInt8]
func serialize<Bytes: GRPCContiguousBytes>(_ message: Message) throws -> Bytes
}

/// Deserializes a sequence of bytes into a message.
Expand All @@ -49,5 +49,5 @@ public protocol MessageDeserializer<Message>: Sendable {
///
/// - Parameter serializedMessageBytes: The bytes to deserialize.
/// - Returns: The deserialized message.
func deserialize(_ serializedMessageBytes: [UInt8]) throws -> Message
func deserialize<Bytes: GRPCContiguousBytes>(_ serializedMessageBytes: Bytes) throws -> Message
}
Loading

0 comments on commit 93d0536

Please sign in to comment.