diff --git a/CHANGELOG.md b/CHANGELOG.md index b8cc6ae40c..34aeef2f73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,9 @@ ## Platform Version 0.9.9 - UNRELEASED * Add `impl std::error::Error for ErrorCode` for better error reporting ([#1693](https://github.com/infinyon/fluvio/pull/1693)) * Add ability to create a consumer that can read from multiple partitions concurrently. ([#1568](https://github.com/infinyon/fluvio/issues/1568)) -* Expose partition for `fluvio consume --format`. ([#1701](https://github.com/infinyon/fluvio/issues/1701)) -* Fix consumer stream hanging after rollver ([#1700](https://github.com/infinyon/fluvio/issues/1700)) +* Expose partition for `fluvio consume --format`. ([#1701](https://github.com/infinyon/fluvio/issues/1701)) +* Fix consumer stream hanging after rollver ([#1700](https://github.com/infinyon/fluvio/issues/1700)) +* Added deployment controller for managed connectors ([#1499](https://github.com/infinyon/fluvio/pull/1499)). ## Platform Version 0.9.8 - 2021-09-23 * Add progress indicator to `fluvio cluster start` ([#1627](https://github.com/infinyon/fluvio/pull/1627)) @@ -26,7 +27,7 @@ * Increase platform stability ([#1497](https://github.com/infinyon/fluvio/pull/1497)) * Spawn a thread to handle stream fetch requests ([#1522](https://github.com/infinyon/fluvio/issues/1522)) -## Platform Version 0.9.4 - 2021-08-26 +## Platform Version 0.9.4 - 2021-08-26 * Publish docker image for aarch64 #1389 ([#1389](https://github.com/infinyon/fluvio/pull/1389)) * Do not panic when trying to create topic with space in the name. ([#1448](https://github.com/infinyon/fluvio/pull/1448)) * Deprecate consumer fetch API ([#957](https://github.com/infinyon/fluvio/issues/957)) diff --git a/Cargo.lock b/Cargo.lock index 1a6eb46c46..495c71a5dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3244,9 +3244,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.66" +version = "0.9.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1996d2d305e561b70d1ee0c53f1542833f4e1ac6ce9a6708b6ff2738ca67dc82" +checksum = "69df2d8dfc6ce3aaf44b40dec6f487d5a886516cf6879c49e98e0710f310a058" dependencies = [ "autocfg", "cc", diff --git a/crates/fluvio-connector/src/cli/create.rs b/crates/fluvio-connector/src/cli/create.rs index 2c235ca7f0..2427bc71c1 100644 --- a/crates/fluvio-connector/src/cli/create.rs +++ b/crates/fluvio-connector/src/cli/create.rs @@ -4,11 +4,12 @@ //! CLI tree to generate Create a Managed Connector //! -use fluvio_controlplane_metadata::connector::ManagedConnectorSpec; use structopt::StructOpt; use tracing::debug; use fluvio::Fluvio; +use fluvio::metadata::topic::{TopicSpec, TopicReplicaParam}; +use fluvio_controlplane_metadata::connector::ManagedConnectorSpec; use crate::error::ConnectorError; use crate::config::ConnectorConfig; @@ -27,12 +28,17 @@ pub struct CreateManagedConnectorOpt { impl CreateManagedConnectorOpt { pub async fn process(self, fluvio: &Fluvio) -> Result<(), ConnectorError> { let config = ConnectorConfig::from_file(&self.config)?; - let spec: ManagedConnectorSpec = config.into(); + let spec: ManagedConnectorSpec = config.clone().into(); let name = spec.name.clone(); debug!("creating managed_connector: {}, spec: {:#?}", name, spec); let admin = fluvio.admin().await; + if config.create_topic { + let topic_spec = TopicSpec::Computed(TopicReplicaParam::new(1, 1, false)); + debug!("topic spec: {:?}", topic_spec); + admin.create(config.topic, false, topic_spec).await?; + } admin.create(name.to_string(), false, spec).await?; Ok(()) diff --git a/crates/fluvio-connector/src/config.rs b/crates/fluvio-connector/src/config.rs index 42c89d34e3..3fd7af037b 100644 --- a/crates/fluvio-connector/src/config.rs +++ b/crates/fluvio-connector/src/config.rs @@ -7,24 +7,21 @@ use std::io::Read; use fluvio_controlplane_metadata::connector::ManagedConnectorSpec; use crate::error::ConnectorError; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct ConnectorConfig { name: String, #[serde(rename = "type")] type_: String, - topic: String, - create_topic: Option, - #[serde(default = "ConnectorConfig::default_args")] + pub(crate) topic: String, + #[serde(default)] + pub(crate) create_topic: bool, + #[serde(default)] parameters: BTreeMap, - #[serde(default = "ConnectorConfig::default_args")] + #[serde(default)] secrets: BTreeMap, } impl ConnectorConfig { - fn default_args() -> BTreeMap { - BTreeMap::new() - } - pub fn from_file>(path: P) -> Result { let mut file = File::open(path.into())?; let mut contents = String::new(); diff --git a/crates/fluvio-connector/test-data/test-config.yaml b/crates/fluvio-connector/test-data/test-config.yaml index 8e85093dc4..3446d3f9b3 100644 --- a/crates/fluvio-connector/test-data/test-config.yaml +++ b/crates/fluvio-connector/test-data/test-config.yaml @@ -1,7 +1,8 @@ version: v1 name: my-test-connector type: test-connector -topic: foobar +topic: test-connector +create_topic: true direction: source paramaters: foo: bar diff --git a/crates/fluvio-sc/src/k8/controllers/managed_connector_deployment.rs b/crates/fluvio-sc/src/k8/controllers/managed_connector_deployment.rs new file mode 100644 index 0000000000..ad738b33be --- /dev/null +++ b/crates/fluvio-sc/src/k8/controllers/managed_connector_deployment.rs @@ -0,0 +1,242 @@ +use std::{collections::HashMap, time::Duration}; + +use tracing::{debug, error, trace, instrument}; +use k8_client::ClientError; +use k8_types::{ + LabelSelector, TemplateSpec, TemplateMeta, + core::pod::{ + PodSpec, ContainerSpec, VolumeMount, ConfigMapVolumeSource, KeyToPath, VolumeSpec, + }, + LabelProvider, +}; + +use fluvio_future::{task::spawn, timer::sleep}; +use fluvio_stream_dispatcher::store::K8ChangeListener; + +use crate::stores::{ + StoreContext, + connector::{ManagedConnectorSpec, ManagedConnectorStatus, ManagedConnectorStatusResolution}, + k8::K8MetaItem, + MetadataStoreObject, + actions::WSAction, +}; + +use crate::k8::objects::managed_connector_deployment::{ + ManagedConnectorDeploymentSpec, K8DeploymentSpec, +}; + +/// Update Statefulset and Service from SPG +pub struct ManagedConnectorDeploymentController { + namespace: String, + connectors: StoreContext, + deployments: StoreContext, +} + +impl ManagedConnectorDeploymentController { + pub fn start( + namespace: String, + connectors: StoreContext, + deployments: StoreContext, + ) { + let controller = Self { + namespace, + connectors, + deployments, + }; + + spawn(controller.dispatch_loop()); + } + + async fn dispatch_loop(mut self) { + loop { + if let Err(err) = self.inner_loop().await { + error!("error with managed connector loop: {:#?}", err); + debug!("sleeping 1 miniute to try again"); + sleep(Duration::from_secs(60)).await; + } + } + } + + #[instrument(skip(self), name = "ManagedConnectorDeploymentController")] + async fn inner_loop(&mut self) -> Result<(), ClientError> { + use tokio::select; + + let mut connector_listener = self.connectors.change_listener(); + let _ = connector_listener.wait_for_initial_sync().await; + + let mut deployment_listener = self.deployments.change_listener(); + let _ = deployment_listener.wait_for_initial_sync().await; + + self.sync_connectors_to_deployments(&mut connector_listener) + .await?; + + loop { + select! { + _ = connector_listener.listen() => { + debug!("detected connector changes"); + self.sync_connectors_to_deployments(&mut connector_listener).await?; + }, + _ = deployment_listener.listen() => { + self.sync_deployments_to_connector(&mut deployment_listener).await?; + } + } + } + } + + async fn sync_deployments_to_connector( + &mut self, + listener: &mut K8ChangeListener, + ) -> Result<(), ClientError> { + if !listener.has_change() { + trace!("no managed connector change, skipping"); + return Ok(()); + } + let changes = listener.sync_changes().await; + let epoch = changes.epoch; + let (updates, deletes) = changes.parts(); + + debug!( + "received managed connector deployment changes updates: {},deletes: {},epoch: {}", + updates.len(), + deletes.len(), + epoch, + ); + for mc_deployment in updates.into_iter() { + let key = mc_deployment.key(); + let deployment_status = mc_deployment.status(); + let ready_replicas = deployment_status.0.ready_replicas; + let resolution = match ready_replicas { + Some(ready_replicas) if ready_replicas > 0 => { + ManagedConnectorStatusResolution::Running + } + _ => ManagedConnectorStatusResolution::Failed, + }; + + let connector_status = ManagedConnectorStatus { + resolution, + ..Default::default() + }; + self.connectors + .update_status(key.to_string(), connector_status.clone()) + .await?; + } + Ok(()) + } + + /// svc has been changed, update spu + async fn sync_connectors_to_deployments( + &mut self, + listener: &mut K8ChangeListener, + ) -> Result<(), ClientError> { + if !listener.has_change() { + trace!("no managed connector change, skipping"); + return Ok(()); + } + + let changes = listener.sync_changes().await; + let epoch = changes.epoch; + let (updates, deletes) = changes.parts(); + + debug!( + "received managed connector changes updates: {},deletes: {},epoch: {}", + updates.len(), + deletes.len(), + epoch, + ); + + for mc_item in updates.into_iter() { + self.sync_connector_to_deployment(mc_item).await? + } + + Ok(()) + } + + #[instrument(skip(self, managed_connector))] + async fn sync_connector_to_deployment( + &mut self, + managed_connector: MetadataStoreObject, + ) -> Result<(), ClientError> { + let key = managed_connector.key(); + + let k8_deployment_spec = + Self::generate_k8_deployment_spec(managed_connector.spec(), &self.namespace, key); + trace!(?k8_deployment_spec); + let deployment_action = WSAction::Apply( + MetadataStoreObject::with_spec(key, k8_deployment_spec.into()) + .with_context(managed_connector.ctx().create_child()), + ); + + debug!(?deployment_action, "applying deployment"); + + self.deployments.wait_action(key, deployment_action).await?; + + Ok(()) + } + + const DEFAULT_CONNECTOR_NAME: &'static str = "fluvio-connector"; + pub fn generate_k8_deployment_spec( + mc_spec: &ManagedConnectorSpec, + _namespace: &str, + _name: &str, + ) -> K8DeploymentSpec { + let image = format!("infinyon/fluvio-connect-{}", mc_spec.type_); + debug!("Starting connector for image: {:?}", image); + + let config_map_volume_spec = VolumeSpec { + name: "fluvio-config-volume".to_string(), + config_map: Some(ConfigMapVolumeSource { + name: Some("fluvio-config-map".to_string()), + items: Some(vec![KeyToPath { + key: "fluvioClientConfig".to_string(), + path: "config".to_string(), + ..Default::default() + }]), + ..Default::default() + }), + ..Default::default() + }; + + let parameters = &mc_spec.parameters; + let args: Vec = parameters + .keys() + .zip(parameters.values()) + .flat_map(|(key, value)| [key.clone(), value.clone()]) + .collect::>(); + let template = TemplateSpec { + metadata: Some( + TemplateMeta::default().set_labels(vec![("app", Self::DEFAULT_CONNECTOR_NAME)]), + ), + spec: PodSpec { + termination_grace_period_seconds: Some(10), + containers: vec![ContainerSpec { + name: Self::DEFAULT_CONNECTOR_NAME.to_owned(), + image: Some(image), + image_pull_policy: Some("IfNotPresent".to_string()), + /* + env, // TODO + */ + volume_mounts: vec![VolumeMount { + name: "fluvio-config-volume".to_string(), + mount_path: "/home/fluvio/.fluvio".to_string(), + ..Default::default() + }], + args, + ..Default::default() + }], + volumes: vec![config_map_volume_spec], + //security_context: spu_k8_config.pod_security_context.clone(), + //node_selector: Some(spu_pod_config.node_selector.clone()), + ..Default::default() + }, + }; + + let mut match_labels = HashMap::new(); + match_labels.insert("app".to_owned(), Self::DEFAULT_CONNECTOR_NAME.to_owned()); + + K8DeploymentSpec { + template, + selector: LabelSelector { match_labels }, + ..Default::default() + } + } +} diff --git a/crates/fluvio-sc/src/k8/controllers/mod.rs b/crates/fluvio-sc/src/k8/controllers/mod.rs index 05208eba5c..b8d2cfa6d9 100644 --- a/crates/fluvio-sc/src/k8/controllers/mod.rs +++ b/crates/fluvio-sc/src/k8/controllers/mod.rs @@ -1,6 +1,7 @@ pub mod spg_stateful; pub mod spu_service; pub mod spu_controller; +pub mod managed_connector_deployment; pub use k8_operator::run_k8_operators; @@ -18,9 +19,12 @@ mod k8_operator { use crate::k8::objects::statefulset::StatefulsetSpec; use crate::k8::objects::spg_service::SpgServiceSpec; use crate::k8::objects::spu_k8_config::ScK8Config; + use crate::k8::objects::managed_connector_deployment::ManagedConnectorDeploymentSpec; + use crate::k8::controllers::spg_stateful::SpgStatefulSetController; use crate::k8::controllers::spu_service::SpuServiceController; use crate::k8::controllers::spu_controller::K8SpuController; + use crate::k8::controllers::managed_connector_deployment::ManagedConnectorDeploymentController; pub async fn run_k8_operators( namespace: String, @@ -33,6 +37,9 @@ mod k8_operator { let spu_service_ctx: StoreContext = StoreContext::new(); let statefulset_ctx: StoreContext = StoreContext::new(); let spg_service_ctx: StoreContext = StoreContext::new(); + let managed_connector_deployments_ctx: StoreContext = + StoreContext::new(); + let config_ctx: StoreContext = StoreContext::new(); info!("starting k8 cluster operators"); @@ -55,11 +62,17 @@ mod k8_operator { spg_service_ctx.clone(), ); + K8ClusterStateDispatcher::<_, _>::start( + namespace.clone(), + k8_client.clone(), + managed_connector_deployments_ctx.clone(), + ); + K8ClusterStateDispatcher::<_, _>::start(namespace.clone(), k8_client, config_ctx.clone()); whitelist!(config, "k8_spg", { SpgStatefulSetController::start( - namespace, + namespace.clone(), config_ctx.clone(), global_ctx.spgs().clone(), statefulset_ctx, @@ -80,5 +93,12 @@ mod k8_operator { whitelist!(config, "k8_spu_service", { SpuServiceController::start(config_ctx, spu_service_ctx, global_ctx.spgs().clone()); }); + whitelist!(config, "k8_managed_connector_delpoyment", { + ManagedConnectorDeploymentController::start( + namespace, + global_ctx.managed_connectors().clone(), + managed_connector_deployments_ctx, + ); + }); } } diff --git a/crates/fluvio-sc/src/k8/objects/managed_connector_deployment.rs b/crates/fluvio-sc/src/k8/objects/managed_connector_deployment.rs new file mode 100644 index 0000000000..2826704b4f --- /dev/null +++ b/crates/fluvio-sc/src/k8/objects/managed_connector_deployment.rs @@ -0,0 +1,98 @@ +use std::fmt; + +use serde::Deserialize; +use serde::Serialize; + +use crate::dispatcher::core::Spec; +use crate::dispatcher::core::Status; +use crate::stores::connector::ManagedConnectorSpec; + +pub use k8_types::app::deployment::DeploymentStatus as K8DeploymentStatus; +pub use k8_types::app::deployment::DeploymentSpec as K8DeploymentSpec; + +/// Statefulset Spec +#[derive(Deserialize, Serialize, Debug, Default, Clone, PartialEq)] +#[serde(transparent)] +pub struct ManagedConnectorDeploymentSpec(K8DeploymentSpec); + +impl Spec for ManagedConnectorDeploymentSpec { + const LABEL: &'static str = "Deployment"; + type IndexKey = String; + type Status = DeploymentStatus; + type Owner = ManagedConnectorSpec; +} + +impl From for ManagedConnectorDeploymentSpec { + fn from(k8: K8DeploymentSpec) -> Self { + Self(k8) + } +} + +impl From for K8DeploymentSpec { + fn from(spec: ManagedConnectorDeploymentSpec) -> Self { + spec.0 + } +} + +/// Statefulset Spec +#[derive(Deserialize, Serialize, Debug, Default, Clone, PartialEq)] +#[serde(transparent)] +pub struct DeploymentStatus(pub K8DeploymentStatus); + +impl Status for DeploymentStatus {} + +impl fmt::Display for DeploymentStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:#?}", self.0) + } +} + +impl From for DeploymentStatus { + fn from(k8: K8DeploymentStatus) -> Self { + Self(k8) + } +} + +impl From for K8DeploymentStatus { + fn from(status: DeploymentStatus) -> Self { + status.0 + } +} + +mod extended { + + use k8_types::K8Obj; + + use tracing::trace; + + use crate::stores::k8::K8ConvertError; + use crate::stores::k8::K8ExtendedSpec; + use crate::stores::k8::K8MetaItem; + use crate::stores::MetadataStoreObject; + use crate::stores::k8::default_convert_from_k8; + use fluvio_controlplane_metadata::k8_types::Spec; + use fluvio_controlplane_metadata::connector::K8ManagedConnectorSpec; + + use super::*; + + impl K8ExtendedSpec for ManagedConnectorDeploymentSpec { + type K8Spec = K8DeploymentSpec; + type K8Status = K8DeploymentStatus; + + fn convert_from_k8( + k8_obj: K8Obj, + ) -> Result, K8ConvertError> { + if k8_obj + .metadata + .owner_references + .iter() + .any(|v| v.kind == K8ManagedConnectorSpec::metadata().names.kind) + { + trace!("converting k8 managed connector: {:#?}", k8_obj); + default_convert_from_k8(k8_obj) + } else { + Err(K8ConvertError::Skip(k8_obj)) + } + } + } +} diff --git a/crates/fluvio-sc/src/k8/objects/mod.rs b/crates/fluvio-sc/src/k8/objects/mod.rs index 6762ecee40..48ef2308db 100644 --- a/crates/fluvio-sc/src/k8/objects/mod.rs +++ b/crates/fluvio-sc/src/k8/objects/mod.rs @@ -3,3 +3,4 @@ pub mod spg_service; pub mod spu_k8_config; pub mod statefulset; pub mod spu_service; +pub mod managed_connector_deployment; diff --git a/k8-util/helm/fluvio-app/templates/fluvio-connector-config.yaml b/k8-util/helm/fluvio-app/templates/fluvio-connector-config.yaml new file mode 100644 index 0000000000..72aab161ad --- /dev/null +++ b/k8-util/helm/fluvio-app/templates/fluvio-connector-config.yaml @@ -0,0 +1,21 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluvio-config-map +data: + fluvioClientConfig: | + version = "2.0" + current_profile = "k8-config-map-deployment" + + [profile.k8-config-map-deployment] + cluster = "k8-config-map-deployment" + + [cluster.k8-config-map-deployment] + {{ if .Values.tls }} + endpoint = "fluvio-sc-public:9005" + {{else}} + endpoint = "fluvio-sc-public:9003" + {{end}} + use_spu_local_address = true + [cluster.k8-config-map-deployment.tls] + tls_policy = "disabled"