Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add Deployment controller for managed connectors #1499

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a56aa5c
Initial stuff for managed connector specs
simlay Aug 24, 2021
05bc7ef
managed_connector spec
nacardin Aug 24, 2021
594d450
Fixed helm chart for managedconnectors
simlay Aug 25, 2021
75968a5
cargo fmt
simlay Aug 25, 2021
ad7d410
Added list and delete endpoints
simlay Aug 26, 2021
22e907f
Added ManagedConnectors to watch api
simlay Aug 26, 2021
f980f9f
more updates
simlay Aug 26, 2021
04f7748
Added deployment generation
simlay Aug 27, 2021
092e8c3
working deployment
nacardin Aug 27, 2021
225b76f
Added args to connector config
simlay Aug 30, 2021
691c901
Updated to use k8-api patch branch
simlay Aug 31, 2021
27a615f
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Aug 31, 2021
e4b4851
Image name is now correct
simlay Sep 2, 2021
a17edf5
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 2, 2021
06c8585
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 2, 2021
c80f1b1
fix warnings
simlay Sep 3, 2021
12b1799
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 3, 2021
ed364b3
updates to helm charts
simlay Sep 7, 2021
ad54a5e
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 7, 2021
a4fe234
Working again
simlay Sep 7, 2021
a100488
Added status to container
simlay Sep 8, 2021
5f4525c
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 10, 2021
4b9d84b
initial scaffolding to config map
simlay Sep 15, 2021
5cb88bf
Working for demo
simlay Sep 15, 2021
de2d6ba
Added fluvio-connector-helm chart
simlay Sep 16, 2021
d5d8ebc
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 22, 2021
6ee61de
deleted unused files
simlay Sep 22, 2021
cf1f326
updates
simlay Sep 22, 2021
f05f170
Updated helm chart
simlay Sep 22, 2021
dd4da1b
fix clippy
simlay Sep 22, 2021
ccc4e72
cargo fmt
simlay Sep 22, 2021
511a445
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 23, 2021
c57ac85
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 24, 2021
6596407
Added default for create_topic
simlay Sep 24, 2021
9bf6646
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 27, 2021
bdcb33c
Tested and working
simlay Sep 28, 2021
fd95342
cargo fmt
simlay Sep 28, 2021
8c9b190
Clean up unused comments and fmt
simlay Sep 28, 2021
20e903b
Updates from comments
simlay Sep 28, 2021
3399b25
added tls work around for configmap
simlay Sep 30, 2021
3a9bcf5
cargo fmt
simlay Sep 30, 2021
847557e
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 30, 2021
bacaa73
Added filter for managed connector
simlay Sep 30, 2021
cdf589d
Update from comments
simlay Sep 30, 2021
05dc77a
cargo fmt
simlay Sep 30, 2021
66b4971
Merge branch 'master' of github.com:infinyon/fluvio into add-spec-for…
simlay Sep 30, 2021
343e042
fix clippy
simlay Sep 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions crates/fluvio-connector/src/cli/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
//! CLI tree to generate Create a Managed Connector
//!

use fluvio_controlplane_metadata::connector::ManagedConnectorSpec;
use structopt::StructOpt;
use tracing::debug;

use fluvio::Fluvio;
use fluvio::metadata::topic::{TopicSpec, TopicReplicaParam};
use fluvio_controlplane_metadata::connector::ManagedConnectorSpec;

