Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support kubernetes service-accounts #538

Merged
merged 24 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
UserId -> enum
  • Loading branch information
c-thiel committed Nov 22, 2024
commit 131cbad594b2c27b8d9c435fedc719e93bffdbd6
18 changes: 9 additions & 9 deletions crates/iceberg-catalog/src/implementations/postgres/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub(crate) async fn delete_user<'c, 'e: 'c, E: sqlx::Executor<'c, Database = sql
email = null
WHERE id = $1
"#,
id.inner(),
id.to_string(),
)
.execute(connection)
.await
Expand Down Expand Up @@ -211,7 +211,7 @@ pub(crate) async fn create_or_update_user<
DO UPDATE SET name = $2, email = $3, last_updated_with = $4, user_type = $5
returning (xmax = 0) AS created, id, name, email, created_at, updated_at, last_updated_with as "last_updated_with: DbUserLastUpdatedWith", user_type as "user_type: DbUserType"
"#,
id.inner(),
id.to_string(),
name,
email,
db_last_updated_with as _,
Expand Down Expand Up @@ -278,7 +278,7 @@ mod test {
async fn test_create_or_update_user(pool: sqlx::PgPool) {
let state = CatalogState::from_pools(pool.clone(), pool.clone());

let user_id = UserId::default_prefix("test_user_1").unwrap();
let user_id = UserId::oidc("test_user_1").unwrap();
let user_name = "Test User 1";

create_or_update_user(
Expand Down Expand Up @@ -345,7 +345,7 @@ mod test {
async fn test_search_user(pool: sqlx::PgPool) {
let state = CatalogState::from_pools(pool.clone(), pool.clone());

let user_id = UserId::default_prefix("test_user_1").unwrap();
let user_id = UserId::kubernetes("test_user_1").unwrap();
let user_name = "Test User 1";

create_or_update_user(
Expand All @@ -372,7 +372,7 @@ mod test {
async fn test_delete_user(pool: sqlx::PgPool) {
let state = CatalogState::from_pools(pool.clone(), pool.clone());

let user_id = UserId::default_prefix("test_user_1").unwrap();
let user_id = UserId::oidc("test_user_1").unwrap();
let user_name = "Test User 1";

create_or_update_user(
Expand Down Expand Up @@ -405,7 +405,7 @@ mod test {
assert_eq!(users.users.len(), 0);

// Delete non-existent user
let user_id = UserId::default_prefix("test_user_2").unwrap();
let user_id = UserId::oidc("test_user_2").unwrap();
let result = delete_user(user_id, &state.read_write.write_pool)
.await
.unwrap();
Expand All @@ -416,7 +416,7 @@ mod test {
async fn test_paginate_user(pool: sqlx::PgPool) {
let state = CatalogState::from_pools(pool.clone(), pool.clone());
for i in 0..10 {
let user_id = UserId::default_prefix(&format!("test_user_{i}")).unwrap();
let user_id = UserId::oidc(&format!("test_user_{i}")).unwrap();
let user_name = &format!("test user {i}");

create_or_update_user(
Expand Down Expand Up @@ -458,7 +458,7 @@ mod test {
assert_eq!(users.users.len(), 5);

for (uidx, u) in users.users.iter().enumerate() {
let user_id = UserId::default_prefix(&format!("test_user_{uidx}")).unwrap();
let user_id = UserId::oidc(&format!("test_user_{uidx}")).unwrap();
let user_name = format!("test user {uidx}");
assert_eq!(u.id, user_id);
assert_eq!(u.name, user_name);
Expand All @@ -480,7 +480,7 @@ mod test {

for (uidx, u) in users.users.iter().enumerate() {
let uidx = uidx + 5;
let user_id = UserId::default_prefix(&format!("test_user_{uidx}")).unwrap();
let user_id = UserId::oidc(&format!("test_user_{uidx}")).unwrap();
let user_name = format!("test user {uidx}");
assert_eq!(u.id, user_id);
assert_eq!(u.name, user_name);
Expand Down
116 changes: 41 additions & 75 deletions crates/iceberg-catalog/src/service/authn/identities.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,59 @@
use crate::api;
use crate::api::management::v1::user::UserType;
use crate::service::authn::Claims;
use crate::service::Actor;
use iceberg_ext::catalog::rest::{ErrorModel, IcebergErrorResponse};
use serde::{Deserialize, Serialize};

/// Unique identifier of a user in the system.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, utoipa::ToSchema)]
pub struct UserId {
pub idp: String,
pub user_id: String,
pub enum UserId {
/// OIDC principal
OIDC(String),
/// K8s principal
Kubernetes(String),
}

impl TryFrom<String> for UserId {
type Error = IcebergErrorResponse;

fn try_from(s: String) -> Result<Self, Self::Error> {
if let Some((idp, user_id)) = s.split_once('/') {
UserId::idp_prefixed(user_id, idp)
} else {
UserId::default_prefix(&s)
match s.split_once('/') {
Some(("oidc", user_id)) => Ok(UserId::oidc(user_id)?),
Some(("kubernetes", user_id)) => Ok(UserId::kubernetes(user_id)?),
_ => Err(ErrorModel::bad_request(
format!("Invalid user id format: {s}"),
"InvalidUserId",
None,
)
.into()),
}
}
}

impl std::fmt::Display for UserId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(format!("{}/{}", self.idp, self.user_id).as_str())
match self {
UserId::OIDC(user_id) => write!(f, "oidc/{user_id}"),
UserId::Kubernetes(user_id) => write!(f, "kubernetes/{user_id}"),
}
}
}

impl UserId {
pub(crate) fn idp_prefixed(user_id: &str, idp_prefix: &str) -> api::Result<Self> {
Self::validate_len(user_id)?;
Self::no_illegal_chars(user_id)?;

// Lowercase all subjects
Ok(Self {
idp: idp_prefix.to_string(),
user_id: user_id.to_lowercase(),
})
}

pub(crate) fn default_prefix(subject: &str) -> api::Result<Self> {
fn validate_subject(subject: &str) -> api::Result<()> {
Self::validate_len(subject)?;

Self::no_illegal_chars(subject)?;

// Lowercase all subjects
let subject = subject.to_lowercase();

Ok(Self {
idp: "default".to_string(),
user_id: subject,
})
Ok(())
}

pub(super) fn try_from_claims(claims: &Claims) -> api::Result<Self> {
// For azure, the oid claim is permanent to the user account
// accross all Entra ID applications. sub is only unique for one client.
// To enable collaboration between projects, we use oid as the user id if
// provided.
let sub = if let Some(oid) = &claims.oid {
oid.as_str()
} else {
claims.sub.as_str()
};
pub(crate) fn oidc(subject: &str) -> api::Result<Self> {
Self::validate_subject(subject)?;
Ok(Self::OIDC(subject.to_string()))
}

Self::default_prefix(sub)
pub(crate) fn kubernetes(subject: &str) -> api::Result<Self> {
Self::validate_subject(subject)?;
Ok(Self::Kubernetes(subject.to_string()))
}

fn validate_len(subject: &str) -> api::Result<()> {
Expand All @@ -85,7 +71,7 @@ impl UserId {
fn no_illegal_chars(subject: &str) -> api::Result<()> {
if subject
.chars()
.any(|c| !(c.is_alphanumeric() || c == '-' || c == '_' || c == '/'))
.any(|c| !(c.is_alphanumeric() || c == '-' || c == '_'))
{
return Err(ErrorModel::bad_request(
"sub or oid claim contain illegal characters. Only alphanumeric + - are legal.",
Expand All @@ -96,11 +82,6 @@ impl UserId {
}
Ok(())
}

#[must_use]
pub fn inner(&self) -> String {
self.to_string()
}
}

impl From<UserId> for String {
Expand Down Expand Up @@ -223,39 +204,24 @@ impl Principal {

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_user_id() {
let user_id = super::UserId::try_from("default/123".to_string()).unwrap();
assert_eq!(user_id.idp, "default");
assert_eq!(user_id.user_id, "123");

let user_id = super::UserId::try_from("default/123/456".to_string()).unwrap();
assert_eq!(user_id.idp, "default");
assert_eq!(user_id.user_id, "123/456");
let user_id = UserId::try_from("oidc/123".to_string()).unwrap();
assert_eq!(user_id, UserId::OIDC("123".to_string()));
assert_eq!(user_id.to_string(), "oidc/123");

let user_id = super::UserId::try_from("kubernetes/123/456/789".to_string()).unwrap();
assert_eq!(user_id.idp, "kubernetes");
assert_eq!(user_id.user_id, "123/456/789");
let user_id = UserId::try_from("kubernetes/1234".to_string()).unwrap();
assert_eq!(user_id, UserId::Kubernetes("1234".to_string()));
assert_eq!(user_id.to_string(), "kubernetes/1234");

let user_str = user_id.to_string();
assert_eq!(user_str, "kubernetes/123/456/789");
let user_id: UserId = serde_json::from_str(r#""oidc/123""#).unwrap();
assert_eq!(user_id, UserId::OIDC("123".to_string()));

// Test json serde
let user_id: super::UserId = serde_json::from_str("\"kubernetes/123/456/789\"").unwrap();
assert_eq!(user_id.idp, "kubernetes");
assert_eq!(user_id.user_id, "123/456/789");
assert_eq!(
serde_json::to_string(&user_id).unwrap(),
"\"kubernetes/123/456/789\""
);
let user_id: UserId = serde_json::from_str(r#""kubernetes/123""#).unwrap();
assert_eq!(user_id, UserId::Kubernetes("123".to_string()));

// Default deserialization without prefix
let user_id: super::UserId = serde_json::from_str("\"123-456-789\"").unwrap();
assert_eq!(user_id.idp, "default");
assert_eq!(user_id.user_id, "123-456-789");
assert_eq!(
serde_json::to_string(&user_id).unwrap(),
"\"default/123-456-789\""
);
serde_json::from_str::<UserId>(r#""nonexistant/123""#).unwrap_err();
}
}
13 changes: 11 additions & 2 deletions crates/iceberg-catalog/src/service/authn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl AuthDetails {
)
.into());
};
let prefixed_id = UserId::idp_prefixed(uid.as_str(), "kubernetes")?;
let prefixed_id = UserId::kubernetes(uid.as_str())?;
return Ok(AuthDetails::Principal(Principal {
actor: Actor::Principal(prefixed_id.clone()),
user_id: prefixed_id,
Expand All @@ -47,7 +47,16 @@ impl AuthDetails {
}

fn try_from_jwt_claims(claims: Claims) -> Result<Self> {
let user_id = UserId::try_from_claims(&claims)?;
// For azure, the oid claim is permanent to the user account
// accross all Entra ID applications. sub is only unique for one client.
// To enable collaboration between projects, we use oid as the user id if
// provided.
let sub = if let Some(oid) = &claims.oid {
oid.as_str()
} else {
claims.sub.as_str()
};
let user_id = UserId::oidc(sub)?;

let first_name = claims.given_name.or(claims.first_name);
let last_name = claims.family_name.or(claims.last_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1647,7 +1647,7 @@ mod tests {
async fn test_cannot_assign_role_to_itself() {
let (_, authorizer) = authorizer_for_empty_store().await;

let user_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
let role_id = RoleId::new(uuid::Uuid::nil());

authorizer
Expand Down Expand Up @@ -1684,7 +1684,7 @@ mod tests {
.unwrap();
assert!(relations.is_empty());

let user_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
authorizer
.write(
Some(vec![TupleKey {
Expand Down Expand Up @@ -1741,7 +1741,7 @@ mod tests {
#[tokio::test]
async fn test_get_allowed_actions_as_user() {
let (_, authorizer) = authorizer_for_empty_store().await;
let user_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
let actor = Actor::Principal(user_id.clone());
let access: Vec<ServerAction> =
get_allowed_actions(authorizer.clone(), &actor, &OPENFGA_SERVER, &None)
Expand Down Expand Up @@ -1775,7 +1775,7 @@ mod tests {
async fn test_get_allowed_actions_as_role() {
let (_, authorizer) = authorizer_for_empty_store().await;
let role_id = RoleId::new(uuid::Uuid::now_v7());
let user_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
let actor = Actor::Role {
principal: user_id.clone(),
assumed_role: role_id,
Expand Down Expand Up @@ -1811,7 +1811,7 @@ mod tests {
#[tokio::test]
async fn test_get_allowed_actions_for_other_principal() {
let (_, authorizer) = authorizer_for_empty_store().await;
let user_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
let role_id = RoleId::new(uuid::Uuid::now_v7());
let actor = Actor::Principal(user_id.clone());

Expand Down Expand Up @@ -1868,8 +1868,8 @@ mod tests {
async fn test_checked_write() {
let (_, authorizer) = authorizer_for_empty_store().await;

let user1_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user2_id = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user1_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
let user2_id = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();

authorizer
.write(
Expand Down Expand Up @@ -1905,7 +1905,7 @@ mod tests {
async fn test_assign_to_role() {
let (_, authorizer) = authorizer_for_empty_store().await;

let user_id_owner = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id_owner = UserId::kubernetes(&uuid::Uuid::now_v7().to_string()).unwrap();
let role_id_1 = RoleId::new(uuid::Uuid::nil());
let role_id_2 = RoleId::new(uuid::Uuid::now_v7());

Expand Down Expand Up @@ -1946,8 +1946,8 @@ mod tests {
async fn test_assign_to_project() {
let (_, authorizer) = authorizer_for_empty_store().await;

let user_id_owner = UserId::default_prefix(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id_assignee = UserId::default_prefix(&uuid::Uuid::nil().to_string()).unwrap();
let user_id_owner = UserId::oidc(&uuid::Uuid::now_v7().to_string()).unwrap();
let user_id_assignee = UserId::kubernetes(&uuid::Uuid::nil().to_string()).unwrap();
let role_id = RoleId::new(uuid::Uuid::now_v7());
let project_id = ProjectIdent::from(uuid::Uuid::nil());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl ParseOpenFgaEntity for UserId {
));
}

UserId::default_prefix(id)
UserId::try_from(id.to_string())
.map_err(|_e| OpenFGAError::unexpected_entity(vec![FgaType::User], id.to_string()))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_list_projects() {
let authorizer = new_authorizer_in_empty_store().await;
let user_id = UserId::default_prefix("this_user").unwrap();
let user_id = UserId::oidc("this_user").unwrap();
let actor = Actor::Principal(user_id.clone());
let project = ProjectIdent::from(uuid::Uuid::now_v7());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1416,13 +1416,13 @@ mod test {

#[test]
fn test_assignment_serialization() {
let user_id = UserId::default_prefix("my_user").unwrap();
let user_id = UserId::oidc("my_user").unwrap();
let user_or_role = UserOrRole::User(user_id);
let assignment = ServerAssignment::GlobalAdmin(user_or_role);
let serialized = serde_json::to_string(&assignment).unwrap();
let expected = serde_json::json!({
"type": "global_admin",
"user": "my_user"
"user": "oidc/my_user"
});
assert_eq!(
expected,
Expand Down
Loading