Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add Deployment controller for managed connectors #1499

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a56aa5c
Initial stuff for managed connector specs
simlay Aug 24, 2021
05bc7ef
managed_connector spec
nacardin Aug 24, 2021
594d450
Fixed helm chart for managedconnectors
simlay Aug 25, 2021
75968a5
cargo fmt
simlay Aug 25, 2021
ad7d410
Added list and delete endpoints
simlay Aug 26, 2021
22e907f
Added ManagedConnectors to watch api
simlay Aug 26, 2021
f980f9f
more updates
simlay Aug 26, 2021
04f7748
Added deployment generation
simlay Aug 27, 2021
092e8c3
working deployment
nacardin Aug 27, 2021
225b76f
Added args to connector config
simlay Aug 30, 2021
691c901
Updated to use k8-api patch branch
simlay Aug 31, 2021
27a615f
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Aug 31, 2021
e4b4851
Image name is now correct
simlay Sep 2, 2021
a17edf5
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 2, 2021
06c8585
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 2, 2021
c80f1b1
fix warnings
simlay Sep 3, 2021
12b1799
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 3, 2021
ed364b3
updates to helm charts
simlay Sep 7, 2021
ad54a5e
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 7, 2021
a4fe234
Working again
simlay Sep 7, 2021
a100488
Added status to container
simlay Sep 8, 2021
5f4525c
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 10, 2021
4b9d84b
initial scaffolding to config map
simlay Sep 15, 2021
5cb88bf
Working for demo
simlay Sep 15, 2021
de2d6ba
Added fluvio-connector-helm chart
simlay Sep 16, 2021
d5d8ebc
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 22, 2021
6ee61de
deleted unused files
simlay Sep 22, 2021
cf1f326
updates
simlay Sep 22, 2021
f05f170
Updated helm chart
simlay Sep 22, 2021
dd4da1b
fix clippy
simlay Sep 22, 2021
ccc4e72
cargo fmt
simlay Sep 22, 2021
511a445
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 23, 2021
c57ac85
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 24, 2021
6596407
Added default for create_topic
simlay Sep 24, 2021
9bf6646
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 27, 2021
bdcb33c
Tested and working
simlay Sep 28, 2021
fd95342
cargo fmt
simlay Sep 28, 2021
8c9b190
Clean up unused comments and fmt
simlay Sep 28, 2021
20e903b
Updates from comments
simlay Sep 28, 2021
3399b25
added tls work around for configmap
simlay Sep 30, 2021
3a9bcf5
cargo fmt
simlay Sep 30, 2021
847557e
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 30, 2021
bacaa73
Added filter for managed connector
simlay Sep 30, 2021
cdf589d
Update from comments
simlay Sep 30, 2021
05dc77a
cargo fmt
simlay Sep 30, 2021
66b4971
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 30, 2021
343e042
fix clippy
simlay Sep 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added deployment generation
  • Loading branch information
simlay committed Aug 27, 2021
commit 04f7748f6ef21ade131b5cd37807569a73b0c996
203 changes: 81 additions & 122 deletions src/sc/src/k8/controllers/manged_connector_deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ use crate::k8::objects::spg_service::SpgServiceSpec;
//use crate::k8::objects::deployment::DeploymentStatus;
use crate::k8::objects::managed_connector_deployment::ManagedConnectorDeploymentSpec;

use crate::stores::k8::K8MetaItem;
use fluvio_controlplane_metadata::store::MetadataStoreObject;
use crate::k8::objects::managed_connector_deployment::K8DeploymentSpec;



/// Update Statefulset and Service from SPG
pub struct ManagedConnectorDeploymentController {
Expand Down Expand Up @@ -66,62 +71,32 @@ impl ManagedConnectorDeploymentController {
}
}

#[instrument(skip(self), name = "SpgStatefulSetController")]
#[instrument(skip(self), name = "ManagedConnectorDeploymentController")]
async fn inner_loop(&mut self) -> Result<(), ClientError> {
todo!();
use tokio::select;

let mut connector_listener = self.connectors.change_listener();
let mut config_listener = self.configs.change_listener();

self.sync_connectors_to_deployment(&mut connector_listener).await?;
self.sync_with_config(&mut config_listener).await?;
self.sync_connectors_to_deployments(&mut connector_listener).await?;

loop {
select! {
_ = connector_listener.listen() => {
debug!("detected connector changes");
self.sync_connectors_to_deployment(&mut connector_listener).await?;
},

_ = config_listener.listen() => {
debug!("detected config changes");
self.sync_with_config(&mut config_listener).await?;
}

}
}
/*

let mut spg_listener = self.groups.change_listener();
let mut config_listener = self.configs.change_listener();

self.sync_spgs_to_statefulset(&mut spg_listener).await?;
self.sync_with_config(&mut config_listener).await?;

loop {
select! {
_ = spg_listener.listen() => {
debug!("detected spg changes");
self.sync_spgs_to_statefulset(&mut spg_listener).await?;
self.sync_connectors_to_deployments(&mut connector_listener).await?;
},

_ = config_listener.listen() => {
debug!("detected config changes");
self.sync_with_config(&mut config_listener).await?;
}

}
}
*/
}