use crate::error::ConnectorError;
use crate::config::ConnectorConfig;
Expand All @@ -27,12 +28,17 @@ pub struct CreateManagedConnectorOpt {
impl CreateManagedConnectorOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ConnectorError> {
let config = ConnectorConfig::from_file(&self.config)?;
let spec: ManagedConnectorSpec = config.into();
let spec: ManagedConnectorSpec = config.clone().into();
let name = spec.name.clone();

debug!("creating managed_connector: {}, spec: {:#?}", name, spec);

let admin = fluvio.admin().await;
if config.create_topic {
let topic_spec = TopicSpec::Computed(TopicReplicaParam::new(1, 1, false));
println!("topic spec: {:?}", topic_spec);
simlay marked this conversation as resolved.
Show resolved Hide resolved
admin.create(config.topic, false, topic_spec).await?;
}
admin.create(name.to_string(), false, spec).await?;

Ok(())
Expand Down
15 changes: 6 additions & 9 deletions crates/fluvio-connector/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,21 @@ use std::io::Read;
use fluvio_controlplane_metadata::connector::ManagedConnectorSpec;
use crate::error::ConnectorError;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ConnectorConfig {
name: String,
#[serde(rename = "type")]
type_: String,
topic: String,
create_topic: Option<bool>,
#[serde(default = "ConnectorConfig::default_args")]
pub(crate) topic: String,
#[serde(default)]
pub(crate) create_topic: bool,
#[serde(default)]
parameters: BTreeMap<String, String>,
#[serde(default = "ConnectorConfig::default_args")]
#[serde(default)]
secrets: BTreeMap<String, String>,
}

impl ConnectorConfig {
fn default_args() -> BTreeMap<String, String> {
BTreeMap::new()
}

pub fn from_file<P: Into<PathBuf>>(path: P) -> Result<ConnectorConfig, ConnectorError> {
let mut file = File::open(path.into())?;
let mut contents = String::new();
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-connector/test-data/test-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
version: v1
name: my-test-connector
type: test-connector
topic: foobar
topic: test-connector
create_topic: true
direction: source
paramaters:
foo: bar
Expand Down
249 changes: 249 additions & 0 deletions crates/fluvio-sc/src/k8/controllers/managed_connector_deployment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
use std::collections::HashMap;
use std::{time::Duration};

use fluvio_stream_dispatcher::{store::K8ChangeListener};
use k8_types::LabelSelector;
simlay marked this conversation as resolved.
Show resolved Hide resolved
use tracing::debug;
simlay marked this conversation as resolved.
Show resolved Hide resolved
use tracing::error;
use tracing::trace;
use tracing::instrument;

use fluvio_future::task::spawn;
use fluvio_future::timer::sleep;
use k8_client::ClientError;

use crate::stores::{StoreContext};
use crate::stores::connector::ManagedConnectorSpec;
simlay marked this conversation as resolved.
Show resolved Hide resolved
use crate::stores::connector::ManagedConnectorStatus;
use crate::stores::connector::ManagedConnectorStatusResolution;

use crate::k8::objects::managed_connector_deployment::ManagedConnectorDeploymentSpec;

use crate::stores::k8::K8MetaItem;
use crate::k8::objects::managed_connector_deployment::K8DeploymentSpec;
use crate::stores::MetadataStoreObject;
use crate::stores::actions::WSAction;

use k8_types::{
simlay marked this conversation as resolved.
Show resolved Hide resolved
TemplateSpec, TemplateMeta,
core::pod::{
PodSpec, ContainerSpec, VolumeMount, ConfigMapVolumeSource, KeyToPath, VolumeSpec,
},
LabelProvider,
};

/// Update Statefulset and Service from SPG
pub struct ManagedConnectorDeploymentController {
namespace: String,
connectors: StoreContext<ManagedConnectorSpec>,
deployments: StoreContext<ManagedConnectorDeploymentSpec>,
}

impl ManagedConnectorDeploymentController {
pub fn start(
namespace: String,
connectors: StoreContext<ManagedConnectorSpec>,
deployments: StoreContext<ManagedConnectorDeploymentSpec>,
) {
let controller = Self {
namespace,
connectors,
deployments,
};

spawn(controller.dispatch_loop());
}

async fn dispatch_loop(mut self) {
loop {
if let Err(err) = self.inner_loop().await {
error!("error with managed connector loop: {:#?}", err);
debug!("sleeping 1 miniute to try again");
sleep(Duration::from_secs(60)).await;
}
}
}

#[instrument(skip(self), name = "ManagedConnectorDeploymentController")]
async fn inner_loop(&mut self) -> Result<(), ClientError> {
use tokio::select;

let mut connector_listener = self.connectors.change_listener();
let _ = connector_listener.wait_for_initial_sync().await;

let mut deployment_listener = self.deployments.change_listener();
let _ = deployment_listener.wait_for_initial_sync().await;

self.sync_connectors_to_deployments(&mut connector_listener)
.await?;

loop {
select! {
_ = connector_listener.listen() => {
debug!("detected connector changes");
self.sync_connectors_to_deployments(&mut connector_listener).await?;
},
_ = deployment_listener.listen() => {
self.sync_deployments_to_connector(&mut deployment_listener).await?;
}
}
}
}

async fn sync_deployments_to_connector(
&mut self,
listener: &mut K8ChangeListener<ManagedConnectorDeploymentSpec>,
) -> Result<(), ClientError> {
if !listener.has_change() {
trace!("no managed connector change, skipping");
return Ok(());
}
let changes = listener.sync_changes().await;
let epoch = changes.epoch;
let (updates, deletes) = changes.parts();

debug!(
"received managed connector deployment changes updates: {},deletes: {},epoch: {}",
updates.len(),
deletes.len(),
epoch,
);
for mc_deployment in updates.into_iter() {
let key = mc_deployment.key();
let deployment_status = mc_deployment.status();
let ready_replicas = deployment_status.0.ready_replicas;

let resolution = if ready_replicas.is_some() && ready_replicas.unwrap() > 0 {
simlay marked this conversation as resolved.
Show resolved Hide resolved
ManagedConnectorStatusResolution::Running
} else {
ManagedConnectorStatusResolution::Failed
};

let connector_status = ManagedConnectorStatus {
resolution,
..Default::default()
};
self.connectors
.update_status(key.to_string(), connector_status.clone())
.await?;
}
Ok(())
}

/// svc has been changed, update spu
async fn sync_connectors_to_deployments(
&mut self,
listener: &mut K8ChangeListener<ManagedConnectorSpec>,
) -> Result<(), ClientError> {
if !listener.has_change() {
trace!("no managed connector change, skipping");
return Ok(());
}

let changes = listener.sync_changes().await;
let epoch = changes.epoch;
let (updates, deletes) = changes.parts();

debug!(
"received managed connector changes updates: {},deletes: {},epoch: {}",
updates.len(),
deletes.len(),
epoch,
);

for mc_item in updates.into_iter() {
self.sync_connector_to_deployment(mc_item).await?
}

Ok(())
}

#[instrument(skip(self, managed_connector))]
async fn sync_connector_to_deployment(
&mut self,
managed_connector: MetadataStoreObject<ManagedConnectorSpec, K8MetaItem>,
) -> Result<(), ClientError> {
let key = managed_connector.key();

let k8_deployment_spec =
Self::generate_k8_deployment_spec(managed_connector.spec(), &self.namespace, key);
trace!(?k8_deployment_spec);
let deployment_action = WSAction::Apply(
MetadataStoreObject::with_spec(key, k8_deployment_spec.into())
.with_context(managed_connector.ctx().create_child()),
);

debug!(?deployment_action, "applying deployment");

self.deployments.wait_action(key, deployment_action).await?;

Ok(())
}

const DEFAULT_CONNECTOR_NAME: &'static str = "fluvio-connector";
pub fn generate_k8_deployment_spec(
mc_spec: &ManagedConnectorSpec,
_namespace: &str,
_name: &str,
) -> K8DeploymentSpec {
let image = format!("infinyon/fluvio-connect-{}", mc_spec.type_);
debug!("Starting connector for image: {:?}", image);

let config_map_volume_spec = VolumeSpec {
name: "fluvio-config-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some("fluvio-config-map".to_string()),
items: Some(vec![KeyToPath {
key: "fluvioClientConfig".to_string(),
path: "config".to_string(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};

let parameters = &mc_spec.parameters;
let args: Vec<String> = parameters
.keys()
.zip(parameters.values())
.flat_map(|(key, value)| [key.clone(), value.clone()])
.collect::<Vec<_>>();
let template = TemplateSpec {
metadata: Some(
TemplateMeta::default().set_labels(vec![("app", Self::DEFAULT_CONNECTOR_NAME)]),
),
spec: PodSpec {
termination_grace_period_seconds: Some(10),
containers: vec![ContainerSpec {
name: Self::DEFAULT_CONNECTOR_NAME.to_owned(),
image: Some(image),
image_pull_policy: Some("IfNotPresent".to_string()),
/*
env, // TODO
*/
volume_mounts: vec![VolumeMount {
name: "fluvio-config-volume".to_string(),
mount_path: "/home/fluvio/.fluvio".to_string(),
..Default::default()
}],
args,
..Default::default()
}],
volumes: vec![config_map_volume_spec],
//security_context: spu_k8_config.pod_security_context.clone(),
//node_selector: Some(spu_pod_config.node_selector.clone()),
..Default::default()
},
};

let mut match_labels = HashMap::new();
match_labels.insert("app".to_owned(), Self::DEFAULT_CONNECTOR_NAME.to_owned());

K8DeploymentSpec {
template,
selector: LabelSelector { match_labels },
..Default::default()
}
}
}
22 changes: 21 additions & 1 deletion crates/fluvio-sc/src/k8/controllers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod spg_stateful;
pub mod spu_service;
pub mod spu_controller;
pub mod managed_connector_deployment;

pub use k8_operator::run_k8_operators;

Expand All @@ -18,9 +19,12 @@ mod k8_operator {
use crate::k8::objects::statefulset::StatefulsetSpec;
use crate::k8::objects::spg_service::SpgServiceSpec;
use crate::k8::objects::spu_k8_config::ScK8Config;
use crate::k8::objects::managed_connector_deployment::ManagedConnectorDeploymentSpec;

use crate::k8::controllers::spg_stateful::SpgStatefulSetController;
use crate::k8::controllers::spu_service::SpuServiceController;
use crate::k8::controllers::spu_controller::K8SpuController;
use crate::k8::controllers::managed_connector_deployment::ManagedConnectorDeploymentController;

pub async fn run_k8_operators(
namespace: String,
Expand All @@ -33,6 +37,9 @@ mod k8_operator {
let spu_service_ctx: StoreContext<SpuServiceSpec> = StoreContext::new();
let statefulset_ctx: StoreContext<StatefulsetSpec> = StoreContext::new();
let spg_service_ctx: StoreContext<SpgServiceSpec> = StoreContext::new();
let managed_connector_deployments_ctx: StoreContext<ManagedConnectorDeploymentSpec> =
StoreContext::new();

let config_ctx: StoreContext<ScK8Config> = StoreContext::new();

info!("starting k8 cluster operators");
Expand All @@ -55,11 +62,17 @@ mod k8_operator {
spg_service_ctx.clone(),
);

K8ClusterStateDispatcher::<_, _>::start(
namespace.clone(),
k8_client.clone(),
managed_connector_deployments_ctx.clone(),
);

K8ClusterStateDispatcher::<_, _>::start(namespace.clone(), k8_client, config_ctx.clone());

whitelist!(config, "k8_spg", {
SpgStatefulSetController::start(
namespace,
namespace.clone(),
config_ctx.clone(),
global_ctx.spgs().clone(),
statefulset_ctx,
Expand All @@ -80,5 +93,12 @@ mod k8_operator {
whitelist!(config, "k8_spu_service", {
SpuServiceController::start(config_ctx, spu_service_ctx, global_ctx.spgs().clone());
});
whitelist!(config, "k8_managed_connector_delpoyment", {
ManagedConnectorDeploymentController::start(
namespace,
global_ctx.managed_connectors().clone(),
managed_connector_deployments_ctx,
);
});
}
}
Loading