Skip to content

Commit

Permalink
outbound: move target types into stack modules
Browse files Browse the repository at this point in the history
Depends on #967

This branch refactors the `linkerd-app-outbound` crate so that each
stack target type is now defined in the module for the stack that uses
it, rather than in one big `target` module. For example, `Logical` is
now defined in `logical.rs` and `Endpoint` is defined in `endpoint.rs`.

There should be no functional change in this PR --- it just moves code
around.
  • Loading branch information
hawkw committed Apr 13, 2021
1 parent dbc3036 commit c3f5d48
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 368 deletions.
186 changes: 184 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
use crate::{target::Endpoint, Outbound};
use linkerd_app_core::{svc, tls};
use crate::{
logical::{Concrete, Logical},
tcp::opaque_transport,
Accept, Outbound,
};
use linkerd_app_core::{
metrics,
proxy::{api_resolve::Metadata, http, resolve::map_endpoint::MapEndpoint},
svc::{self, Param},
tls,
transport::{self, Remote, ServerAddr},
transport_header, Addr, Conditional,
};
use std::net::SocketAddr;

#[derive(Clone, Debug)]
pub struct Endpoint<P> {
pub addr: Remote<ServerAddr>,
pub tls: tls::ConditionalClientTls,
pub metadata: Metadata,
pub logical_addr: Addr,
pub protocol: P,
}

#[derive(Copy, Clone)]
pub struct FromMetadata {
pub identity_disabled: bool,
}

impl<E> Outbound<E> {
pub fn push_into_endpoint<P, T>(
Expand Down Expand Up @@ -29,3 +55,159 @@ impl<E> Outbound<E> {
}
}
}

// === impl Endpoint ===

impl<P> Endpoint<P> {
pub fn no_tls(reason: tls::NoClientTls) -> impl Fn(Accept<P>) -> Self {
move |accept| Self::from((reason, accept))
}
}

impl<P> From<(tls::NoClientTls, Logical<P>)> for Endpoint<P> {
fn from((reason, logical): (tls::NoClientTls, Logical<P>)) -> Self {
match logical.profile.borrow().endpoint.clone() {
None => Self {
addr: Remote(ServerAddr(logical.orig_dst.into())),
metadata: Metadata::default(),
tls: Conditional::None(reason),
logical_addr: logical.addr(),
protocol: logical.protocol,
},
Some((addr, metadata)) => Self {
addr: Remote(ServerAddr(addr)),
tls: FromMetadata::client_tls(&metadata, reason),
metadata,
logical_addr: logical.addr(),
protocol: logical.protocol,
},
}
}
}

impl<P> From<(tls::NoClientTls, Accept<P>)> for Endpoint<P> {
fn from((reason, accept): (tls::NoClientTls, Accept<P>)) -> Self {
Self {
addr: Remote(ServerAddr(accept.orig_dst.into())),
metadata: Metadata::default(),
tls: Conditional::None(reason),
logical_addr: accept.orig_dst.0.into(),
protocol: accept.protocol,
}
}
}

impl<P> Param<Remote<ServerAddr>> for Endpoint<P> {
fn param(&self) -> Remote<ServerAddr> {
self.addr
}
}

impl<P> Param<tls::ConditionalClientTls> for Endpoint<P> {
fn param(&self) -> tls::ConditionalClientTls {
self.tls.clone()
}
}

impl<P> Param<Option<opaque_transport::PortOverride>> for Endpoint<P> {
fn param(&self) -> Option<opaque_transport::PortOverride> {
self.metadata
.opaque_transport_port()
.map(opaque_transport::PortOverride)
}
}

impl<P> Param<Option<http::AuthorityOverride>> for Endpoint<P> {
fn param(&self) -> Option<http::AuthorityOverride> {
self.metadata
.authority_override()
.cloned()
.map(http::AuthorityOverride)
}
}

impl<P> Param<transport::labels::Key> for Endpoint<P> {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::OutboundConnect(self.param())
}
}

impl<P> Param<metrics::OutboundEndpointLabels> for Endpoint<P> {
fn param(&self) -> metrics::OutboundEndpointLabels {
metrics::OutboundEndpointLabels {
authority: Some(self.logical_addr.to_http_authority()),
labels: metrics::prefix_labels("dst", self.metadata.labels().iter()),
server_id: self.tls.clone(),
target_addr: self.addr.into(),
}
}
}

impl<P> Param<metrics::EndpointLabels> for Endpoint<P> {
fn param(&self) -> metrics::EndpointLabels {
Param::<metrics::OutboundEndpointLabels>::param(self).into()
}
}

impl<P: std::hash::Hash> std::hash::Hash for Endpoint<P> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.addr.hash(state);
self.tls.hash(state);
self.logical_addr.hash(state);
self.protocol.hash(state);
}
}

// === EndpointFromMetadata ===

