Skip to content

Commit

Permalink
split: change traffic splits to require a profile
Browse files Browse the repository at this point in the history
This branch builds on #963 by changing the traffic split service to
require its target type implement `Param<profiles::Receiver>` rather
than requiring `Param<Option<profiles::Receiver>>`. This lets us
simplify the implementation significantly by removing the "default" case
that's built when no profile was discovered.

Depends on #963

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Apr 13, 2021
1 parent 6584c24 commit e92e8ce
Showing 1 changed file with 83 additions and 112 deletions.
195 changes: 83 additions & 112 deletions linkerd/service-profiles/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ pub struct NewSplit<N, S, Req> {
_service: PhantomData<fn(Req) -> S>,
}

pub enum Split<T, N, S, Req> {
Default(S),
Split(Box<Inner<T, N, S, Req>>),
}

pub struct Inner<T, N, S, Req> {
pub struct Split<T, N, S, Req> {
rng: SmallRng,
rx: Pin<Box<dyn Stream<Item = Profile> + Send + Sync>>,
target: T,
Expand All @@ -58,56 +53,43 @@ impl<N: Clone, S, Req> Clone for NewSplit<N, S, Req> {

impl<T, N, S, Req> NewService<T> for NewSplit<N, S, Req>
where
T: Clone + Param<LookupAddr> + Param<Option<Receiver>>,
T: Clone + Param<LookupAddr> + Param<Receiver>,
N: NewService<(Option<ConcreteAddr>, T), Service = S> + Clone,
S: tower::Service<Req>,
S::Error: Into<Error>,
{
type Service = Split<T, N, S, Req>;

fn new_service(&mut self, target: T) -> Self::Service {
// If there is a profile, it is used to configure one or more inner
// services and a concrete address is provided so that the endpoint
// discovery is performed.
//
// Otherwise, profile lookup was rejected and, therefore, no concrete
// address is provided.
match target.param() {
None => {
trace!("Building default service");
Split::Default(self.inner.new_service((None, target)))
}
Some(rx) => {
let mut targets = rx.borrow().targets.clone();
if targets.is_empty() {
let LookupAddr(addr) = target.param();
targets.push(Target { addr, weight: 1 })
}
trace!(?targets, "Building split service");

let mut addrs = IndexSet::with_capacity(targets.len());
let mut weights = Vec::with_capacity(targets.len());
let mut services = ReadyCache::default();
let mut new_service = self.inner.clone();
for Target { weight, addr } in targets.into_iter() {
services.push(
addr.clone(),
new_service.new_service((Some(ConcreteAddr(addr.clone())), target.clone())),
);
addrs.insert(addr);
weights.push(weight);
}
let rx: Receiver = target.param();
let mut targets = rx.borrow().targets.clone();
if targets.is_empty() {
let LookupAddr(addr) = target.param();
targets.push(Target { addr, weight: 1 })
}
trace!(?targets, "Building split service");

let mut addrs = IndexSet::with_capacity(targets.len());
let mut weights = Vec::with_capacity(targets.len());
let mut services = ReadyCache::default();
let mut new_service = self.inner.clone();
for Target { weight, addr } in targets.into_iter() {
services.push(
addr.clone(),
new_service.new_service((Some(ConcreteAddr(addr.clone())), target.clone())),
);
addrs.insert(addr);
weights.push(weight);
}

Split::Split(Box::new(Inner {
rx: crate::stream_profile(rx),
target,
new_service,
services,
addrs,
distribution: WeightedIndex::new(weights).unwrap(),
rng: SmallRng::from_rng(&mut thread_rng()).expect("RNG must initialize"),
}))
}
Split {
rx: crate::stream_profile(rx),
target,
new_service,
services,
addrs,
distribution: WeightedIndex::new(weights).unwrap(),
rng: SmallRng::from_rng(&mut thread_rng()).expect("RNG must initialize"),
}
}
}
Expand All @@ -129,77 +111,66 @@ where
type Future = Pin<Box<dyn Future<Output = Result<S::Response, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
Self::Default(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
Self::Split(ref mut inner) => {
let mut update = None;
while let Poll::Ready(Some(up)) = inner.rx.as_mut().poll_next(cx) {
update = Some(up.clone());
}
let mut update = None;
while let Poll::Ready(Some(up)) = self.rx.as_mut().poll_next(cx) {
update = Some(up.clone());
}

// Every time the profile updates, rebuild the distribution, reusing
// services that existed in the prior state.
if let Some(Profile { mut targets, .. }) = update {
if targets.is_empty() {
let LookupAddr(addr) = inner.target.param();
targets.push(Target { addr, weight: 1 })
}
debug!(?targets, "Updating");

// Replace the old set of addresses with an empty set. The
// prior set is used to determine whether a new service
// needs to be created and what stale services should be
// removed.
let mut prior_addrs =
std::mem::replace(&mut inner.addrs, IndexSet::with_capacity(targets.len()));
let mut weights = Vec::with_capacity(targets.len());

// Create an updated distribution and set of services.
for Target { weight, addr } in targets.into_iter() {
// Reuse the prior services whenever possible.
if !prior_addrs.remove(&addr) {
debug!(%addr, "Creating target");
let svc = inner.new_service.new_service((
Some(ConcreteAddr(addr.clone())),
inner.target.clone(),
));
inner.services.push(addr.clone(), svc);
} else {
trace!(%addr, "Target already exists");
}
inner.addrs.insert(addr);
weights.push(weight);
}

inner.distribution = WeightedIndex::new(weights).unwrap();

// Remove all prior services that did not exist in the new
// set of targets.
for addr in prior_addrs.into_iter() {
inner.services.evict(&addr);
}
// Every time the profile updates, rebuild the distribution, reusing
// services that existed in the prior state.
if let Some(Profile { mut targets, .. }) = update {
if targets.is_empty() {
let LookupAddr(addr) = self.target.param();
targets.push(Target { addr, weight: 1 })
}
debug!(?targets, "Updating");

// Replace the old set of addresses with an empty set. The
// prior set is used to determine whether a new service
// needs to be created and what stale services should be
// removed.
let mut prior_addrs =
std::mem::replace(&mut self.addrs, IndexSet::with_capacity(targets.len()));
let mut weights = Vec::with_capacity(targets.len());

// Create an updated distribution and set of services.
for Target { weight, addr } in targets.into_iter() {
// Reuse the prior services whenever possible.
if !prior_addrs.remove(&addr) {
debug!(%addr, "Creating target");
let svc = self
.new_service
.new_service((Some(ConcreteAddr(addr.clone())), self.target.clone()));
self.services.push(addr.clone(), svc);
} else {
trace!(%addr, "Target already exists");
}
self.addrs.insert(addr);
weights.push(weight);
}

self.distribution = WeightedIndex::new(weights).unwrap();

// Wait for all target services to be ready. If any services fail, then
// the whole service fails.
Poll::Ready(ready!(inner.services.poll_pending(cx)).map_err(Into::into))
// Remove all prior services that did not exist in the new
// set of targets.
for addr in prior_addrs.into_iter() {
self.services.evict(&addr);
}
}

// Wait for all target services to be ready. If any services fail, then
// the whole service fails.
Poll::Ready(ready!(self.services.poll_pending(cx)).map_err(Into::into))
}

fn call(&mut self, req: Req) -> Self::Future {
match self {
Self::Default(ref mut svc) => Box::pin(svc.call(req).err_into::<Error>()),
Self::Split(ref mut inner) => {
let idx = if inner.addrs.len() == 1 {
0
} else {
inner.distribution.sample(&mut inner.rng)
};
let addr = inner.addrs.get_index(idx).expect("invalid index");
trace!(?addr, "Dispatching");
Box::pin(inner.services.call_ready(addr, req).err_into::<Error>())
}
}
let idx = if self.addrs.len() == 1 {
0
} else {
self.distribution.sample(&mut self.rng)
};
let addr = self.addrs.get_index(idx).expect("invalid index");
trace!(?addr, "Dispatching");
Box::pin(self.services.call_ready(addr, req).err_into::<Error>())
}
}

0 comments on commit e92e8ce

Please sign in to comment.