storage controller: make proxying of GETs to pageservers more robust (neondatabase#9065)

## Problem

These commits are split off from
https://github.com/neondatabase/neon/pull/8971/commits, where I was
fixing these issues to make a larger scale test pass -- Vlad also
independently recognized them with cloudbench in
neondatabase#9062.

1. The storage controller proxies GET requests to pageservers based on
their intent, not the ground truth of where they're really attached.
2. Proxied requests can race with tenant scheduling, resulting in 404
responses if the request hits the wrong pageserver. For example, a request
routed to the origin pageserver of an in-flight live migration can arrive
after the shard has been attached to the destination and detached from the
origin.

Closes: neondatabase#9062

## Summary of changes

1. If a shard has a running reconciler, then use the database's
generation_pageserver to decide which node to proxy the request to (a
sketch of this routing rule follows this list).
2. If such a request gets a 404 response and shard zero's scheduled node has
changed since the request was dispatched, return a 503 so that the client
retries against the shard's new location.
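
A minimal sketch of the resulting routing rule, with simplified, illustrative types (`ShardState`, `route_shard0_request`, and `NodeId` are stand-ins here, not the controller's real API):

```rust
type NodeId = u64;

struct ShardState {
    /// The node this shard is intended to be attached to.
    intent_node: Option<NodeId>,
    /// Whether a reconcile (e.g. a live migration) is in flight.
    reconciler_active: bool,
}

/// Decide which node should receive a proxied shard-zero GET.
fn route_shard0_request(
    shard: &ShardState,
    db_generation_pageserver: Option<NodeId>,
) -> Option<NodeId> {
    if shard.reconciler_active {
        // A migration may already have issued a newer generation to the
        // destination node, so the database row is the ground truth.
        db_generation_pageserver
    } else {
        // No reconcile in flight: the in-memory intent matches reality.
        shard.intent_node
    }
}
```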
jcsp authored Sep 25, 2024
1 parent 2cf47b1 commit 4b711ca
Showing 4 changed files with 157 additions and 26 deletions.
23 changes: 18 additions & 5 deletions storage_controller/src/http.rs
@@ -515,7 +515,7 @@ async fn handle_tenant_timeline_passthrough(
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);

// Find the node that holds shard zero
- let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
+ let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;

// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
@@ -545,10 +545,10 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());

let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
- let resp = client.get_raw(path).await.map_err(|_e|
- // FIXME: give APiError a proper Unavailable variant. We return 503 here because
- // if we can't successfully send a request to the pageserver, we aren't available.
- ApiError::ShuttingDown)?;
+ let resp = client.get_raw(path).await.map_err(|e|
+ // We return 503 here because if we can't successfully send a request to the pageserver,
+ // either we aren't available or the pageserver is unavailable.
+ ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;

if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
@@ -557,6 +557,19 @@ async fn handle_tenant_timeline_passthrough(
error_counter.inc(labels);
}

// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND {
// Look up node again: if we migrated it will be different
let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
if new_node.get_id() != node.get_id() {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
));
}
}

// We have a reqwest::Response, would like an http::Response
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
for (k, v) in resp.headers() {
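
The 404-to-503 rewrite above relies on callers of the passthrough API treating 503 as retryable, matching the pageserver's existing use of 503 for transient unavailability. A hypothetical caller-side helper illustrating that contract (`get_with_retry` and its parameters are not part of this diff):

```rust
use std::time::Duration;

/// Hypothetical caller-side helper: retry 503s with capped exponential
/// backoff, re-resolving the route through the controller on each attempt.
async fn get_with_retry(
    client: &reqwest::Client,
    url: &str,
    max_attempts: u32,
) -> reqwest::Result<reqwest::Response> {
    let mut backoff = Duration::from_millis(100);
    let mut attempt = 1;
    loop {
        let resp = client.get(url).send().await?;
        if resp.status() != reqwest::StatusCode::SERVICE_UNAVAILABLE || attempt >= max_attempts {
            return Ok(resp);
        }
        // A 503 may mean the request raced with a live migration; the next
        // attempt will be routed to the shard's new location.
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(5));
        attempt += 1;
    }
}
```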
4 changes: 4 additions & 0 deletions storage_controller/src/reconciler.rs
@@ -541,6 +541,8 @@ impl Reconciler {
}
}

pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");

// Increment generation before attaching to new pageserver
self.generation = Some(
self.persistence
@@ -617,6 +619,8 @@ impl Reconciler {
},
);

pausable_failpoint!("reconciler-live-migrate-post-detach");

tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
let dest_final_conf = build_location_config(
&self.shard,
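
For context, a pausable failpoint can be built on the `fail` crate roughly as below. This is a sketch of the general technique, not the repo's actual `pausable_failpoint!` implementation: the stock `pause` action blocks the OS thread, which would stall the async executor, so this version polls a `return`-style failpoint in a sleep loop instead.

```rust
macro_rules! pausable_failpoint {
    ($name:literal) => {
        loop {
            // The closure form of fail_point! makes this immediately-invoked
            // closure return `true` while the failpoint is configured active.
            let paused = (|| {
                fail::fail_point!($name, |_| true);
                false
            })();
            if !paused {
                break;
            }
            // Yield instead of blocking so other async tasks keep running.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    };
}
```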
74 changes: 53 additions & 21 deletions storage_controller/src/service.rs
@@ -3508,34 +3508,66 @@ impl Service {

/// When you need to send an HTTP request to the pageserver that holds shard zero of a tenant, this
/// function looks up and returns the node. If the tenant isn't found, returns Err(ApiError::NotFound)
- pub(crate) fn tenant_shard0_node(
+ pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId), ApiError> {
- let locked = self.inner.read().unwrap();
- let Some((tenant_shard_id, shard)) = locked
- .tenants
- .range(TenantShardId::tenant_range(tenant_id))
- .next()
- else {
- return Err(ApiError::NotFound(
- anyhow::anyhow!("Tenant {tenant_id} not found").into(),
- ));
+ // Look up in-memory state and maybe use the node from there.
+ {
+ let locked = self.inner.read().unwrap();
+ let Some((tenant_shard_id, shard)) = locked
+ .tenants
+ .range(TenantShardId::tenant_range(tenant_id))
+ .next()
+ else {
+ return Err(ApiError::NotFound(
+ anyhow::anyhow!("Tenant {tenant_id} not found").into(),
+ ));
+ };
+
+ let Some(intent_node_id) = shard.intent.get_attached() else {
+ tracing::warn!(
+ tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
+ "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
+ shard.policy
+ );
+ return Err(ApiError::Conflict(
+ "Cannot call timeline API on non-attached tenant".to_string(),
+ ));
+ };
+
+ if shard.reconciler.is_none() {
+ // Optimization: while no reconcile is in flight, we may trust our in-memory state
+ // to tell us which pageserver to use. Otherwise we will fall through and hit the database
+ let Some(node) = locked.nodes.get(intent_node_id) else {
+ // This should never happen
+ return Err(ApiError::InternalServerError(anyhow::anyhow!(
+ "Shard refers to nonexistent node"
+ )));
+ };
+ return Ok((node.clone(), *tenant_shard_id));
+ }
+ };

- // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
- // point to somewhere we haven't attached yet.
- let Some(node_id) = shard.intent.get_attached() else {
- tracing::warn!(
- tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
- "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
- shard.policy
- );
- return Err(ApiError::Conflict(
- "Cannot call timeline API on non-attached tenant".to_string(),
+ // Look up the latest attached pageserver location from the database
+ // generation state: this will reflect the progress of any ongoing migration.
+ // Note that it is not guaranteed to _stay_ here, our caller must still handle
+ // the case where they call through to the pageserver and get a 404.
+ let db_result = self.persistence.tenant_generations(tenant_id).await?;
+ let Some(ShardGenerationState {
+ tenant_shard_id,
+ generation: _,
+ generation_pageserver: Some(node_id),
+ }) = db_result.first()
+ else {
+ // This can happen if we raced with a tenant deletion or a shard split. On a retry
+ // the caller will either succeed (shard split case), get a proper 404 (deletion case),
+ // or a conflict response (case where tenant was detached in background)
+ return Err(ApiError::ResourceUnavailable(
+ format!("Shard for tenant {tenant_id} not found in database, or is not attached").into(),
));
};

let locked = self.inner.read().unwrap();
let Some(node) = locked.nodes.get(node_id) else {
// This should never happen
return Err(ApiError::InternalServerError(anyhow::anyhow!(
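
A toy model of why the database generation row is trustworthy mid-migration (`ShardRow` and `increment_generation` are illustrative names, not the real persistence layer): attaching the shard to the destination bumps the generation in the database before the pageserver is reconfigured, so `generation_pageserver` always names the most recent node to have been issued a generation.

```rust
type NodeId = u64;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Generation(u32);

struct ShardRow {
    generation: Generation,
    generation_pageserver: Option<NodeId>,
}

impl ShardRow {
    /// Bump the generation and record which node it was issued to. In a live
    /// migration this happens before the destination is actually attached.
    fn increment_generation(&mut self, node: NodeId) -> Generation {
        self.generation = Generation(self.generation.0 + 1);
        self.generation_pageserver = Some(node);
        self.generation
    }
}

fn main() {
    let mut row = ShardRow {
        generation: Generation(1),
        generation_pageserver: Some(1), // origin pageserver
    };
    // Migrating to node 2: from this point the row names the destination,
    // even though the origin may still answer (and 404) for a short window.
    assert_eq!(row.increment_generation(2), Generation(2));
    assert_eq!(row.generation_pageserver, Some(2));
}
```

This mirrors the regression test below, which expects to see generation 1 (origin) or 2 (destination) depending on where the migration is paused.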
82 changes: 82 additions & 0 deletions test_runner/regress/test_storage_controller.py
@@ -4,6 +4,7 @@
import time
from collections import defaultdict
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import pytest
@@ -2466,6 +2467,87 @@ def has_hit_migration_failpoint():
raise


class MigrationFailpoints(Enum):
# While only the origin is attached
PRE_GENERATION_INC = "reconciler-live-migrate-pre-generation-inc"
# While both locations are attached
POST_NOTIFY = "reconciler-live-migrate-post-notify"
# While only the destination is attached
POST_DETACH = "reconciler-live-migrate-post-detach"


@pytest.mark.parametrize(
"migration_failpoint",
[
MigrationFailpoints.PRE_GENERATION_INC,
MigrationFailpoints.POST_NOTIFY,
MigrationFailpoints.POST_DETACH,
],
)
def test_storage_controller_proxy_during_migration(
neon_env_builder: NeonEnvBuilder, migration_failpoint: MigrationFailpoints
):
"""
If we send a proxied GET request to the controller during a migration, it should route
the request to whichever pageserver was most recently issued a generation.
Reproducer for https://github.com/neondatabase/neon/issues/9062
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
env = neon_env_builder.init_configs()
env.start()

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
env.neon_cli.create_tenant(tenant_id, timeline_id)

# Activate a failpoint that will cause the live migration to get stuck at a controlled point:
# requests should be routed to whichever pageserver was most recently issued the generation.
env.storage_controller.configure_failpoints((migration_failpoint.value, "pause"))

origin_pageserver = env.get_tenant_pageserver(tenant_id)
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]

try:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
migrate_fut = executor.submit(
env.storage_controller.tenant_shard_migrate,
TenantShardId(tenant_id, 0, 0),
dest_ps_id,
)

def has_hit_migration_failpoint():
expr = f"at failpoint {str(migration_failpoint.value)}"
log.info(expr)
assert env.storage_controller.log_contains(expr)

wait_until(10, 1, has_hit_migration_failpoint)

# This request should be routed to whichever pageserver holds the highest generation
tenant_info = env.storage_controller.pageserver_api().tenant_status(
tenant_id,
)

if migration_failpoint in (
MigrationFailpoints.POST_NOTIFY,
MigrationFailpoints.POST_DETACH,
):
# We expect request to land on the destination
assert tenant_info["generation"] == 2
elif migration_failpoint == MigrationFailpoints.PRE_GENERATION_INC:
# We expect request to land on the origin
assert tenant_info["generation"] == 1

# Eventually migration completes
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
migrate_fut.result()
except:
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
raise


@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()