impl FromMetadata {
fn client_tls(metadata: &Metadata, reason: tls::NoClientTls) -> tls::ConditionalClientTls {
// If we're transporting an opaque protocol OR we're communicating with
// a gateway, then set an ALPN value indicating support for a transport
// header.
let use_transport_header =
metadata.opaque_transport_port().is_some() || metadata.authority_override().is_some();

metadata
.identity()
.cloned()
.map(move |server_id| {
Conditional::Some(tls::ClientTls {
server_id,
alpn: if use_transport_header {
Some(tls::client::AlpnProtocols(vec![
transport_header::PROTOCOL.into()
]))
} else {
None
},
})
})
.unwrap_or(Conditional::None(reason))
}
}

impl<P: Copy + std::fmt::Debug> MapEndpoint<Concrete<P>, Metadata> for FromMetadata {
type Out = Endpoint<P>;

fn map_endpoint(
&self,
concrete: &Concrete<P>,
addr: SocketAddr,
metadata: Metadata,
) -> Self::Out {
tracing::trace!(%addr, ?metadata, ?concrete, "Resolved endpoint");
let tls = if self.identity_disabled {
tls::ConditionalClientTls::None(tls::NoClientTls::Disabled)
} else {
Self::client_tls(&metadata, tls::NoClientTls::NotProvidedByServiceDiscovery)
};
Endpoint {
addr: Remote(ServerAddr(addr)),
tls,
metadata,
logical_addr: concrete.logical.addr(),
protocol: concrete.logical.protocol,
}
}
}
7 changes: 2 additions & 5 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{CanonicalDstHeader, Concrete, Endpoint, Logical};
use crate::{resolve, stack_labels, target, Outbound};
use crate::{endpoint, resolve, stack_labels, Outbound};
use linkerd_app_core::{
classify, config, profiles,
proxy::{
Expand Down Expand Up @@ -66,10 +66,7 @@ impl<E> Outbound<E> {
.check_service::<ConcreteAddr>()
.push_request_filter(|c: Concrete| Ok::<_, Never>(c.resolve))
.push(svc::layer::mk(move |inner| {
map_endpoint::Resolve::new(
target::EndpointFromMetadata { identity_disabled },
inner,
)
map_endpoint::Resolve::new(endpoint::FromMetadata { identity_disabled }, inner)
}))
.check_service::<Concrete>()
.into_inner();
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/outbound/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use linkerd_app_core::{
};
use std::{net::SocketAddr, str::FromStr, sync::Arc};

pub type Accept = crate::target::Accept<Version>;
pub type Logical = crate::target::Logical<Version>;
pub type Concrete = crate::target::Concrete<Version>;
pub type Endpoint = crate::target::Endpoint<Version>;
pub type Accept = crate::Accept<Version>;
pub type Logical = crate::logical::Logical<Version>;
pub type Concrete = crate::logical::Concrete<Version>;
pub type Endpoint = crate::endpoint::Endpoint<Version>;

#[derive(Clone, Debug)]
pub struct CanonicalDstHeader(pub Addr);
Expand Down
34 changes: 31 additions & 3 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ pub mod http;
mod ingress;
pub mod logical;
mod resolve;
pub mod target;
pub mod tcp;
#[cfg(test)]
pub(crate) mod test_util;

use crate::http::SkipHttpDetection;
use linkerd_app_core::{
config::{ProxyConfig, ServerConfig},
io, metrics, profiles,
Expand All @@ -29,8 +29,8 @@ use linkerd_app_core::{
stack::{self, Param},
},
tls,
transport::{addrs::*, listen::Bind},
AddrMatch, Error, ProxyRuntime,
transport::{self, addrs::*, listen::Bind},
AddrMatch, Conditional, Error, ProxyRuntime,
};
use std::{collections::HashMap, future::Future, time::Duration};
use tracing::info;
Expand All @@ -56,6 +56,14 @@ pub struct Outbound<S> {
stack: svc::Stack<S>,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Accept<P> {
pub orig_dst: OrigDstAddr,
pub protocol: P,
}

// === impl Outbound ===

impl Outbound<()> {
pub fn new(config: Config, runtime: ProxyRuntime) -> Self {
Self {
Expand Down Expand Up @@ -228,6 +236,26 @@ impl Outbound<()> {
}
}

// === impl Accept ===

impl<P> Param<transport::labels::Key> for Accept<P> {
fn param(&self) -> transport::labels::Key {
const NO_TLS: tls::ConditionalServerTls = Conditional::None(tls::NoServerTls::Loopback);
transport::labels::Key::accept(
transport::labels::Direction::Out,
NO_TLS,
self.orig_dst.into(),
)
}
}

// When a profile is not discovered, always enable protocol detection.
impl Param<SkipHttpDetection> for Accept<()> {
fn param(&self) -> SkipHttpDetection {
SkipHttpDetection(false)
}
}

fn stack_labels(proto: &'static str, name: &'static str) -> metrics::StackLabels {
metrics::StackLabels::outbound(proto, name)
}
Expand Down
Loading

0 comments on commit c3f5d48

Please sign in to comment.