async fn sync_with_config(
/// svc has been changed, update spu
async fn sync_connectors_to_deployments(
&mut self,
listener: &mut K8ChangeListener<ScK8Config>,
listener: &mut K8ChangeListener<ManagedConnectorSpec>,
) -> Result<(), ClientError> {
if !listener.has_change() {
trace!("no config change, skipping");
trace!("no managed connector change, skipping");
return Ok(());
}

Expand All @@ -130,113 +105,97 @@ impl ManagedConnectorDeploymentController {
let (updates, deletes) = changes.parts();

debug!(
"received config changes updates: {},deletes: {},epoch: {}",
"received service changes updates: {},deletes: {},epoch: {}",
updates.len(),
deletes.len(),
epoch,
);

for config in updates.into_iter() {
for group_item in self.connectors.store().clone_values().await {
/*
let spu_group = SpuGroupObj::new(group_item);
for mc_item in updates.into_iter() {

self.sync_spg_to_statefulset(spu_group, &config.spec)
.await?
*/
}
self.sync_connector_to_deployment(mc_item).await?
}
/*
*/

Ok(())
}

/// svc has been changed, update spu
async fn sync_connectors_to_deployment(
#[instrument(skip(self, managed_connector))]
async fn sync_connector_to_deployment(
&mut self,
listener: &mut K8ChangeListener<ManagedConnectorSpec>,
managed_connector: MetadataStoreObject<ManagedConnectorSpec, K8MetaItem>,
) -> Result<(), ClientError> {
/*
if !listener.has_change() {
trace!("no spg change, skipping");
return Ok(());
}

let changes = listener.sync_changes().await;
let epoch = changes.epoch;
let (updates, deletes) = changes.parts();

debug!(
"received service changes updates: {},deletes: {},epoch: {}",
updates.len(),
deletes.len(),
epoch,
let key = managed_connector.key();
let k8_deployment_spec = Self::generate_k8_deployment_spec(
&managed_connector.spec(),
&self.namespace,
key,
);
trace!(?k8_deployment_spec);
use crate::stores::MetadataStoreObject;
use crate::stores::actions::WSAction;

if let Some(config_obj) = self.configs.store().value("fluvio").await {
let config = config_obj.inner_owned().spec;
for group_item in updates.into_iter() {
let spu_group = SpuGroupObj::new(group_item);
let deployment_action =
WSAction::Apply(
MetadataStoreObject::with_spec(key, k8_deployment_spec.into())
.with_context(managed_connector.ctx().create_child()),
);

self.sync_spg_to_statefulset(spu_group, &config).await?
}
} else {
error!("config map is not loaded, skipping");
}
*/
debug!(?deployment_action, "applying statefulset");

self.deployment
.wait_action(&key, deployment_action)
.await?;

Ok(())
}
const DEFAULT_CONNECTOR_NAME : &'static str = "fluvio-connector";
pub fn generate_k8_deployment_spec(
mc_spec: &ManagedConnectorSpec,
namespace: &str,
name: &str,
) -> K8DeploymentSpec {
use k8_types::{
TemplateSpec,
TemplateMeta,
core::pod::{
PodSpec,
ContainerSpec,
},
LabelProvider,
};

#[instrument(skip(self, spu_k8_config, spu_group))]
async fn sync_spg_to_statefulset(
&mut self,
spu_group: SpuGroupObj,
spu_k8_config: &ScK8Config,
) -> Result<(), ClientError> {
let image = format!("infinyon/fluvio-connector-{}", mc_spec.name);
/*
let spg_name = spu_group.key();

// ensure we don't have conflict with existing spu group
if let Some(conflict_id) = spu_group.is_conflict_with(self.spus.store()).await {
warn!(conflict_id, "connector is in conflict with existing id");
let status = SpuGroupStatus::invalid(format!("conflict with: {}", conflict_id));

self.connectors
.update_status(spg_name.to_owned(), status)
.await?;
} else {
// if we pass this stage, then we reserved id.
if !spu_group.is_already_valid() {
debug!("not valid");
let status = SpuGroupStatus::reserved();
self.groups
.update_status(spg_name.to_owned(), status)
.await?;
return Ok(());
}

debug!("continue");
let (spg_service_key, spg_service_action) = spu_group.as_service();

/*
trace!("spg_service_actions: {:#?}", spg_service_action);
self.spg_services
.wait_action(&spg_service_key, spg_service_action)
.await?;
*/

let (stateful_key, stateful_action) =
spu_group.as_statefulset(&self.namespace, spu_k8_config, self.tls.as_ref());

debug!(?stateful_action, "applying statefulset");
self.statefulsets
.wait_action(&stateful_key, stateful_action)
.await?;
}
let env = todo!();
let args = todo!();
*/

Ok(())
let template = TemplateSpec {
metadata: Some(
TemplateMeta::default()
.set_labels(vec![("app", Self::DEFAULT_CONNECTOR_NAME)]),
),
spec: PodSpec {
termination_grace_period_seconds: Some(10),
containers: vec![ContainerSpec {
name: Self::DEFAULT_CONNECTOR_NAME.to_owned(),
image: Some(image.clone()),
/*
env,
args,
*/
..Default::default()
}],
//security_context: spu_k8_config.pod_security_context.clone(),
//node_selector: Some(spu_pod_config.node_selector.clone()),
..Default::default()
},
};
K8DeploymentSpec {
template,
..Default::default()
}
}

}

14 changes: 14 additions & 0 deletions src/sc/src/k8/objects/managed_connector_deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ impl Spec for ManagedConnectorDeploymentSpec {
type Owner = ManagedConnectorSpec;
}

/*
* TODO: Maybe make this more generic.
#[derive(Deserialize, Serialize, Debug, Default, Clone, PartialEq)]
#[serde(transparent)]
pub struct DeploymentSpec<T: Spec>(K8DeploymentSpec, T);

impl<T: Spec> Spec for DeploymentSpec<T> {
const LABEL: &'static str = "Deployment";
type IndexKey = String;
type Status = DeploymentStatus;
type Owner = T;
}
*/

impl From<K8DeploymentSpec> for ManagedConnectorDeploymentSpec {
fn from(k8: K8DeploymentSpec) -> Self {
Self(k8)
Expand Down