Add Deployment controller for managed connectors (#1499)
Closes #1648.

Co-authored-by: Nick Cardin <nick@cardin.email>
simlay and nacardin committed Sep 30, 2021
1 parent e320332 commit dea95a0
Showing 10 changed files with 405 additions and 18 deletions.
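At a high level, the new controller follows the familiar reconcile pattern: watch the managed connector specs, derive the desired Kubernetes Deployment for each one (the container image is computed from the connector type), and apply that object to the store. The sketch below shows that shape only; it is not code from this commit, and the ConnectorSpec/Deployment types and fields are illustrative stand-ins.

use std::collections::HashMap;

#[derive(Debug, Clone)]
struct ConnectorSpec {
    name: String,
    type_: String,
}

#[derive(Debug, Clone)]
struct Deployment {
    image: String,
    replicas: u32,
}

// Desired state: one Deployment per connector, image derived from its type.
fn desired_deployment(spec: &ConnectorSpec) -> Deployment {
    Deployment {
        image: format!("infinyon/fluvio-connect-{}", spec.type_),
        replicas: 1,
    }
}

// Reconcile: apply (insert or overwrite) the desired Deployment for every connector.
fn reconcile(connectors: &[ConnectorSpec], deployments: &mut HashMap<String, Deployment>) {
    for connector in connectors {
        deployments.insert(connector.name.clone(), desired_deployment(connector));
    }
}

fn main() {
    let connectors = vec![ConnectorSpec {
        name: "my-test-connector".to_string(),
        type_: "test-connector".to_string(),
    }];
    let mut deployments = HashMap::new();
    reconcile(&connectors, &mut deployments);
    println!("{:?}", deployments);
}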
7 changes: 4 additions & 3 deletions CHANGELOG.md
@@ -3,8 +3,9 @@
## Platform Version 0.9.9 - UNRELEASED
* Add `impl std::error::Error for ErrorCode` for better error reporting ([#1693](https://github.com/infinyon/fluvio/pull/1693))
* Add ability to create a consumer that can read from multiple partitions concurrently. ([#1568](https://github.com/infinyon/fluvio/issues/1568))
* Expose partition for `fluvio consume --format`. ([#1701](https://github.com/infinyon/fluvio/issues/1701))
* Fix consumer stream hanging after rollover ([#1700](https://github.com/infinyon/fluvio/issues/1700))
* Added deployment controller for managed connectors ([#1499](https://github.com/infinyon/fluvio/pull/1499)).

## Platform Version 0.9.8 - 2021-09-23
* Add progress indicator to `fluvio cluster start` ([#1627](https://github.com/infinyon/fluvio/pull/1627))
@@ -26,7 +27,7 @@
* Increase platform stability ([#1497](https://github.com/infinyon/fluvio/pull/1497))
* Spawn a thread to handle stream fetch requests ([#1522](https://github.com/infinyon/fluvio/issues/1522))

## Platform Version 0.9.4 - 2021-08-26
* Publish docker image for aarch64 #1389 ([#1389](https://github.com/infinyon/fluvio/pull/1389))
* Do not panic when trying to create topic with space in the name. ([#1448](https://github.com/infinyon/fluvio/pull/1448))
* Deprecate consumer fetch API ([#957](https://github.com/infinyon/fluvio/issues/957))
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 8 additions & 2 deletions crates/fluvio-connector/src/cli/create.rs
@@ -4,11 +4,12 @@
//! CLI tree to create a Managed Connector
//!
use structopt::StructOpt;
use tracing::debug;

use fluvio::Fluvio;
use fluvio::metadata::topic::{TopicSpec, TopicReplicaParam};
use fluvio_controlplane_metadata::connector::ManagedConnectorSpec;

use crate::error::ConnectorError;
use crate::config::ConnectorConfig;
@@ -27,12 +28,17 @@ pub struct CreateManagedConnectorOpt {
impl CreateManagedConnectorOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ConnectorError> {
let config = ConnectorConfig::from_file(&self.config)?;
let spec: ManagedConnectorSpec = config.into();
let spec: ManagedConnectorSpec = config.clone().into();
let name = spec.name.clone();

debug!("creating managed_connector: {}, spec: {:#?}", name, spec);

let admin = fluvio.admin().await;
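// If the config requests it, create the connector's topic first
// (1 partition, replication factor 1) before registering the connector spec.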
if config.create_topic {
let topic_spec = TopicSpec::Computed(TopicReplicaParam::new(1, 1, false));
debug!("topic spec: {:?}", topic_spec);
admin.create(config.topic, false, topic_spec).await?;
}
admin.create(name.to_string(), false, spec).await?;

Ok(())
15 changes: 6 additions & 9 deletions crates/fluvio-connector/src/config.rs
@@ -7,24 +7,21 @@ use std::io::Read;
use fluvio_controlplane_metadata::connector::ManagedConnectorSpec;
use crate::error::ConnectorError;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ConnectorConfig {
name: String,
#[serde(rename = "type")]
type_: String,
topic: String,
create_topic: Option<bool>,
#[serde(default = "ConnectorConfig::default_args")]
pub(crate) topic: String,
#[serde(default)]
pub(crate) create_topic: bool,
#[serde(default)]
parameters: BTreeMap<String, String>,
#[serde(default = "ConnectorConfig::default_args")]
#[serde(default)]
secrets: BTreeMap<String, String>,
}

impl ConnectorConfig {
fn default_args() -> BTreeMap<String, String> {
BTreeMap::new()
}

pub fn from_file<P: Into<PathBuf>>(path: P) -> Result<ConnectorConfig, ConnectorError> {
let mut file = File::open(path.into())?;
let mut contents = String::new();
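The move from Option<bool> plus the hand-rolled `default_args` helper to `#[serde(default)]` means `create_topic`, `parameters`, and `secrets` can simply be omitted from the config file and fall back to false or an empty map. A minimal sketch of that behavior, assuming the serde (with the derive feature) and serde_yaml crates as dependencies; `ExampleConfig` is an illustrative stand-in, not the crate's struct:

use std::collections::BTreeMap;
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
struct ExampleConfig {
    topic: String,
    #[serde(default)]
    create_topic: bool,
    #[serde(default)]
    parameters: BTreeMap<String, String>,
}

fn main() {
    // `create_topic` and `parameters` are omitted from the YAML entirely.
    let yaml = "topic: test-connector\n";
    let config: ExampleConfig = serde_yaml::from_str(yaml).expect("valid yaml");
    assert!(!config.create_topic);
    assert!(config.parameters.is_empty());
    println!("{:?}", config);
}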
3 changes: 2 additions & 1 deletion crates/fluvio-connector/test-data/test-config.yaml
@@ -1,7 +1,8 @@
version: v1
name: my-test-connector
type: test-connector
topic: foobar
topic: test-connector
create_topic: true
direction: source
parameters:
foo: bar
242 changes: 242 additions & 0 deletions crates/fluvio-sc/src/k8/controllers/managed_connector_deployment.rs
@@ -0,0 +1,242 @@
use std::{collections::HashMap, time::Duration};

use tracing::{debug, error, trace, instrument};
use k8_client::ClientError;
use k8_types::{
LabelSelector, TemplateSpec, TemplateMeta,
core::pod::{
PodSpec, ContainerSpec, VolumeMount, ConfigMapVolumeSource, KeyToPath, VolumeSpec,
},
LabelProvider,
};

use fluvio_future::{task::spawn, timer::sleep};
use fluvio_stream_dispatcher::store::K8ChangeListener;

use crate::stores::{
StoreContext,
connector::{ManagedConnectorSpec, ManagedConnectorStatus, ManagedConnectorStatusResolution},
k8::K8MetaItem,
MetadataStoreObject,
actions::WSAction,
};

use crate::k8::objects::managed_connector_deployment::{
ManagedConnectorDeploymentSpec, K8DeploymentSpec,
};

/// Reconciles managed connector specs with k8 Deployments and syncs Deployment status back
pub struct ManagedConnectorDeploymentController {
namespace: String,
connectors: StoreContext<ManagedConnectorSpec>,
deployments: StoreContext<ManagedConnectorDeploymentSpec>,
}

impl ManagedConnectorDeploymentController {
pub fn start(
namespace: String,
connectors: StoreContext<ManagedConnectorSpec>,
deployments: StoreContext<ManagedConnectorDeploymentSpec>,
) {
let controller = Self {
namespace,
connectors,
deployments,
};

spawn(controller.dispatch_loop());
}

async fn dispatch_loop(mut self) {
loop {
if let Err(err) = self.inner_loop().await {
error!("error with managed connector loop: {:#?}", err);
debug!("sleeping 1 miniute to try again");
sleep(Duration::from_secs(60)).await;
}
}
}

#[instrument(skip(self), name = "ManagedConnectorDeploymentController")]
async fn inner_loop(&mut self) -> Result<(), ClientError> {
use tokio::select;

let mut connector_listener = self.connectors.change_listener();
let _ = connector_listener.wait_for_initial_sync().await;

let mut deployment_listener = self.deployments.change_listener();
let _ = deployment_listener.wait_for_initial_sync().await;

self.sync_connectors_to_deployments(&mut connector_listener)
.await?;

loop {
select! {
_ = connector_listener.listen() => {
debug!("detected connector changes");
self.sync_connectors_to_deployments(&mut connector_listener).await?;
},
_ = deployment_listener.listen() => {
self.sync_deployments_to_connector(&mut deployment_listener).await?;
}
}
}
}

async fn sync_deployments_to_connector(
&mut self,
listener: &mut K8ChangeListener<ManagedConnectorDeploymentSpec>,
) -> Result<(), ClientError> {
if !listener.has_change() {
trace!("no managed connector change, skipping");
return Ok(());
}
let changes = listener.sync_changes().await;
let epoch = changes.epoch;
let (updates, deletes) = changes.parts();

debug!(
"received managed connector deployment changes updates: {},deletes: {},epoch: {}",
updates.len(),
deletes.len(),
epoch,
);
for mc_deployment in updates.into_iter() {
let key = mc_deployment.key();
let deployment_status = mc_deployment.status();
let ready_replicas = deployment_status.0.ready_replicas;
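// Consider the connector Running as soon as at least one replica is ready;
// any other ready count resolves to Failed.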
let resolution = match ready_replicas {
Some(ready_replicas) if ready_replicas > 0 => {
ManagedConnectorStatusResolution::Running
}
_ => ManagedConnectorStatusResolution::Failed,
};

let connector_status = ManagedConnectorStatus {
resolution,
..Default::default()
};
self.connectors
.update_status(key.to_string(), connector_status.clone())
.await?;
}
Ok(())
}

/// Connector specs have changed; sync them to k8 Deployments
async fn sync_connectors_to_deployments(
&mut self,
listener: &mut K8ChangeListener<ManagedConnectorSpec>,
) -> Result<(), ClientError> {
if !listener.has_change() {
trace!("no managed connector change, skipping");
return Ok(());
}

let changes = listener.sync_changes().await;
let epoch = changes.epoch;
let (updates, deletes) = changes.parts();

debug!(
"received managed connector changes updates: {},deletes: {},epoch: {}",
updates.len(),
deletes.len(),
epoch,
);

for mc_item in updates.into_iter() {
self.sync_connector_to_deployment(mc_item).await?
}

Ok(())
}

#[instrument(skip(self, managed_connector))]
async fn sync_connector_to_deployment(
&mut self,
managed_connector: MetadataStoreObject<ManagedConnectorSpec, K8MetaItem>,
) -> Result<(), ClientError> {
let key = managed_connector.key();

let k8_deployment_spec =
Self::generate_k8_deployment_spec(managed_connector.spec(), &self.namespace, key);
trace!(?k8_deployment_spec);
let deployment_action = WSAction::Apply(
MetadataStoreObject::with_spec(key, k8_deployment_spec.into())
.with_context(managed_connector.ctx().create_child()),
);

debug!(?deployment_action, "applying deployment");

self.deployments.wait_action(key, deployment_action).await?;

Ok(())
}

const DEFAULT_CONNECTOR_NAME: &'static str = "fluvio-connector";
pub fn generate_k8_deployment_spec(
mc_spec: &ManagedConnectorSpec,
_namespace: &str,
_name: &str,
) -> K8DeploymentSpec {
let image = format!("infinyon/fluvio-connect-{}", mc_spec.type_);
debug!("Starting connector for image: {:?}", image);

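// Volume backed by the `fluvio-config-map` ConfigMap: its `fluvioClientConfig`
// key is projected to a file named `config`, mounted below at /home/fluvio/.fluvio.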
let config_map_volume_spec = VolumeSpec {
name: "fluvio-config-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some("fluvio-config-map".to_string()),
items: Some(vec![KeyToPath {
key: "fluvioClientConfig".to_string(),
path: "config".to_string(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};

let parameters = &mc_spec.parameters;
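// Flatten the connector parameters into alternating `key value` container args,
// e.g. {"foo": "bar"} becomes ["foo", "bar"].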
let args: Vec<String> = parameters
.keys()
.zip(parameters.values())
.flat_map(|(key, value)| [key.clone(), value.clone()])
.collect::<Vec<_>>();
let template = TemplateSpec {
metadata: Some(
TemplateMeta::default().set_labels(vec![("app", Self::DEFAULT_CONNECTOR_NAME)]),
),
spec: PodSpec {
termination_grace_period_seconds: Some(10),
containers: vec![ContainerSpec {
name: Self::DEFAULT_CONNECTOR_NAME.to_owned(),
image: Some(image),
image_pull_policy: Some("IfNotPresent".to_string()),
/*
env, // TODO
*/
volume_mounts: vec![VolumeMount {
name: "fluvio-config-volume".to_string(),
mount_path: "/home/fluvio/.fluvio".to_string(),
..Default::default()
}],
args,
..Default::default()
}],
volumes: vec![config_map_volume_spec],
//security_context: spu_k8_config.pod_security_context.clone(),
//node_selector: Some(spu_pod_config.node_selector.clone()),
..Default::default()
},
};

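// Select the Deployment's pods via the same `app` label set on the pod template above.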
let mut match_labels = HashMap::new();
match_labels.insert("app".to_owned(), Self::DEFAULT_CONNECTOR_NAME.to_owned());

K8DeploymentSpec {
template,
selector: LabelSelector { match_labels },
..Default::default()
}
}
}
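For a concrete feel of what generate_k8_deployment_spec derives from a connector spec, the short standalone example below reproduces just the image name and the flattened parameter args, using the values from test-config.yaml above; it is an illustration, not the controller code.

use std::collections::BTreeMap;

fn main() {
    let type_ = "test-connector";
    let image = format!("infinyon/fluvio-connect-{}", type_);

    let mut parameters = BTreeMap::new();
    parameters.insert("foo".to_string(), "bar".to_string());
    let args: Vec<String> = parameters
        .iter()
        .flat_map(|(key, value)| [key.clone(), value.clone()])
        .collect();

    assert_eq!(image, "infinyon/fluvio-connect-test-connector");
    assert_eq!(args, vec!["foo".to_string(), "bar".to_string()]);
    println!("image = {}, args = {:?}", image, args);
}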